You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/14 06:52:28 UTC

[rocketmq] 07/07: Delete useless code file

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-beta-tmp
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 190617e8b971d09b32e6a7b77cb77be9382fe496
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Mon Mar 14 14:46:19 2022 +0800

    Delete useless code file
---
 .../broker/transaction/TransactionRecord.java      |  43 --
 .../broker/transaction/TransactionStore.java       |  42 --
 .../transaction/jdbc/JDBCTransactionStore.java     | 242 ------
 .../jdbc/JDBCTransactionStoreConfig.java           |  57 --
 .../rocketmq/broker/util/ServiceProvider.java      | 201 -----
 .../processor/DefaultRequestProcessorTest.java     | 523 -------------
 .../remoting/netty/AsyncNettyRequestProcessor.java |  29 -
 .../org/apache/rocketmq/store/MessageExtBatch.java |  51 --
 .../rocketmq/store/MessageExtBrokerInner.java      |  64 --
 .../schedule/DelayOffsetSerializeWrapper.java      |  34 -
 .../store/schedule/ScheduleMessageService.java     | 819 ---------------------
 .../rocketmq/store/ScheduleMessageServiceTest.java | 194 -----
 .../store/schedule/ScheduleMessageServiceTest.java | 235 ------
 13 files changed, 2534 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
deleted file mode 100644
index 772f08e..0000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.broker.transaction;
-
-/**
- * This class will be removed in the version 4.4.0 and {@link OperationResult} class is recommended.
- */
-@Deprecated
-public class TransactionRecord {
-    // Commit Log Offset
-    private long offset;
-    private String producerGroup;
-
-    public long getOffset() {
-        return offset;
-    }
-
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
deleted file mode 100644
index 03e0227..0000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.transaction;
-
-import java.util.List;
-
-/**
- * This class will be removed in ther version 4.4.0, and {@link TransactionalMessageService} class is recommended.
- */
-@Deprecated
-public interface TransactionStore {
-    boolean open();
-
-    void close();
-
-    boolean put(final List<TransactionRecord> trs);
-
-    void remove(final List<Long> pks);
-
-    List<TransactionRecord> traverse(final long pk, final int nums);
-
-    long totalRecords();
-
-    long minPK();
-
-    long maxPK();
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
deleted file mode 100644
index da4958d..0000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.transaction.jdbc;
-
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.rocketmq.broker.transaction.TransactionRecord;
-import org.apache.rocketmq.broker.transaction.TransactionStore;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-
-public class JDBCTransactionStore implements TransactionStore {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
-    private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
-    private Connection connection;
-    private AtomicLong totalRecordsValue = new AtomicLong(0);
-
-    public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
-        this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
-    }
-
-    @Override
-    public boolean open() {
-        if (this.loadDriver()) {
-            Properties props = new Properties();
-            props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
-            props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
-
-            try {
-                this.connection =
-                    DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
-
-                this.connection.setAutoCommit(false);
-
-                if (!this.computeTotalRecords()) {
-                    return this.createDB();
-                }
-
-                return true;
-            } catch (SQLException e) {
-                log.info("Create JDBC Connection Exception", e);
-            }
-        }
-
-        return false;
-    }
-
-    private boolean loadDriver() {
-        try {
-            Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
-            log.info("Loaded the appropriate driver, {}",
-                this.jdbcTransactionStoreConfig.getJdbcDriverClass());
-            return true;
-        } catch (Exception e) {
-            log.info("Loaded the appropriate driver Exception", e);
-        }
-
-        return false;
-    }
-
-    private boolean computeTotalRecords() {
-        Statement statement = null;
-        ResultSet resultSet = null;
-        try {
-            statement = this.connection.createStatement();
-
-            resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
-            if (!resultSet.next()) {
-                log.warn("computeTotalRecords ResultSet is empty");
-                return false;
-            }
-
-            this.totalRecordsValue.set(resultSet.getLong(1));
-        } catch (Exception e) {
-            log.warn("computeTotalRecords Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                }
-            }
-
-            if (null != resultSet) {
-                try {
-                    resultSet.close();
-                } catch (SQLException e) {
-                }
-            }
-        }
-
-        return true;
-    }
-
-    private boolean createDB() {
-        Statement statement = null;
-        try {
-            statement = this.connection.createStatement();
-
-            String sql = this.createTableSql();
-            log.info("createDB SQL:\n {}", sql);
-            statement.execute(sql);
-            this.connection.commit();
-            return true;
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    log.warn("Close statement exception", e);
-                }
-            }
-        }
-    }
-
-    private String createTableSql() {
-        URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql");
-        String fileContent = MixAll.file2String(resource);
-        return fileContent;
-    }
-
-    @Override
-    public void close() {
-        try {
-            if (this.connection != null) {
-                this.connection.close();
-            }
-        } catch (SQLException e) {
-        }
-    }
-
-    @Override
-    public boolean put(List<TransactionRecord> trs) {
-        PreparedStatement statement = null;
-        try {
-            this.connection.setAutoCommit(false);
-            statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
-            for (TransactionRecord tr : trs) {
-                statement.setLong(1, tr.getOffset());
-                statement.setString(2, tr.getProducerGroup());
-                statement.addBatch();
-            }
-            int[] executeBatch = statement.executeBatch();
-            this.connection.commit();
-            this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
-            return true;
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    log.warn("Close statement exception", e);
-                }
-            }
-        }
-    }
-
-    private long updatedRows(int[] rows) {
-        long res = 0;
-        for (int i : rows) {
-            res += i;
-        }
-
-        return res;
-    }
-
-    @Override
-    public void remove(List<Long> pks) {
-        PreparedStatement statement = null;
-        try {
-            this.connection.setAutoCommit(false);
-            statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
-            for (long pk : pks) {
-                statement.setLong(1, pk);
-                statement.addBatch();
-            }
-            int[] executeBatch = statement.executeBatch();
-            this.connection.commit();
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                }
-            }
-        }
-    }
-
-    @Override
-    public List<TransactionRecord> traverse(long pk, int nums) {
-        return null;
-    }
-
-    @Override
-    public long totalRecords() {
-        return this.totalRecordsValue.get();
-    }
-
-    @Override
-    public long minPK() {
-        return 0;
-    }
-
-    @Override
-    public long maxPK() {
-        return 0;
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
deleted file mode 100644
index 4b07959..0000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker.transaction.jdbc;
-
-public class JDBCTransactionStoreConfig {
-    private String jdbcDriverClass = "com.mysql.jdbc.Driver";
-    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
-    private String jdbcUser = "xxx";
-    private String jdbcPassword = "xxx";
-
-    public String getJdbcDriverClass() {
-        return jdbcDriverClass;
-    }
-
-    public void setJdbcDriverClass(String jdbcDriverClass) {
-        this.jdbcDriverClass = jdbcDriverClass;
-    }
-
-    public String getJdbcURL() {
-        return jdbcURL;
-    }
-
-    public void setJdbcURL(String jdbcURL) {
-        this.jdbcURL = jdbcURL;
-    }
-
-    public String getJdbcUser() {
-        return jdbcUser;
-    }
-
-    public void setJdbcUser(String jdbcUser) {
-        this.jdbcUser = jdbcUser;
-    }
-
-    public String getJdbcPassword() {
-        return jdbcPassword;
-    }
-
-    public void setJdbcPassword(String jdbcPassword) {
-        this.jdbcPassword = jdbcPassword;
-    }
-}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java b/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
deleted file mode 100644
index e679660..0000000
--- a/broker/src/main/java/org/apache/rocketmq/broker/util/ServiceProvider.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to
- * You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.rocketmq.broker.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-public class ServiceProvider {
-
-    private final static Logger LOG = LoggerFactory
-        .getLogger(ServiceProvider.class);
-    /**
-     * A reference to the classloader that loaded this class. It's more efficient to compute it once and cache it here.
-     */
-    private static ClassLoader thisClassLoader;
-
-    /**
-     * JDK1.3+ <a href= "http://java.sun.com/j2se/1.3/docs/guide/jar/jar.html#Service%20Provider" > 'Service Provider' specification</a>.
-     */
-    public static final String TRANSACTION_SERVICE_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService";
-
-    public static final String TRANSACTION_LISTENER_ID = "META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
-
-
-    public static final String RPC_HOOK_ID = "META-INF/service/org.apache.rocketmq.remoting.RPCHook";
-
-
-    public static final String ACL_VALIDATOR_ID = "META-INF/service/org.apache.rocketmq.acl.AccessValidator";
-
-
-
-    static {
-        thisClassLoader = getClassLoader(ServiceProvider.class);
-    }
-
-    /**
-     * Returns a string that uniquely identifies the specified object, including its class.
-     * <p>
-     * The returned string is of form "classname@hashcode", ie is the same as the return value of the Object.toString() method, but works even when the specified object's class has overidden the toString method.
-     *
-     * @param o may be null.
-     * @return a string of form classname@hashcode, or "null" if param o is null.
-     */
-    protected static String objectId(Object o) {
-        if (o == null) {
-            return "null";
-        } else {
-            return o.getClass().getName() + "@" + System.identityHashCode(o);
-        }
-    }
-
-    protected static ClassLoader getClassLoader(Class<?> clazz) {
-        try {
-            return clazz.getClassLoader();
-        } catch (SecurityException e) {
-            LOG.error("Unable to get classloader for class {} due to security restrictions !",
-                clazz, e.getMessage());
-            throw e;
-        }
-    }
-
-    protected static ClassLoader getContextClassLoader() {
-        ClassLoader classLoader = null;
-        try {
-            classLoader = Thread.currentThread().getContextClassLoader();
-        } catch (SecurityException ex) {
-            /**
-             * The getContextClassLoader() method throws SecurityException when the context
-             * class loader isn't an ancestor of the calling class's class
-             * loader, or if security permissions are restricted.
-             */
-        }
-        return classLoader;
-    }
-
-    protected static InputStream getResourceAsStream(ClassLoader loader, String name) {
-        if (loader != null) {
-            return loader.getResourceAsStream(name);
-        } else {
-            return ClassLoader.getSystemResourceAsStream(name);
-        }
-    }
-
-    public static <T> List<T> load(String name, Class<?> clazz) {
-        LOG.info("Looking for a resource file of name [{}] ...", name);
-        List<T> services = new ArrayList<T>();
-        try {
-            ArrayList<String> names = new ArrayList<String>();
-            final InputStream is = getResourceAsStream(getContextClassLoader(), name);
-            if (is != null) {
-                BufferedReader reader;
-                try {
-                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-                } catch (java.io.UnsupportedEncodingException e) {
-                    reader = new BufferedReader(new InputStreamReader(is));
-                }
-                String serviceName = reader.readLine();
-                while (serviceName != null && !"".equals(serviceName)) {
-                    LOG.info(
-                        "Creating an instance as specified by file {} which was present in the path of the context classloader.",
-                        name);
-                    if (!names.contains(serviceName)) {
-                        names.add(serviceName);
-                    }
-
-                    services.add((T)initService(getContextClassLoader(), serviceName, clazz));
-
-                    serviceName = reader.readLine();
-                }
-                reader.close();
-            } else {
-                // is == null
-                LOG.warn("No resource file with name [{}] found.", name);
-            }
-        } catch (Exception e) {
-            LOG.error("Error occured when looking for resource file " + name, e);
-        }
-        return services;
-    }
-
-    public static <T> T loadClass(String name, Class<?> clazz) {
-        final InputStream is = getResourceAsStream(getContextClassLoader(), name);
-        if (is != null) {
-            BufferedReader reader;
-            try {
-                try {
-                    reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
-                } catch (java.io.UnsupportedEncodingException e) {
-                    reader = new BufferedReader(new InputStreamReader(is));
-                }
-                String serviceName = reader.readLine();
-                reader.close();
-                if (serviceName != null && !"".equals(serviceName)) {
-                    return initService(getContextClassLoader(), serviceName, clazz);
-                } else {
-                    LOG.warn("ServiceName is empty!");
-                    return null;
-                }
-            } catch (Exception e) {
-                LOG.warn("Error occurred when looking for resource file " + name, e);
-            }
-        }
-        return null;
-    }
-
-    protected static <T> T initService(ClassLoader classLoader, String serviceName, Class<?> clazz) {
-        Class<?> serviceClazz = null;
-        try {
-            if (classLoader != null) {
-                try {
-                    // Warning: must typecast here & allow exception to be generated/caught & recast properly
-                    serviceClazz = classLoader.loadClass(serviceName);
-                    if (clazz.isAssignableFrom(serviceClazz)) {
-                        LOG.info("Loaded class {} from classloader {}", serviceClazz.getName(),
-                            objectId(classLoader));
-                    } else {
-                        // This indicates a problem with the ClassLoader tree. An incompatible ClassLoader was used to load the implementation.
-                        LOG.error(
-                            "Class {} loaded from classloader {} does not extend {} as loaded by this classloader.",
-                            new Object[] {serviceClazz.getName(),
-                                objectId(serviceClazz.getClassLoader()), clazz.getName()});
-                    }
-                    return (T)serviceClazz.newInstance();
-                } catch (ClassNotFoundException ex) {
-                    if (classLoader == thisClassLoader) {
-                        // Nothing more to try, onwards.
-                        LOG.warn("Unable to locate any class {} via classloader", serviceName,
-                            objectId(classLoader));
-                        throw ex;
-                    }
-                    // Ignore exception, continue
-                } catch (NoClassDefFoundError e) {
-                    if (classLoader == thisClassLoader) {
-                        // Nothing more to try, onwards.
-                        LOG.warn(
-                            "Class {} cannot be loaded via classloader {}.it depends on some other class that cannot be found.",
-                            serviceClazz, objectId(classLoader));
-                        throw e;
-                    }
-                    // Ignore exception, continue
-                }
-            }
-        } catch (Exception e) {
-            LOG.error("Unable to init service.", e);
-        }
-        return (T)serviceClazz;
-    }
-}
diff --git a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java b/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
deleted file mode 100644
index 6e3e6ef..0000000
--- a/namesrv/src/test/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessorTest.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.namesrv.processor;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.assertj.core.util.Maps;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class DefaultRequestProcessorTest {
-    private DefaultRequestProcessor defaultRequestProcessor;
-
-    private NamesrvController namesrvController;
-
-    private NamesrvConfig namesrvConfig;
-
-    private NettyServerConfig nettyServerConfig;
-
-    private RouteInfoManager routeInfoManager;
-
-    private InternalLogger logger;
-
-    @Before
-    public void init() throws Exception {
-        namesrvConfig = new NamesrvConfig();
-        nettyServerConfig = new NettyServerConfig();
-        routeInfoManager = new RouteInfoManager();
-
-        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
-
-        Field field = NamesrvController.class.getDeclaredField("routeInfoManager");
-        field.setAccessible(true);
-        field.set(namesrvController, routeInfoManager);
-        defaultRequestProcessor = new DefaultRequestProcessor(namesrvController);
-
-        registerRouteInfoManager();
-
-        logger = mock(InternalLogger.class);
-        setFinalStatic(DefaultRequestProcessor.class.getDeclaredField("log"), logger);
-    }
-
-    @Test
-    public void testProcessRequest_PutKVConfig() throws RemotingCommandException {
-        PutKVConfigRequestHeader header = new PutKVConfigRequestHeader();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG,
-            header);
-        request.addExtField("namespace", "namespace");
-        request.addExtField("key", "key");
-        request.addExtField("value", "value");
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(response.getRemark()).isNull();
-
-        assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
-            .isEqualTo("value");
-    }
-
-    @Test
-    public void testProcessRequest_GetKVConfigReturnNotNull() throws RemotingCommandException {
-        namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
-
-        GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
-            header);
-        request.addExtField("namespace", "namespace");
-        request.addExtField("key", "key");
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(response.getRemark()).isNull();
-
-        GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
-            .readCustomHeader();
-
-        assertThat(responseHeader.getValue()).isEqualTo("value");
-    }
-
-    @Test
-    public void testProcessRequest_GetKVConfigReturnNull() throws RemotingCommandException {
-        GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG,
-            header);
-        request.addExtField("namespace", "namespace");
-        request.addExtField("key", "key");
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
-        assertThat(response.getRemark()).isEqualTo("No config item, Namespace: namespace Key: key");
-
-        GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response
-            .readCustomHeader();
-
-        assertThat(responseHeader.getValue()).isNull();
-    }
-
-    @Test
-    public void testProcessRequest_DeleteKVConfig() throws RemotingCommandException {
-        namesrvController.getKvConfigManager().putKVConfig("namespace", "key", "value");
-
-        DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG,
-            header);
-        request.addExtField("namespace", "namespace");
-        request.addExtField("key", "key");
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(null, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(response.getRemark()).isNull();
-
-        assertThat(namesrvController.getKvConfigManager().getKVConfig("namespace", "key"))
-            .isNull();
-    }
-
-    @Test
-    public void testProcessRequest_RegisterBroker() throws RemotingCommandException,
-        NoSuchFieldException, IllegalAccessException {
-        RemotingCommand request = genSampleRegisterCmd(true);
-
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(response.getRemark()).isNull();
-
-        RouteInfoManager routes = namesrvController.getRouteInfoManager();
-        Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
-        brokerAddrTable.setAccessible(true);
-
-        BrokerData broker = new BrokerData();
-        broker.setBrokerName("broker");
-        broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
-
-        assertThat((Map) brokerAddrTable.get(routes))
-            .contains(new HashMap.SimpleEntry("broker", broker));
-    }
-
-    /*@Test
-    public void testProcessRequest_RegisterBrokerLogicalQueue() throws Exception {
-        String cluster = "cluster";
-        String broker1Name = "broker1";
-        String broker1Addr = "10.10.1.1";
-        String broker2Name = "broker2";
-        String broker2Addr = "10.10.1.2";
-        String topic = "foobar";
-
-        LogicalQueueRouteData queueRouteData1 = new LogicalQueueRouteData(0, 0, new MessageQueue(topic, broker1Name, 0), MessageQueueRouteState.ReadOnly, 0, 10, 100, 100, broker1Addr);
-        {
-            RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
-            header.setBrokerName(broker1Name);
-            RemotingCommand request = RemotingCommand.createRequestCommand(
-                RequestCode.REGISTER_BROKER, header);
-            request.addExtField("brokerName", broker1Name);
-            request.addExtField("brokerAddr", broker1Addr);
-            request.addExtField("clusterName", cluster);
-            request.addExtField("haServerAddr", "10.10.2.1");
-            request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
-            request.setVersion(MQVersion.CURRENT_VERSION);
-            TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
-            topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
-            topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(Collections.singletonMap(0, Lists.newArrayList(
-                queueRouteData1
-            )))));
-            topicConfigSerializeWrapper.setDataVersion(new DataVersion());
-            RegisterBrokerBody requestBody = new RegisterBrokerBody();
-            requestBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
-            requestBody.setFilterServerList(Lists.<String>newArrayList());
-            request.setBody(requestBody.encode());
-
-            ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-            when(ctx.channel()).thenReturn(null);
-
-            RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
-
-            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-            assertThat(response.getRemark()).isNull();
-        }
-        LogicalQueueRouteData queueRouteData2 = new LogicalQueueRouteData(0, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
-        LogicalQueueRouteData queueRouteData3 = new LogicalQueueRouteData(1, 100, new MessageQueue(topic, broker2Name, 0), MessageQueueRouteState.Normal, 0, -1, -1, -1, broker2Addr);
-        {
-            RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
-            header.setBrokerName(broker2Name);
-            RemotingCommand request = RemotingCommand.createRequestCommand(
-                RequestCode.REGISTER_BROKER, header);
-            request.addExtField("brokerName", broker2Name);
-            request.addExtField("brokerAddr", broker2Addr);
-            request.addExtField("clusterName", cluster);
-            request.addExtField("haServerAddr", "10.10.2.1");
-            request.addExtField("brokerId", String.valueOf(MixAll.MASTER_ID));
-            request.setVersion(MQVersion.CURRENT_VERSION);
-            TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
-            topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(Collections.singletonMap(topic, new TopicConfig(topic))));
-            topicConfigSerializeWrapper.setLogicalQueuesInfoMap(Maps.newHashMap(topic, new LogicalQueuesInfo(ImmutableMap.of(
-                0, Collections.singletonList(queueRouteData2),
-                1, Collections.singletonList(queueRouteData3)
-            ))));
-            topicConfigSerializeWrapper.setDataVersion(new DataVersion());
-            RegisterBrokerBody requestBody = new RegisterBrokerBody();
-            requestBody.setTopicConfigSerializeWrapper(topicConfigSerializeWrapper);
-            requestBody.setFilterServerList(Lists.<String>newArrayList());
-            request.setBody(requestBody.encode());
-
-            ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-            when(ctx.channel()).thenReturn(null);
-
-            RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
-
-            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-            assertThat(response.getRemark()).isNull();
-        }
-
-        {
-            GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
-            header.setTopic(topic);
-            header.setSysFlag(MessageSysFlag.LOGICAL_QUEUE_FLAG);
-            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);
-            request.makeCustomHeaderToNet();
-
-            ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-            when(ctx.channel()).thenReturn(null);
-
-            RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
-
-            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-
-            TopicRouteDataNameSrv topicRouteDataNameSrv = JSON.parseObject(response.getBody(), TopicRouteDataNameSrv.class);
-            assertThat(topicRouteDataNameSrv).isNotNull();
-            LogicalQueuesInfoUnordered logicalQueuesInfoUnordered = new LogicalQueuesInfoUnordered();
-            logicalQueuesInfoUnordered.put(0, ImmutableMap.of(
-                new LogicalQueuesInfoUnordered.Key(queueRouteData1.getBrokerName(), queueRouteData1.getQueueId(), queueRouteData1.getOffsetDelta()), queueRouteData1,
-                new LogicalQueuesInfoUnordered.Key(queueRouteData2.getBrokerName(), queueRouteData2.getQueueId(), queueRouteData2.getOffsetDelta()), queueRouteData2
-            ));
-            logicalQueuesInfoUnordered.put(1, ImmutableMap.of(new LogicalQueuesInfoUnordered.Key(queueRouteData3.getBrokerName(), queueRouteData3.getQueueId(), queueRouteData3.getOffsetDelta()), queueRouteData3));
-            assertThat(topicRouteDataNameSrv.getLogicalQueuesInfoUnordered()).isEqualTo(logicalQueuesInfoUnordered);
-        }
-    }
-*/
-    @Test
-    public void testProcessRequest_RegisterBrokerWithFilterServer() throws RemotingCommandException,
-        NoSuchFieldException, IllegalAccessException {
-        RemotingCommand request = genSampleRegisterCmd(true);
-
-        // version >= MQVersion.Version.V3_0_11.ordinal() to register with filter server
-        request.setVersion(100);
-
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-
-        RemotingCommand response = defaultRequestProcessor.processRequest(ctx, request);
-
-        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(response.getRemark()).isNull();
-
-        RouteInfoManager routes = namesrvController.getRouteInfoManager();
-        Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
-        brokerAddrTable.setAccessible(true);
-
-        BrokerData broker = new BrokerData();
-        broker.setBrokerName("broker");
-        broker.setBrokerAddrs((HashMap) Maps.newHashMap(new Long(2333), "10.10.1.1"));
-
-        assertThat((Map) brokerAddrTable.get(routes))
-            .contains(new HashMap.SimpleEntry("broker", broker));
-    }
-
-    @Test
-    public void testProcessRequest_UnregisterBroker() throws RemotingCommandException, NoSuchFieldException, IllegalAccessException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-
-        //Register broker
-        RemotingCommand regRequest = genSampleRegisterCmd(true);
-        defaultRequestProcessor.processRequest(ctx, regRequest);
-
-        //Unregister broker
-        RemotingCommand unregRequest = genSampleRegisterCmd(false);
-        RemotingCommand unregResponse = defaultRequestProcessor.processRequest(ctx, unregRequest);
-
-        assertThat(unregResponse.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        assertThat(unregResponse.getRemark()).isNull();
-
-        RouteInfoManager routes = namesrvController.getRouteInfoManager();
-        Field brokerAddrTable = RouteInfoManager.class.getDeclaredField("brokerAddrTable");
-        brokerAddrTable.setAccessible(true);
-
-        assertThat((Map) brokerAddrTable.get(routes)).isNotEmpty();
-    }
-
-    @Test
-    public void testGetRouteInfoByTopic() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC);
-        RemotingCommand remotingCommandSuccess = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommandSuccess.getCode()).isEqualTo(ResponseCode.SUCCESS);
-        request.getExtFields().put("topic", "test");
-        RemotingCommand remotingCommandNoTopicRouteInfo = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommandNoTopicRouteInfo.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
-    }
-
-    @Test
-    public void testGetBrokerClusterInfo() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_BROKER_CLUSTER_INFO);
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testWipeWritePermOfBroker() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER);
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetAllTopicListFromNameserver() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER);
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testDeleteTopicInNamesrv() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV);
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetKVListByNamespace() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_KVLIST_BY_NAMESPACE);
-        request.addExtField("namespace", "default-namespace-1");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.QUERY_NOT_FOUND);
-        namesrvController.getKvConfigManager().putKVConfig("default-namespace-1", "key", "value");
-        RemotingCommand remotingCommandSuccess = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommandSuccess.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetTopicsByCluster() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_TOPICS_BY_CLUSTER);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetSystemTopicListFromNs() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetUnitTopicList() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_UNIT_TOPIC_LIST);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetHasUnitSubTopicList() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetHasUnitSubUnUnitTopicList() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testUpdateConfig() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.UPDATE_NAMESRV_CONFIG);
-        request.addExtField("cluster", "default-cluster");
-        Map<String, String> propertiesMap = new HashMap<>();
-        propertiesMap.put("key", "value");
-        request.setBody(propertiesMap.toString().getBytes());
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    @Test
-    public void testGetConfig() throws RemotingCommandException {
-        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-        when(ctx.channel()).thenReturn(null);
-        RemotingCommand request = getRemotingCommand(RequestCode.GET_NAMESRV_CONFIG);
-        request.addExtField("cluster", "default-cluster");
-        RemotingCommand remotingCommand = defaultRequestProcessor.processRequest(ctx, request);
-        assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.SUCCESS);
-    }
-
-    private RemotingCommand getRemotingCommand(int code) {
-        RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
-        header.setBrokerName("broker");
-        RemotingCommand request = RemotingCommand.createRequestCommand(code, header);
-        request.addExtField("brokerName", "broker");
-        request.addExtField("brokerAddr", "10.10.1.1");
-        request.addExtField("clusterName", "cluster");
-        request.addExtField("haServerAddr", "10.10.2.1");
-        request.addExtField("brokerId", "2333");
-        request.addExtField("topic", "unit-test");
-        return request;
-    }
-
-    private static RemotingCommand genSampleRegisterCmd(boolean reg) {
-        RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
-        header.setBrokerName("broker");
-        RemotingCommand request = RemotingCommand.createRequestCommand(
-            reg ? RequestCode.REGISTER_BROKER : RequestCode.UNREGISTER_BROKER, header);
-        request.addExtField("brokerName", "broker");
-        request.addExtField("brokerAddr", "10.10.1.1");
-        request.addExtField("clusterName", "cluster");
-        request.addExtField("haServerAddr", "10.10.2.1");
-        request.addExtField("brokerId", "2333");
-        return request;
-    }
-
-    private static void setFinalStatic(Field field, Object newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-        field.set(null, newValue);
-    }
-
-    private void registerRouteInfoManager() {
-        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
-        ConcurrentHashMap<String, TopicConfig> topicConfigConcurrentHashMap = new ConcurrentHashMap<>();
-        TopicConfig topicConfig = new TopicConfig();
-        topicConfig.setWriteQueueNums(8);
-        topicConfig.setTopicName("unit-test");
-        topicConfig.setPerm(6);
-        topicConfig.setReadQueueNums(8);
-        topicConfig.setOrder(false);
-        topicConfigConcurrentHashMap.put("unit-test", topicConfig);
-        topicConfigSerializeWrapper.setTopicConfigTable(topicConfigConcurrentHashMap);
-        Channel channel = mock(Channel.class);
-        RegisterBrokerResult registerBrokerResult = routeInfoManager.registerBroker("default-cluster", "127.0.0.1:10911", "default-broker", 0, "127.0.0.1:1001",
-            topicConfigSerializeWrapper, new ArrayList<String>(), channel);
-
-    }
-}
\ No newline at end of file
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java
deleted file mode 100644
index db333f8..0000000
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/AsyncNettyRequestProcessor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.netty;
-
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-public abstract class AsyncNettyRequestProcessor implements NettyRequestProcessor {
-
-    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
-        RemotingCommand response = processRequest(ctx, request);
-        responseCallback.callback(response);
-    }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBatch.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBatch.java
deleted file mode 100644
index e62dfb4..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBatch.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.store;
-
-import java.nio.ByteBuffer;
-
-public class MessageExtBatch extends MessageExtBrokerInner {
-
-    private static final long serialVersionUID = -2353110995348498537L;
-    /**
-     * Inner batch means the batch dose not need to be unwrapped
-     */
-    private boolean isInnerBatch = false;
-    public ByteBuffer wrap() {
-        assert getBody() != null;
-        return ByteBuffer.wrap(getBody(), 0, getBody().length);
-    }
-
-    public boolean isInnerBatch() {
-        return isInnerBatch;
-    }
-
-    public void setInnerBatch(boolean innerBatch) {
-        isInnerBatch = innerBatch;
-    }
-
-    private ByteBuffer encodedBuff;
-
-    public ByteBuffer getEncodedBuff() {
-        return encodedBuff;
-    }
-
-    public void setEncodedBuff(ByteBuffer encodedBuff) {
-        this.encodedBuff = encodedBuff;
-    }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
deleted file mode 100644
index df7e6e5..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store;
-
-import java.nio.ByteBuffer;
-
-import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.message.MessageExt;
-
-public class MessageExtBrokerInner extends MessageExt {
-    private static final long serialVersionUID = 7256001576878700634L;
-    private String propertiesString;
-    private long tagsCode;
-
-    private ByteBuffer encodedBuff;
-
-    public ByteBuffer getEncodedBuff() {
-        return encodedBuff;
-    }
-
-    public void setEncodedBuff(ByteBuffer encodedBuff) {
-        this.encodedBuff = encodedBuff;
-    }
-
-    public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) {
-        if (null == tags || tags.length() == 0) { return 0; }
-
-        return tags.hashCode();
-    }
-
-    public static long tagsString2tagsCode(final String tags) {
-        return tagsString2tagsCode(null, tags);
-    }
-
-    public String getPropertiesString() {
-        return propertiesString;
-    }
-
-    public void setPropertiesString(String propertiesString) {
-        this.propertiesString = propertiesString;
-    }
-
-    public long getTagsCode() {
-        return tagsCode;
-    }
-
-    public void setTagsCode(long tagsCode) {
-        this.tagsCode = tagsCode;
-    }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
deleted file mode 100644
index 7021992..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.schedule;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
-public class DelayOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
-        new ConcurrentHashMap<Integer, Long>(32);
-
-    public ConcurrentMap<Integer, Long> getOffsetTable() {
-        return offsetTable;
-    }
-
-    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
-        this.offsetTable = offsetTable;
-    }
-}
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
deleted file mode 100644
index 4d058ad..0000000
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ /dev/null
@@ -1,819 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.schedule;
-
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageExtBrokerInner;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.PutMessageStatus;
-import org.apache.rocketmq.store.config.StorePathConfigHelper;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
-import org.apache.rocketmq.store.queue.CqUnit;
-import org.apache.rocketmq.store.queue.ReferredIterator;
-
-public class ScheduleMessageService extends ConfigManager {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
-    private static final long FIRST_DELAY_TIME = 1000L;
-    private static final long DELAY_FOR_A_WHILE = 100L;
-    private static final long DELAY_FOR_A_PERIOD = 10000L;
-    private static final long WAIT_FOR_SHUTDOWN = 5000L;
-    private static final long DELAY_FOR_A_SLEEP = 10L;
-
-    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
-        new ConcurrentHashMap<Integer, Long>(32);
-
-    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
-        new ConcurrentHashMap<Integer, Long>(32);
-    private final DefaultMessageStore defaultMessageStore;
-    private final AtomicBoolean started = new AtomicBoolean(false);
-    private ScheduledExecutorService deliverExecutorService;
-    private MessageStore writeMessageStore;
-    private int maxDelayLevel;
-    private boolean enableAsyncDeliver = false;
-    private ScheduledExecutorService handleExecutorService;
-    private final Map<Integer /* level */, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable =
-        new ConcurrentHashMap<>(32);
-
-    public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
-        this.defaultMessageStore = defaultMessageStore;
-        this.writeMessageStore = defaultMessageStore;
-        if (defaultMessageStore != null) {
-            this.enableAsyncDeliver = defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
-        }
-    }
-
-    public static int queueId2DelayLevel(final int queueId) {
-        return queueId + 1;
-    }
-
-    public static int delayLevel2QueueId(final int delayLevel) {
-        return delayLevel - 1;
-    }
-
-    /**
-     * @param writeMessageStore the writeMessageStore to set
-     */
-    public void setWriteMessageStore(MessageStore writeMessageStore) {
-        this.writeMessageStore = writeMessageStore;
-    }
-
-    public void buildRunningStats(HashMap<String, String> stats) {
-        Iterator<Map.Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Map.Entry<Integer, Long> next = it.next();
-            int queueId = delayLevel2QueueId(next.getKey());
-            long delayOffset = next.getValue();
-            long maxOffset = this.defaultMessageStore.getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, queueId);
-            String value = String.format("%d,%d", delayOffset, maxOffset);
-            String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
-            stats.put(key, value);
-        }
-    }
-
-    private void updateOffset(int delayLevel, long offset) {
-        this.offsetTable.put(delayLevel, offset);
-    }
-
-    public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
-        Long time = this.delayLevelTable.get(delayLevel);
-        if (time != null) {
-            return time + storeTimestamp;
-        }
-
-        return storeTimestamp + 1000;
-    }
-
-    public void start() {
-        if (started.compareAndSet(false, true)) {
-            super.load();
-            this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
-            if (this.enableAsyncDeliver) {
-                this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
-            }
-            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
-                Integer level = entry.getKey();
-                Long timeDelay = entry.getValue();
-                Long offset = this.offsetTable.get(level);
-                if (null == offset) {
-                    offset = 0L;
-                }
-
-                if (timeDelay != null) {
-                    if (this.enableAsyncDeliver) {
-                        this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
-                    }
-                    this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
-                }
-            }
-
-            this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        if (started.get()) {
-                            ScheduleMessageService.this.persist();
-                        }
-                    } catch (Throwable e) {
-                        log.error("scheduleAtFixedRate flush exception", e);
-                    }
-                }
-            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
-        }
-    }
-
-    public void shutdown() {
-        if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) {
-            this.deliverExecutorService.shutdown();
-            try {
-                this.deliverExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                log.error("deliverExecutorService awaitTermination error", e);
-            }
-
-            if (this.handleExecutorService != null) {
-                this.handleExecutorService.shutdown();
-                try {
-                    this.handleExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
-                } catch (InterruptedException e) {
-                    log.error("handleExecutorService awaitTermination error", e);
-                }
-            }
-
-            if (this.deliverPendingTable != null) {
-                for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
-                    log.warn("deliverPendingTable level: {}, size: {}", i, this.deliverPendingTable.get(i).size());
-                }
-            }
-
-            this.persist();
-        }
-    }
-
-    public boolean isStarted() {
-        return started.get();
-    }
-
-    public int getMaxDelayLevel() {
-        return maxDelayLevel;
-    }
-
-    @Override
-    public String encode() {
-        return this.encode(false);
-    }
-
-    @Override
-    public boolean load() {
-        boolean result = super.load();
-        result = result && this.parseDelayLevel();
-        result = result && this.correctDelayOffset();
-        return result;
-    }
-
-    public boolean correctDelayOffset() {
-        try {
-            for (int delayLevel : delayLevelTable.keySet()) {
-                ConsumeQueueInterface cq =
-                    ScheduleMessageService.this.defaultMessageStore.getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
-                        delayLevel2QueueId(delayLevel));
-                Long currentDelayOffset = offsetTable.get(delayLevel);
-                if (currentDelayOffset == null || cq == null) {
-                    continue;
-                }
-                long correctDelayOffset = currentDelayOffset;
-                long cqMinOffset = cq.getMinOffsetInQueue();
-                long cqMaxOffset = cq.getMaxOffsetInQueue();
-                if (currentDelayOffset < cqMinOffset) {
-                    correctDelayOffset = cqMinOffset;
-                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
-                }
-
-                if (currentDelayOffset > cqMaxOffset) {
-                    correctDelayOffset = cqMaxOffset;
-                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
-                }
-                if (correctDelayOffset != currentDelayOffset) {
-                    log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset);
-                    offsetTable.put(delayLevel, correctDelayOffset);
-                }
-            }
-        } catch (Exception e) {
-            log.error("correctDelayOffset exception", e);
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public String configFilePath() {
-        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
-            .getStorePathRootDir());
-    }
-
-    @Override
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
-                DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
-            if (delayOffsetSerializeWrapper != null) {
-                this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
-            }
-        }
-    }
-
-    @Override
-    public String encode(final boolean prettyFormat) {
-        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
-        delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
-        return delayOffsetSerializeWrapper.toJson(prettyFormat);
-    }
-
-    public boolean parseDelayLevel() {
-        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
-        timeUnitTable.put("s", 1000L);
-        timeUnitTable.put("m", 1000L * 60);
-        timeUnitTable.put("h", 1000L * 60 * 60);
-        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
-
-        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
-        try {
-            String[] levelArray = levelString.split(" ");
-            for (int i = 0; i < levelArray.length; i++) {
-                String value = levelArray[i];
-                String ch = value.substring(value.length() - 1);
-                Long tu = timeUnitTable.get(ch);
-
-                int level = i + 1;
-                if (level > this.maxDelayLevel) {
-                    this.maxDelayLevel = level;
-                }
-                long num = Long.parseLong(value.substring(0, value.length() - 1));
-                long delayTimeMillis = tu * num;
-                this.delayLevelTable.put(level, delayTimeMillis);
-                if (this.enableAsyncDeliver) {
-                    this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
-                }
-            }
-        } catch (Exception e) {
-            log.error("parseDelayLevel exception", e);
-            log.info("levelString String = {}", levelString);
-            return false;
-        }
-
-        return true;
-    }
-
-    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        msgInner.setBody(msgExt.getBody());
-        msgInner.setFlag(msgExt.getFlag());
-        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
-
-        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-        long tagsCodeValue =
-            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-        msgInner.setTagsCode(tagsCodeValue);
-        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
-
-        msgInner.setSysFlag(msgExt.getSysFlag());
-        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-        msgInner.setBornHost(msgExt.getBornHost());
-        msgInner.setStoreHost(msgExt.getStoreHost());
-        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
-
-        msgInner.setWaitStoreMsgOK(false);
-        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
-
-        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
-
-        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-        int queueId = Integer.parseInt(queueIdStr);
-        msgInner.setQueueId(queueId);
-
-        return msgInner;
-    }
-
-    public int computeDelayLevel(long timeMillis) {
-        long intervalMillis = timeMillis - System.currentTimeMillis();
-        List<Map.Entry<Integer, Long>> sortedLevels = delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
-        for (Map.Entry<Integer, Long> entry : sortedLevels) {
-            if (entry.getValue() > intervalMillis) {
-                return entry.getKey();
-            }
-        }
-        return sortedLevels.get(sortedLevels.size() - 1).getKey();
-    }
-
-    class DeliverDelayedMessageTimerTask implements Runnable {
-        private final int delayLevel;
-        private final long offset;
-
-        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
-            this.delayLevel = delayLevel;
-            this.offset = offset;
-        }
-
-        @Override
-        public void run() {
-            try {
-                if (isStarted()) {
-                    this.executeOnTimeup();
-                }
-            } catch (Exception e) {
-                // XXX: warn and notify me
-                log.error("ScheduleMessageService, executeOnTimeup exception", e);
-                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
-            }
-        }
-
-        /**
-         * @return
-         */
-        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
-
-            long result = deliverTimestamp;
-
-            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
-            if (deliverTimestamp > maxTimestamp) {
-                result = now;
-            }
-
-            return result;
-        }
-
-        public void executeOnTimeup() {
-            ConsumeQueueInterface cq =
-                ScheduleMessageService.this.defaultMessageStore.getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
-                    delayLevel2QueueId(delayLevel));
-
-            if (cq == null) {
-                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
-                return;
-            }
-
-            ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
-            if (bufferCQ == null) {
-                long resetOffset;
-                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
-                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
-                        this.offset, resetOffset, cq.getQueueId());
-                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
-                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
-                        this.offset, resetOffset, cq.getQueueId());
-                } else {
-                    resetOffset = this.offset;
-                }
-
-                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
-                return;
-            }
-
-            long nextOffset = this.offset;
-            try {
-                while (bufferCQ.hasNext() && isStarted()) {
-                    CqUnit cqUnit = bufferCQ.next();
-                    long offsetPy = cqUnit.getPos();
-                    int sizePy = cqUnit.getSize();
-                    long tagsCode = cqUnit.getTagsCode();
-
-                    if (!cqUnit.isTagsCodeValid()) {
-                        //can't find ext content.So re compute tags code.
-                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                            tagsCode, offsetPy, sizePy);
-                        long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                        tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                    }
-
-                    long now = System.currentTimeMillis();
-                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                    long currOffset = cqUnit.getQueueOffset();
-                    assert cqUnit.getBatchNum() == 1;
-                    nextOffset = currOffset + cqUnit.getBatchNum();
-
-                    long countdown = deliverTimestamp - now;
-                    if (countdown > 0) {
-                        this.scheduleNextTimerTask(currOffset, DELAY_FOR_A_WHILE);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
-                        return;
-                    }
-
-                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
-                    if (msgExt == null) {
-                        continue;
-                    }
-
-                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
-                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                            msgInner.getTopic(), msgInner);
-                        continue;
-                    }
-
-                    boolean deliverSuc;
-                    if (ScheduleMessageService.this.enableAsyncDeliver) {
-                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
-                    } else {
-                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
-                    }
-
-                    if (!deliverSuc) {
-                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
-                        return;
-                    }
-                }
-            } catch (Exception e) {
-                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
-            } finally {
-                bufferCQ.release();
-            }
-
-            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
-        }
-
-        public void scheduleNextTimerTask(long offset, long delay) {
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
-        }
-
-        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
-            int sizePy) {
-            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
-            PutMessageResult result = resultProcess.get();
-            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
-            if (sendStatus) {
-                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
-            }
-            return sendStatus;
-        }
-
-        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
-            int sizePy) {
-            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
-
-            //Flow Control
-            int currentPendingNum = processesQueue.size();
-            int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
-                .getScheduleAsyncDeliverMaxPendingLimit();
-            if (currentPendingNum > maxPendingLimit) {
-                log.warn("Asynchronous deliver triggers flow control, " +
-                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
-                return false;
-            }
-
-            //Blocked
-            PutResultProcess firstProcess = processesQueue.peek();
-            if (firstProcess != null && firstProcess.need2Blocked()) {
-                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
-                return false;
-            }
-
-            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
-            processesQueue.add(resultProcess);
-            return true;
-        }
-
-        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
-            long offsetPy, int sizePy, boolean autoResend) {
-            CompletableFuture<PutMessageResult> future =
-                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
-            return new PutResultProcess()
-                .setTopic(msgInner.getTopic())
-                .setDelayLevel(this.delayLevel)
-                .setOffset(offset)
-                .setPhysicOffset(offsetPy)
-                .setPhysicSize(sizePy)
-                .setMsgId(msgId)
-                .setAutoResend(autoResend)
-                .setFuture(future)
-                .thenProcess();
-        }
-    }
-
-    public class HandlePutResultTask implements Runnable {
-        private final int delayLevel;
-
-        public HandlePutResultTask(int delayLevel) {
-            this.delayLevel = delayLevel;
-        }
-
-        @Override
-        public void run() {
-            LinkedBlockingQueue<PutResultProcess> pendingQueue =
-                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
-
-            PutResultProcess putResultProcess;
-            while ((putResultProcess = pendingQueue.peek()) != null) {
-                try {
-                    switch (putResultProcess.getStatus()) {
-                        case SUCCESS:
-                            ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
-                            pendingQueue.remove();
-                            break;
-                        case RUNNING:
-                            break;
-                        case EXCEPTION:
-                            if (!isStarted()) {
-                                log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
-                                return;
-                            }
-                            log.warn("putResultProcess error, info={}", putResultProcess.toString());
-                            putResultProcess.onException();
-                            break;
-                        case SKIP:
-                            log.warn("putResultProcess skip, info={}", putResultProcess.toString());
-                            pendingQueue.remove();
-                            break;
-                    }
-                } catch (Exception e) {
-                    log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
-                    putResultProcess.onException();
-                }
-            }
-
-            if (isStarted()) {
-                ScheduleMessageService.this.handleExecutorService
-                    .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    public class PutResultProcess {
-        private String topic;
-        private long offset;
-        private long physicOffset;
-        private int physicSize;
-        private int delayLevel;
-        private String msgId;
-        private boolean autoResend = false;
-        private CompletableFuture<PutMessageResult> future;
-
-        private volatile int resendCount = 0;
-        private volatile ProcessStatus status = ProcessStatus.RUNNING;
-
-        public PutResultProcess setTopic(String topic) {
-            this.topic = topic;
-            return this;
-        }
-
-        public PutResultProcess setOffset(long offset) {
-            this.offset = offset;
-            return this;
-        }
-
-        public PutResultProcess setPhysicOffset(long physicOffset) {
-            this.physicOffset = physicOffset;
-            return this;
-        }
-
-        public PutResultProcess setPhysicSize(int physicSize) {
-            this.physicSize = physicSize;
-            return this;
-        }
-
-        public PutResultProcess setDelayLevel(int delayLevel) {
-            this.delayLevel = delayLevel;
-            return this;
-        }
-
-        public PutResultProcess setMsgId(String msgId) {
-            this.msgId = msgId;
-            return this;
-        }
-
-        public PutResultProcess setAutoResend(boolean autoResend) {
-            this.autoResend = autoResend;
-            return this;
-        }
-
-        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
-            this.future = future;
-            return this;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public long getOffset() {
-            return offset;
-        }
-
-        public long getNextOffset() {
-            return offset + 1;
-        }
-
-        public long getPhysicOffset() {
-            return physicOffset;
-        }
-
-        public int getPhysicSize() {
-            return physicSize;
-        }
-
-        public Integer getDelayLevel() {
-            return delayLevel;
-        }
-
-        public String getMsgId() {
-            return msgId;
-        }
-
-        public boolean isAutoResend() {
-            return autoResend;
-        }
-
-        public CompletableFuture<PutMessageResult> getFuture() {
-            return future;
-        }
-
-        public int getResendCount() {
-            return resendCount;
-        }
-
-        public PutResultProcess thenProcess() {
-            this.future.thenAccept(result -> {
-                this.handleResult(result);
-            });
-
-            this.future.exceptionally(e -> {
-                log.error("ScheduleMessageService put message exceptionally, info: {}",
-                    PutResultProcess.this.toString(), e);
-
-                onException();
-                return null;
-            });
-            return this;
-        }
-
-        private void handleResult(PutMessageResult result) {
-            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                onSuccess(result);
-            } else {
-                log.warn("ScheduleMessageService put message failed. info: {}.", result);
-                onException();
-            }
-        }
-
-        public void onSuccess(PutMessageResult result) {
-            this.status = ProcessStatus.SUCCESS;
-            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
-                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
-            }
-        }
-
-        public void onException() {
-            log.warn("ScheduleMessageService onException, info: {}", this.toString());
-            if (this.autoResend) {
-                this.resend();
-            } else {
-                this.status = ProcessStatus.SKIP;
-            }
-        }
-
-        public ProcessStatus getStatus() {
-            return this.status;
-        }
-
-        public PutMessageResult get() {
-            try {
-                return this.future.get();
-            } catch (InterruptedException | ExecutionException e) {
-                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
-            }
-        }
-
-        private void resend() {
-            log.info("Resend message, info: {}", this.toString());
-
-            // Gradually increase the resend interval.
-            try {
-                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-
-            try {
-                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
-                if (msgExt == null) {
-                    log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
-                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
-                    return;
-                }
-
-                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
-                PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
-                this.handleResult(result);
-                if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                    log.info("Resend message success, info: {}", this.toString());
-                }
-            } catch (Exception e) {
-                this.status = ProcessStatus.EXCEPTION;
-                log.error("Resend message error, info: {}", this.toString(), e);
-            }
-        }
-
-        public boolean need2Blocked() {
-            int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
-                .getScheduleAsyncDeliverMaxResendNum2Blocked();
-            return this.resendCount > maxResendNum2Blocked;
-        }
-
-        public boolean need2Skip() {
-            int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
-                .getScheduleAsyncDeliverMaxResendNum2Blocked();
-            return this.resendCount > maxResendNum2Blocked * 2;
-        }
-
-        @Override
-        public String toString() {
-            return "PutResultProcess{" +
-                "topic='" + topic + '\'' +
-                ", offset=" + offset +
-                ", physicOffset=" + physicOffset +
-                ", physicSize=" + physicSize +
-                ", delayLevel=" + delayLevel +
-                ", msgId='" + msgId + '\'' +
-                ", autoResend=" + autoResend +
-                ", resendCount=" + resendCount +
-                ", status=" + status +
-                '}';
-        }
-    }
-
-    public enum ProcessStatus {
-        /**
-         * In process, the processing result has not yet been returned.
-         */
-        RUNNING,
-
-        /**
-         * Put message success.
-         */
-        SUCCESS,
-
-        /**
-         * Put message exception. When autoResend is true, the message will be resend.
-         */
-        EXCEPTION,
-
-        /**
-         * Skip put message. When the message cannot be looked, the message will be skipped.
-         */
-        SKIP,
-    }
-}
diff --git a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
deleted file mode 100644
index 1c0451c..0000000
--- a/store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.store;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.store.config.FlushDiskType;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.schedule.ScheduleMessageService;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ScheduleMessageServiceTest {
-
-    private Random random = new Random();
-
-    @Test
-    public void testCorrectDelayOffset_whenInit() throws Exception {
-
-        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = null;
-
-        DefaultMessageStore defaultMessageStore = new DefaultMessageStore(buildMessageStoreConfig(),
-            new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
-        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(defaultMessageStore);
-        scheduleMessageService.parseDelayLevel();
-
-        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable1 = new ConcurrentHashMap<>();
-        for (int i = 1; i <= 18; i++) {
-            offsetTable1.put(i, random.nextLong());
-        }
-
-        Field field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
-        field.setAccessible(true);
-        field.set(scheduleMessageService, offsetTable1);
-
-        String jsonStr = scheduleMessageService.encode();
-        scheduleMessageService.decode(jsonStr);
-
-        offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
-
-        for (Map.Entry<Integer, Long> entry : offsetTable.entrySet()) {
-            assertEquals(entry.getValue(), offsetTable1.get(entry.getKey()));
-        }
-
-        scheduleMessageService.correctDelayOffset();
-
-        offsetTable = (ConcurrentMap<Integer, Long>) field.get(scheduleMessageService);
-
-        for (long offset : offsetTable.values()) {
-            assertEquals(0, offset);
-        }
-
-    }
-
-    private MessageStoreConfig buildMessageStoreConfig() throws Exception {
-        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
-        messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
-        messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
-        messageStoreConfig.setMaxHashSlotNum(10000);
-        messageStoreConfig.setMaxIndexNum(100 * 100);
-        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
-        messageStoreConfig.setFlushIntervalConsumeQueue(1);
-        return messageStoreConfig;
-    }
-
-    @Test
-    public void testHandlePutResultTask() throws Exception {
-        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
-        MessageStoreConfig config = buildMessageStoreConfig();
-        config.setEnableScheduleMessageStats(false);
-        config.setEnableScheduleAsyncDeliver(true);
-        when(messageStore.getMessageStoreConfig()).thenReturn(config);
-        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore);
-        scheduleMessageService.parseDelayLevel();
-
-        Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
-        field.setAccessible(true);
-        Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable =
-            (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService);
-
-        field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
-        field.setAccessible(true);
-        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
-            (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService);
-        for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) {
-            offsetTable.put(i, 0L);
-        }
-
-        int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
-        ScheduledExecutorService handleExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums,
-            new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
-        field = scheduleMessageService.getClass().getDeclaredField("handleExecutorService");
-        field.setAccessible(true);
-        field.set(scheduleMessageService, handleExecutorService);
-
-        field = scheduleMessageService.getClass().getDeclaredField("started");
-        field.setAccessible(true);
-        AtomicBoolean started = (AtomicBoolean) field.get(scheduleMessageService);
-        started.set(true);
-
-        for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) {
-            ScheduleMessageService.HandlePutResultTask handlePutResultTask = scheduleMessageService.new HandlePutResultTask(level);
-            handleExecutorService.schedule(handlePutResultTask, 10L, TimeUnit.MILLISECONDS);
-        }
-
-        MessageExt messageExt = new MessageExt();
-        messageExt.putUserProperty("init", "test");
-        messageExt.getProperties().put(MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
-        when(messageStore.lookMessageByOffset(anyLong(), anyInt())).thenReturn(messageExt);
-        when(messageStore.putMessage(any())).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, null));
-
-        int msgNum = 100;
-        int totalMsgNum = msgNum * scheduleMessageService.getMaxDelayLevel();
-        List<CompletableFuture<PutMessageResult>> putMsgFutrueList = new ArrayList<>(totalMsgNum);
-        for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) {
-            for (int num = 0; num < msgNum; num++) {
-                CompletableFuture<PutMessageResult> future = new CompletableFuture<>();
-                ScheduleMessageService.PutResultProcess putResultProcess = scheduleMessageService.new PutResultProcess();
-                putResultProcess = putResultProcess
-                    .setOffset(num)
-                    .setAutoResend(true)
-                    .setFuture(future)
-                    .thenProcess();
-                deliverPendingTable.get(level).add(putResultProcess);
-                putMsgFutrueList.add(future);
-            }
-        }
-
-        Collections.shuffle(putMsgFutrueList);
-        Random random = new Random();
-        for (CompletableFuture<PutMessageResult> future : putMsgFutrueList) {
-            PutMessageStatus status;
-            if (random.nextInt(1000) % 2 == 0) {
-                status = PutMessageStatus.PUT_OK;
-            } else {
-                status = PutMessageStatus.OS_PAGECACHE_BUSY;
-            }
-
-            if (random.nextInt(1000) % 2 == 0) {
-                PutMessageResult result = new PutMessageResult(status, null);
-                future.complete(result);
-            } else {
-                future.completeExceptionally(new Throwable("complete exceptionally"));
-            }
-        }
-
-        Thread.sleep(1000);
-        for (int level = 1; level <= scheduleMessageService.getMaxDelayLevel(); level++) {
-            Assert.assertEquals(0, deliverPendingTable.get(level).size());
-            Assert.assertEquals(msgNum, offsetTable.get(level).longValue());
-        }
-
-        scheduleMessageService.shutdown();
-    }
-}
diff --git a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
deleted file mode 100644
index de3cf7f..0000000
--- a/store/src/test/java/org/apache/rocketmq/store/schedule/ScheduleMessageServiceTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.store.schedule;
-
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.store.*;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.rocketmq.store.stats.BrokerStatsManager.BROKER_PUT_NUMS;
-import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_NUMS;
-import static org.apache.rocketmq.store.stats.BrokerStatsManager.TOPIC_PUT_SIZE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-public class ScheduleMessageServiceTest {
-
-
-    /**
-     * t
-     * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
-     */
-    String testMessageDelayLevel = "5s 8s";
-    /**
-     * choose delay level
-     */
-    int delayLevel = 2;
-
-    private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test#" + UUID.randomUUID();
-    private static final int commitLogFileSize = 1024;
-    private static final int cqFileSize = 10;
-    private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
-
-    private static SocketAddress bornHost;
-    private static SocketAddress storeHost;
-    DefaultMessageStore messageStore;
-    MessageStoreConfig messageStoreConfig;
-    BrokerConfig brokerConfig;
-    ScheduleMessageService scheduleMessageService;
-
-    static String sendMessage = " ------- schedule message test -------";
-    static String topic = "schedule_topic_test";
-    static String messageGroup = "delayGroupTest";
-
-
-    static {
-        try {
-            bornHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
-        } catch (UnknownHostException e) {
-            e.printStackTrace();
-        }
-        try {
-            storeHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
-        } catch (UnknownHostException e) {
-            e.printStackTrace();
-        }
-    }
-
-
-    @Before
-    public void init() throws Exception {
-        messageStoreConfig = new MessageStoreConfig();
-        messageStoreConfig.setMessageDelayLevel(testMessageDelayLevel);
-        messageStoreConfig.setMappedFileSizeCommitLog(commitLogFileSize);
-        messageStoreConfig.setMappedFileSizeConsumeQueue(cqFileSize);
-        messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
-        messageStoreConfig.setMessageIndexEnable(false);
-        messageStoreConfig.setEnableConsumeQueueExt(true);
-        messageStoreConfig.setStorePathRootDir(storePath);
-        messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
-
-        brokerConfig = new BrokerConfig();
-        BrokerStatsManager manager = new BrokerStatsManager(brokerConfig.getBrokerClusterName(), brokerConfig.isEnableDetailStat());
-        messageStore = new DefaultMessageStore(messageStoreConfig, manager, new MyMessageArrivingListener(), new BrokerConfig());
-
-        assertThat(messageStore.load()).isTrue();
-
-        messageStore.start();
-        scheduleMessageService = messageStore.getScheduleMessageService();
-    }
-
-
-    @Test
-    public void deliverDelayedMessageTimerTaskTest() throws Exception {
-        assertThat(messageStore.getMessageStoreConfig().isEnableScheduleMessageStats()).isTrue();
-
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic)).isNull();
-
-        MessageExtBrokerInner msg = buildMessage();
-        int realQueueId = msg.getQueueId();
-        // set delayLevel,and send delay message
-        msg.setDelayTimeLevel(delayLevel);
-        PutMessageResult result = messageStore.putMessage(msg);
-        assertThat(result.isOk()).isTrue();
-
-        // make sure consumerQueue offset = commitLog offset
-        StoreTestUtil.waitCommitLogReput(messageStore);
-
-        // consumer message
-        int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
-        assertThat(delayQueueId).isEqualTo(delayLevel - 1);
-
-        Long offset = result.getAppendMessageResult().getLogicsOffset();
-
-        // now, no message in queue,must wait > delayTime
-        GetMessageResult messageResult = getMessage(realQueueId, offset);
-        assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
-
-        // timer run maybe delay, then consumer message again
-        // and wait offsetTable
-        TimeUnit.SECONDS.sleep(10);
-        scheduleMessageService.buildRunningStats(new HashMap<String, String>());
-
-        messageResult = getMessage(realQueueId, offset);
-        // now,found the message
-        assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
-
-        // get the stats change
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(BROKER_PUT_NUMS, brokerConfig.getBrokerClusterName()).getValue().sum()).isEqualTo(1);
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_NUMS, topic).getValue().sum()).isEqualTo(1L);
-        assertThat(messageStore.getBrokerStatsManager().getStatsItem(TOPIC_PUT_SIZE, topic).getValue().sum()).isEqualTo(messageResult.getBufferTotalSize());
-
-        // get the message body
-        ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
-        List<ByteBuffer> byteBufferList = messageResult.getMessageBufferList();
-        for (ByteBuffer bb : byteBufferList) {
-            byteBuffer.put(bb);
-        }
-
-        // warp and decode the message
-        byteBuffer = ByteBuffer.wrap(byteBuffer.array());
-        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
-        String retryMsg = new String(msgList.get(0).getBody());
-        assertThat(sendMessage).isEqualTo(retryMsg);
-
-        //  method will wait 10s,so I run it by myself
-        scheduleMessageService.persist();
-
-        // add mapFile release
-        messageResult.release();
-
-    }
-
-    /**
-     * add some [error/no use] code test
-     */
-    @Test
-    public void otherTest() {
-        // the method no use ,why need ?
-        int queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
-        assertThat(queueId).isEqualTo(delayLevel + 1);
-
-        // error delayLevelTest
-        Long time = scheduleMessageService.computeDeliverTimestamp(999, 0);
-        assertThat(time).isEqualTo(1000);
-
-        // just decode
-        scheduleMessageService.decode(new DelayOffsetSerializeWrapper().toJson());
-    }
-
-
-    private GetMessageResult getMessage(int queueId, Long offset) {
-        return messageStore.getMessage(messageGroup, topic,
-                queueId, offset, 1, null);
-
-    }
-
-
-    @After
-    public void shutdown() throws InterruptedException {
-        messageStore.shutdown();
-        messageStore.destroy();
-        File file = new File(messageStoreConfig.getStorePathRootDir());
-        UtilAll.deleteFile(file);
-    }
-
-
-    public MessageExtBrokerInner buildMessage() {
-
-        byte[] msgBody = sendMessage.getBytes();
-        MessageExtBrokerInner msg = new MessageExtBrokerInner();
-        msg.setTopic(topic);
-        msg.setTags("schedule_tag");
-        msg.setKeys("schedule_key");
-        msg.setBody(msgBody);
-        msg.setSysFlag(0);
-        msg.setBornTimestamp(System.currentTimeMillis());
-        msg.setStoreHost(storeHost);
-        msg.setBornHost(bornHost);
-        return msg;
-    }
-
-
-    private class MyMessageArrivingListener implements MessageArrivingListener {
-        @Override
-        public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                             byte[] filterBitMap, Map<String, String> properties) {
-        }
-    }
-
-
-}