Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 02:21:51 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new bccceba  [TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)
bccceba is described below

commit bccceba11ca84b6258dbc4a5d2a5679bf1c292f7
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 02:21:45 2020 +0000

    [TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 docs/http_access_API_definition.md                 |  12 +-
 docs/http_access_API_definition_cn.xls             | Bin 193024 -> 193536 bytes
 docs/tubemq_config_introduction.md                 |   3 -
 docs/tubemq_config_introduction_cn.doc             | Bin 182272 -> 179712 bytes
 .../client/consumer/BaseMessageConsumer.java       |  16 +-
 .../corebase/policies/FlowCtrlRuleHandler.java     | 167 +----
 tubemq-core/src/main/protobuf/BrokerService.proto  |   4 +-
 tubemq-core/src/main/protobuf/MasterService.proto  |  16 +-
 .../corebase/policies/TestFlowCtrlRuleHandler.java |  13 +-
 .../apache/tubemq/server/broker/BrokerConfig.java  |  26 -
 .../tubemq/server/broker/BrokerServiceServer.java  |  22 +-
 .../apache/tubemq/server/broker/TubeBroker.java    |  21 +-
 .../server/broker/msgstore/MessageStore.java       |  55 +-
 .../broker/msgstore/MessageStoreManager.java       |  67 --
 .../server/broker/msgstore/disk/MsgFileStore.java  |  24 -
 .../server/broker/msgstore/ssd/MsgSSDSegment.java  | 325 ---------
 .../broker/msgstore/ssd/MsgSSDStoreManager.java    | 779 ---------------------
 .../server/broker/msgstore/ssd/SSDSegEvent.java    |  47 --
 .../server/broker/msgstore/ssd/SSDSegFound.java    |  49 --
 .../server/broker/msgstore/ssd/SSDSegIndex.java    |  64 --
 .../server/broker/msgstore/ssd/SSDVisitInfo.java   |  44 --
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 179 +----
 .../server/broker/web/BrokerAdminServlet.java      |   8 +-
 .../org/apache/tubemq/server/master/TMaster.java   |  35 +-
 .../web/handler/WebAdminFlowRuleHandler.java       |  25 +-
 25 files changed, 57 insertions(+), 1944 deletions(-)
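
Note on the flow-control `type` handling changed in FlowCtrlRuleHandler.java below: the range check still accepts values 0 through 3, while `case 2` is now marked deprecated and yields no rule list. The following is a minimal sketch of that behavior, not the project's code. The class name `FlowCtrlTypeSketch`, the `Outcome` enum, and both methods are hypothetical, and treating a type-2 entry as silently ignored is an assumption, since the diff does not show how the null rule list is handled afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; names here do not exist in TubeMQ.
public class FlowCtrlTypeSketch {

    /** What happens to a rule entry with a given "type" value. */
    enum Outcome { PARSED, IGNORED }

    /** Mirrors the range check and the deprecated case 2 seen in the diff. */
    static Outcome classify(int typeVal) {
        if (typeVal < 0 || typeVal > 3) {
            // Same bounds as the handler: anything outside 0..3 is rejected.
            throw new IllegalArgumentException(
                    "type value must in [0,1,3], got " + typeVal);
        }
        // Assumption: a type-2 (former SSD) entry is accepted but produces no rules.
        return typeVal == 2 ? Outcome.IGNORED : Outcome.PARSED;
    }

    /** Returns the type values that would still produce flow-control rules. */
    static List<Integer> activeTypes(int... typeVals) {
        List<Integer> active = new ArrayList<>();
        for (int typeVal : typeVals) {
            if (classify(typeVal) == Outcome.PARSED) {
                active.add(typeVal);
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // Type order taken from the JSON example in http_access_API_definition.md.
        System.out.println(activeTypes(0, 2, 1, 3)); // prints [0, 1, 3]
    }
}
```

Under this reading, an existing rule set that still carries a type-2 block would keep loading after the upgrade, and only the SSD entry would stop having any effect.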

diff --git a/docs/http_access_API_definition.md b/docs/http_access_API_definition.md
index 3d44c0c..569b0b8 100644
--- a/docs/http_access_API_definition.md
+++ b/docs/http_access_API_definition.md
@@ -627,11 +627,10 @@ The flow control info is described in JSON format, for example:
 ```json
 [{"type":0,"rule":[{"start":"08:00","end":"17:59","dltInM":1024,"limitInM":20,"freqInMs":1000},{"start":"18:00","end":"22:00","dltInM":1024,"limitInM":20,"freqInMs":5000}]},{"type":2,"rule":[{"start":"18:00","end":"23:59","dltStInM":20480,"dltEdInMM":2048}]},{"type":1,"rule":[{"zeroCnt":3,"freqInMs":300},{"zeroCnt":8,"freqInMs":1000}]},{"type":3,"rule":[{"normFreqInMs":0,"filterFreqInMs":100,"minDataFilterFreqInMs":400}]}]
 ```
-The `type` has four values [0, 1, 2, 3]. 0: flow control, 1: frequency control, 2: delay to SSD storage control, 3: filter consumer frequency control,<br>
- `[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `dltStInM` is consuming data delta when enabling SSD to storage <br>
-`dltEdInM` is consuming data delta when terminating SSD to storage, `limitInM` is the flow control each minute, `freqInMs` is the interval for sending request
-after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data, `normFreqInMs` is the interval of sequential pulling,<br>
-`filterFreqInMs` is the interval of pulling filtered request.
+The `type` has four values [0, 1, 3]. 0: flow control, 1: frequency control, 3: filter consumer frequency control,<br>
+ `[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `limitInM` is the flow control each minute, <br>
+ `freqInMs` is the interval for sending request after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data,  <br>
+ `normFreqInMs` is the interval of sequential pulling, `filterFreqInMs` is the interval of pulling filtered request.
 
 __Request__
 
@@ -658,7 +657,6 @@ __Request__
 |StatusId|no| the strategy status Id, default 0|int|
 |qryPriorityId|no| the consuming priority Id. It is a composed field `A0B` with default value 301, <br>the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
 |createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
 |flowCtrlInfo|yes|the flow control info in JSON format|String|
 |createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
 
@@ -688,7 +686,6 @@ __Request__
 |StatusId|no| the strategy status Id, default 0|int|
 |qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
 |createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
 |createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
 
 ### `admin_upd_group_flow_control_rule`
@@ -705,7 +702,6 @@ __Request__
 |StatusId|no| the strategy status Id, default 0|int|
 |qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
 |createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
 |createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
 
 
diff --git a/docs/http_access_API_definition_cn.xls b/docs/http_access_API_definition_cn.xls
index e090109..3373687 100644
Binary files a/docs/http_access_API_definition_cn.xls and b/docs/http_access_API_definition_cn.xls differ
diff --git a/docs/tubemq_config_introduction.md b/docs/tubemq_config_introduction.md
index 61026e4..8a05f10 100644
--- a/docs/tubemq_config_introduction.md
+++ b/docs/tubemq_config_introduction.md
@@ -107,9 +107,6 @@ In addition to the back-end system configuration file, the Master also stores th
 | consumerRegTimeoutMs  | no       | long    | Consumer heartbeat timeout, optional, in milliseconds, default 30 seconds |
 | socketRecvBuffer      | no       | long    | Socket receives the size of the Buffer buffer SO_RCVBUF, the unit byte, the negative number is not set, the default value is |
 | socketSendBuffer      | no       | long    | Socket sends Buffer buffer SO_SNDBUF size, unit byte, negative number is not set, the default value is |
-| secondDataPath        | no       | string  | The SSD to storage location where the broker is located, optional field. The default is blank to indicate that the machine has no SSD. |
-| maxSSDTotalFileCnt    | no       | int     | The maximum number of Data files allowed by the SSD where the Broker is located, optional field, default 70 |
-| maxSSDTotalFileSizes  | no       | long    | The SSD where the Broker is located allows the maximum size of the data file to be saved. The optional field is 32G by default. |
 | tcpWriteServiceThread | no       | int     | Broker supports the number of socket worker threads for TCP production services, optional fields, and defaults to 2 times the number of CPUs of the machine. |
 | tcpReadServiceThread  | no       | int     | Broker supports the number of socket worker threads for TCP consumer services, optional fields, defaults to 2 times the number of CPUs of the machine |
 | logClearupDurationMs  | no       | long    | The aging cleanup period of the message file, in milliseconds. The default is 3 minutes for a log cleanup operation. The minimum is 1 minutes. |
diff --git a/docs/tubemq_config_introduction_cn.doc b/docs/tubemq_config_introduction_cn.doc
index 89fd19c..216de22 100644
Binary files a/docs/tubemq_config_introduction_cn.doc and b/docs/tubemq_config_introduction_cn.doc differ
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 997320f..b9aa993 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -886,7 +886,6 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setSessionTime(this.consumeSubInfo.getSubscribedTime());
         builder.addAllTopicList(this.consumeSubInfo.getSubscribedTopics());
         builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
-        builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
         builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
         builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
         List<SubscribeInfo> subInfoList =
@@ -945,7 +944,6 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setGroupName(this.consumerConfig.getConsumerGroup());
         builder.setReportSubscribeInfo(reportSubscribeInfo);
         builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
-        builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
         builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
         builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
         if (event != null) {
@@ -989,7 +987,6 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setOpType(RpcConstants.MSG_OPTYPE_REGISTER);
         builder.setTopicName(partition.getTopic());
         builder.setPartitionId(partition.getPartitionId());
-        builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
         builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
         builder.setReadStatus(getGroupInitReadStatus(rmtDataCache.bookPartition(partition.getPartitionKey())));
         TopicProcessor topicProcessor =
@@ -1042,7 +1039,6 @@ public class BaseMessageConsumer implements MessageConsumer {
         builder.setClientId(consumerId);
         builder.setGroupName(this.consumerConfig.getConsumerGroup());
         builder.setReadStatus(getGroupInitReadStatus(false));
-        builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
         builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
         builder.addAllPartitionInfo(partitionList);
         ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
@@ -1062,16 +1058,12 @@ public class BaseMessageConsumer implements MessageConsumer {
                     ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
             if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
                 try {
-                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
-                            TBaseConstants.META_VALUE_UNDEFINED,
+                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
                             response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
                 }
             }
-            if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
-                groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
-            }
             if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
                 groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
             }
@@ -1093,17 +1085,13 @@ public class BaseMessageConsumer implements MessageConsumer {
                     ? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
             if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
                 try {
-                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
-                            TBaseConstants.META_VALUE_UNDEFINED,
+                    groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
                             response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
                 } catch (Exception e1) {
                     logger.warn(
                             "[Heartbeat response] found parse group flowCtrl rules failure", e1);
                 }
             }
-            if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
-                groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
-            }
             if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
                 groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
             }
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index ade5c55..0df1625 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -53,8 +53,6 @@ public class FlowCtrlRuleHandler {
     // Flow control ID and string information obtained from the server
     private AtomicLong flowCtrlId =
             new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    private AtomicLong ssdTranslateId =
-            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
     private AtomicInteger qryPriorityId =
             new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
     private String strFlowCtrlInfo;
@@ -62,12 +60,6 @@ public class FlowCtrlRuleHandler {
     //improving the efficiency of the search return in the range
     private AtomicInteger minZeroCnt =
             new AtomicInteger(Integer.MAX_VALUE);
-    private AtomicLong minSSDProcDlt =
-            new AtomicLong(Long.MAX_VALUE);
-    private AtomicInteger ssdLimitStartTime =
-            new AtomicInteger(2500);
-    private AtomicInteger ssdLimitEndTime =
-            new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
     private AtomicLong minDataLimitDlt =
             new AtomicLong(Long.MAX_VALUE);
     private AtomicInteger dataLimitStartTime =
@@ -94,14 +86,12 @@ public class FlowCtrlRuleHandler {
     }
 
     /**
-     * @param ssdTranslateId
      * @param qyrPriorityId
      * @param flowCtrlId
      * @param flowCtrlInfo
      * @throws Exception
      */
-    public void updateDefFlowCtrlInfo(final long ssdTranslateId,
-                                      final int qyrPriorityId,
+    public void updateDefFlowCtrlInfo(final int qyrPriorityId,
                                       final long flowCtrlId,
                                       final String flowCtrlInfo) throws Exception {
         if (flowCtrlId == this.flowCtrlId.get()) {
@@ -118,9 +108,7 @@ public class FlowCtrlRuleHandler {
             logger.info(new StringBuilder(512)
                 .append("[Flow Ctrl] Updated ").append(flowCtrlName)
                 .append(" to flowId=").append(flowCtrlId)
-                .append(",ssdTranslateId=").append(ssdTranslateId)
                 .append(",qyrPriorityId=").append(qyrPriorityId).toString());
-            this.ssdTranslateId.set(ssdTranslateId);
             this.qryPriorityId.set(qyrPriorityId);
             clearStatisData();
             if (flowCtrlItemsMap == null
@@ -188,7 +176,6 @@ public class FlowCtrlRuleHandler {
     private void initialStatisData() {
         initialDataLimitStatisInfo();
         initialFreqLimitStatisInfo();
-        initialSSDProcLimitStatisInfo();
         initialLowFetchLimitStatisInfo();
     }
 
@@ -237,29 +224,6 @@ public class FlowCtrlRuleHandler {
         }
     }
 
-    private void initialSSDProcLimitStatisInfo() {
-        List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(2);
-        if (flowCtrlItemList != null && !flowCtrlItemList.isEmpty()) {
-            for (FlowCtrlItem flowCtrlItem : flowCtrlItemList) {
-                if (flowCtrlItem == null) {
-                    continue;
-                }
-                if (flowCtrlItem.getType() != 2) {
-                    continue;
-                }
-                if (flowCtrlItem.getDltInM() < this.minSSDProcDlt.get()) {
-                    this.minSSDProcDlt.set(flowCtrlItem.getDltInM());
-                }
-                if (flowCtrlItem.getStartTime() < this.ssdLimitStartTime.get()) {
-                    this.ssdLimitStartTime.set(flowCtrlItem.getStartTime());
-                }
-                if (flowCtrlItem.getEndTime() > this.ssdLimitEndTime.get()) {
-                    this.ssdLimitEndTime.set(flowCtrlItem.getEndTime());
-                }
-            }
-        }
-    }
-
     private void initialLowFetchLimitStatisInfo() {
         List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(3);
         if (flowCtrlItemList != null && !flowCtrlItemList.isEmpty()) {
@@ -280,43 +244,13 @@ public class FlowCtrlRuleHandler {
 
     private void clearStatisData() {
         this.minZeroCnt.set(Integer.MAX_VALUE);
-        this.minSSDProcDlt.set(Long.MAX_VALUE);
         this.minDataLimitDlt.set(Long.MAX_VALUE);
-        this.ssdLimitStartTime.set(2500);
-        this.ssdLimitEndTime.set(TBaseConstants.META_VALUE_UNDEFINED);
         this.dataLimitStartTime.set(2500);
         this.dataLimitEndTime.set(TBaseConstants.META_VALUE_UNDEFINED);
         this.filterCtrlItem = new FlowCtrlItem(3, TBaseConstants.META_VALUE_UNDEFINED,
                 TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
     }
 
-
-    public SSDCtrlResult getCurSSDStartDltInSZ() {
-        Calendar rightNow = Calendar.getInstance(timeZone);
-        int hour = rightNow.get(Calendar.HOUR_OF_DAY);
-        int minu = rightNow.get(Calendar.MINUTE);
-        int curTime = hour * 100 + minu;
-        if (curTime < this.ssdLimitStartTime.get()
-                || curTime > this.ssdLimitEndTime.get()) {
-            return new SSDCtrlResult(Long.MAX_VALUE, 0);
-        }
-        List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(2);
-        if (flowCtrlItemList == null
-                || flowCtrlItemList.isEmpty()) {
-            return new SSDCtrlResult(Long.MAX_VALUE, 0);
-        }
-        for (FlowCtrlItem flowCtrlItem : flowCtrlItemList) {
-            if (flowCtrlItem == null) {
-                continue;
-            }
-            SSDCtrlResult ruleVal = flowCtrlItem.getSSDStartDltProc(hour, minu);
-            if (ruleVal != null && ruleVal.dataStartDltInSize > 0) {
-                return ruleVal;
-            }
-        }
-        return new SSDCtrlResult(Long.MAX_VALUE, 0);
-    }
-
     public int getMinZeroCnt() {
         return minZeroCnt.get();
     }
@@ -348,18 +282,6 @@ public class FlowCtrlRuleHandler {
         return rcmVal;
     }
 
-    public long getSsdTranslateId() {
-        return ssdTranslateId.get();
-    }
-
-
-    /**
-     * @param ssdTranslateId
-     */
-    public void setSsdTranslateId(long ssdTranslateId) {
-        this.ssdTranslateId.set(ssdTranslateId);
-    }
-
     public int getQryPriorityId() {
         return qryPriorityId.get();
     }
@@ -382,7 +304,6 @@ public class FlowCtrlRuleHandler {
             this.strFlowCtrlInfo = "";
             this.flowCtrlRuleSet.clear();
             this.flowCtrlId.set(TBaseConstants.META_VALUE_UNDEFINED);
-            this.ssdTranslateId.set(TBaseConstants.META_VALUE_UNDEFINED);
             this.qryPriorityId.set(TBaseConstants.META_VALUE_UNDEFINED);
         } finally {
             writeLock.unlock();
@@ -420,7 +341,7 @@ public class FlowCtrlRuleHandler {
                 int typeVal = jsonObject.get("type").getAsInt();
                 if (typeVal < 0 || typeVal > 3) {
                     throw new Exception(new StringBuilder(512)
-                            .append("type value must in [0,1,2,3] in index(")
+                            .append("type value must in [0,1,3] in index(")
                             .append(i).append(") of flowCtrlInfo value!").toString());
                 }
                 switch (typeVal) {
@@ -428,8 +349,8 @@ public class FlowCtrlRuleHandler {
                         flowCtrlItemList = parseFreqLimit(typeVal, jsonObject);
                         break;
 
-                    case 2:
-                        flowCtrlItemList = parseSSDProcLimit(typeVal, jsonObject);
+                    case 2:  /* Deprecated  */
+                        flowCtrlItemList = null;
                         break;
 
                     case 3:
@@ -670,86 +591,6 @@ public class FlowCtrlRuleHandler {
         return flowCtrlItems;
     }
 
-    /**
-     * @param typeVal
-     * @param jsonObject
-     * @return
-     * @throws Exception
-     */
-    private List<FlowCtrlItem> parseSSDProcLimit(int typeVal,
-                                                 JsonObject jsonObject) throws Exception {
-        if (jsonObject == null || jsonObject.get("type").getAsInt() != 2) {
-            throw new Exception("parse SSD limit rule failure!");
-        }
-        JsonArray ruleArray = jsonObject.get("rule").getAsJsonArray();
-        if (ruleArray == null) {
-            throw new Exception("not found rule list in SSD limit!");
-        }
-        ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
-        for (int index = 0; index < ruleArray.size(); index++) {
-            JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
-            int startTime = validAndGetTimeValue("start",
-                    ruleObject.get("start").getAsString(), index, "SSD");
-            int endTime = validAndGetTimeValue("end",
-                    ruleObject.get("end").getAsString(), index, "SSD");
-            if (startTime >= endTime) {
-                throw new Exception(new StringBuilder(512)
-                        .append("start value must be less than End value in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            if (!ruleObject.has("dltStInM")) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltStInM key is required in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            long dltStInM = ruleObject.get("dltStInM").getAsLong();
-            if (dltStInM < 512) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltStInM value must be greater than or equal to 512 in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            if (!ruleObject.has("dltEdInM")) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltEdInM key is required in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            long dataEndInM = ruleObject.get("dltEdInM").getAsLong();
-            if (dataEndInM < 0) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltEdInM value must be greater than or equal to zero in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            if (dataEndInM < 512) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltStInM value must be greater than or equal to 512 in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            if (dltStInM < dataEndInM) {
-                throw new Exception(new StringBuilder(512)
-                        .append("dltStInM value must be greater than ")
-                        .append("or equal to dltEdInM value in index(")
-                        .append(index).append(") of SSD limit rule!").toString());
-            }
-            dltStInM = (long) (dltStInM * 1024 * 1024);
-            dataEndInM = (long) (dataEndInM * 1024 * 1024);
-            flowCtrlItems.add(new FlowCtrlItem(typeVal, startTime, endTime, dltStInM, dataEndInM));
-        }
-
-        Collections.sort(flowCtrlItems, new Comparator<FlowCtrlItem>() {
-            @Override
-            public int compare(final FlowCtrlItem o1, final FlowCtrlItem o2) {
-                if (o1.getStartTime() > o2.getStartTime()) {
-                    return 1;
-                } else if (o1.getStartTime() < o2.getStartTime()) {
-                    return -1;
-                } else {
-                    return 0;
-                }
-            }
-        });
-        return flowCtrlItems;
-    }
-
     @Override
     public String toString() {
         return this.strFlowCtrlInfo;
diff --git a/tubemq-core/src/main/protobuf/BrokerService.proto b/tubemq-core/src/main/protobuf/BrokerService.proto
index af6608f..707e1ca 100644
--- a/tubemq-core/src/main/protobuf/BrokerService.proto
+++ b/tubemq-core/src/main/protobuf/BrokerService.proto
@@ -65,7 +65,7 @@ message RegisterRequestC2B {
     optional int64 currOffset = 8;
     optional string sessionKey = 9;
     optional int64 sessionTime = 10;
-    optional int64 ssdStoreId = 11;
+    optional int64 ssdStoreId = 11;  /* Deprecated  */
     optional int32 qryPriorityId = 12;
     optional AuthorizedInfo authInfo = 13;
 }
@@ -83,7 +83,7 @@ message HeartBeatRequestC2B {
     required int32 readStatus = 3;
     /* brokerId:host:port:topic:partitionId:delayTimeStamp */
     repeated string partitionInfo = 4;
-    optional int64 ssdStoreId = 5;
+    optional int64 ssdStoreId = 5;   /* Deprecated  */
     optional int32 qryPriorityId = 6;
     optional AuthorizedInfo authInfo = 7;
 }
diff --git a/tubemq-core/src/main/protobuf/MasterService.proto b/tubemq-core/src/main/protobuf/MasterService.proto
index cea6838..a95e794 100644
--- a/tubemq-core/src/main/protobuf/MasterService.proto
+++ b/tubemq-core/src/main/protobuf/MasterService.proto
@@ -126,7 +126,7 @@ message RegisterRequestC2M {
     optional bool selectBig = 13;
     optional int64 groupFlowCheckId = 14;
     optional int64 defFlowCheckId = 15;
-    optional int64 ssdStoreId = 16;
+    optional int64 ssdStoreId = 16; /* Deprecated  */
     optional int32 qryPriorityId = 17;
     optional MasterCertificateInfo authInfo = 18;
 }
@@ -142,7 +142,7 @@ message RegisterResponseM2C {
     optional string defFlowControlInfo = 7;
     optional int64 groupFlowCheckId = 8;
     optional string groupFlowControlInfo = 9;
-    optional int64 ssdStoreId = 10;
+    optional int64 ssdStoreId = 10; /* Deprecated  */
     optional int32 qryPriorityId = 11;
     optional MasterAuthorizedInfo authorizedInfo = 12;
 }
@@ -155,7 +155,7 @@ message HeartRequestC2M {
     optional EventProto event = 5;
     optional int64 defFlowCheckId = 6;
     optional int64 groupFlowCheckId = 7;
-    optional int64 ssdStoreId = 8;
+    optional int64 ssdStoreId = 8;  /* Deprecated  */
     optional int32 qryPriorityId = 9;
     optional MasterCertificateInfo authInfo = 10;
 }
@@ -170,7 +170,7 @@ message HeartResponseM2C {
     optional string defFlowControlInfo = 7;
     optional int64 groupFlowCheckId = 8;
     optional string groupFlowControlInfo = 9;
-    optional int64 ssdStoreId = 10;
+    optional int64 ssdStoreId = 10;   /* Deprecated  */
     optional int32 qryPriorityId = 11;
     optional bool requireAuth = 12;
     optional MasterAuthorizedInfo authorizedInfo = 13;
@@ -201,7 +201,7 @@ message RegisterRequestB2M {
     required string brokerDefaultConfInfo = 8;
     /* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
     repeated string brokerTopicSetConfInfo = 9;
-    optional int64 ssdStoreId = 10;
+    optional int64 ssdStoreId = 10;   /* Deprecated  */
     optional int64 flowCheckId = 11;
     optional int32 qryPriorityId = 12;
     optional int32 tlsPort = 13;
@@ -222,7 +222,7 @@ message RegisterResponseM2B {
     optional string brokerDefaultConfInfo = 10;
     /* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
     repeated string brokerTopicSetConfInfo = 11;
-    optional int64 ssdStoreId = 12;
+    optional int64 ssdStoreId = 12;   /* Deprecated  */
     optional int64 flowCheckId = 13;
     optional string flowControlInfo = 14;
     optional int32 qryPriorityId = 15;
@@ -244,7 +244,7 @@ message HeartRequestB2M {
     /* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
     repeated string brokerTopicSetConfInfo = 10;
     repeated string removedTopicsInfo = 11;
-    optional int64 ssdStoreId = 12;
+    optional int64 ssdStoreId = 12;  /* Deprecated  */
     optional int64 flowCheckId = 13;
     optional int32 qryPriorityId = 14;
     optional MasterCertificateInfo authInfo = 15;
@@ -267,7 +267,7 @@ message HeartResponseM2B {
     repeated string brokerTopicSetConfInfo = 12;
     /* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
     repeated string removeTopicConfInfo = 13;
-    optional int64 ssdStoreId = 14;
+    optional int64 ssdStoreId = 14; /* Deprecated  */
     optional int64 flowCheckId = 15;
     optional string flowControlInfo = 16;
     optional int32 qryPriorityId = 17;
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java b/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index ec14d81..5b84efa 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -41,7 +41,7 @@ public class TestFlowCtrlRuleHandler {
     public void testFlowCtrlRuleHandler() {
         try {
             FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
-            handler.updateDefFlowCtrlInfo(1001, 2, 10, mockFlowCtrlInfo());
+            handler.updateDefFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
             TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
             Calendar rightNow = Calendar.getInstance(timeZone);
             int hour = rightNow.get(Calendar.HOUR_OF_DAY);
@@ -68,23 +68,12 @@ public class TestFlowCtrlRuleHandler {
             assertEquals(item.getZeroCnt(), 400);
             assertEquals(item.getFreqLtInMs(), 100);
 
-            // check ssd ctrl
-            SSDCtrlResult ssdResult = handler.getCurSSDStartDltInSZ();
-            if (curTime >= 1200) {
-                assertEquals(ssdResult.dataEndDLtInSz, 2048L * 1024L * 1024L);
-                assertEquals(ssdResult.dataStartDltInSize, 20480L * 1024L * 1024L);
-            } else {
-                assertEquals(ssdResult.dataEndDLtInSz, 0L);
-                assertEquals(ssdResult.dataStartDltInSize, Long.MAX_VALUE);
-            }
-
             //check values
             assertEquals(handler.getNormFreqInMs(), 100);
             assertEquals(handler.getFlowCtrlId(), 10);
             assertEquals(handler.getMinDataFreqInMs(), 400);
             assertEquals(handler.getMinZeroCnt(), 3);
             assertEquals(handler.getQryPriorityId(), 2);
-            assertEquals(handler.getSsdTranslateId(), 1001);
             assertEquals(handler.getFlowCtrlId(), 10);
 
             System.out.println();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
index 7644aed..c40e915 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
@@ -50,7 +50,6 @@ public class BrokerConfig extends AbstractFileConfig {
     // master service address
     private String masterAddressList;
     private String primaryPath;
-    private String secondDataPath;
     // tcp write service thread count
     private int tcpWriteServiceThread =
             Runtime.getRuntime().availableProcessors() * 2;
@@ -75,10 +74,6 @@ public class BrokerConfig extends AbstractFileConfig {
     private int indexTransCount = 1000;
     // rpc read timeout in milliseconds
     private long rpcReadTimeoutMs = 10 * 1000;
-    // max ssd file count
-    private int maxSSDTotalFileCnt = 70;
-    // max ssd file size
-    private long maxSSDTotalFileSizes = 32212254720L;
     // consumer register timeout in milliseconds
     private int consumerRegTimeoutMs = 30000;
     private boolean updateConsumerOffsets = true;
@@ -164,14 +159,6 @@ public class BrokerConfig extends AbstractFileConfig {
         return maxIndexSegmentSize;
     }
 
-    public int getMaxSSDTotalFileCnt() {
-        return maxSSDTotalFileCnt;
-    }
-
-    public long getMaxSSDTotalFileSizes() {
-        return maxSSDTotalFileSizes;
-    }
-
     public long getAuthValidTimeStampPeriodMs() {
         return authValidTimeStampPeriodMs;
     }
@@ -210,9 +197,6 @@ public class BrokerConfig extends AbstractFileConfig {
             throw new IllegalArgumentException("Require primaryPath not Blank!");
         }
         this.primaryPath = brokerSect.get("primaryPath").trim();
-        if (TStringUtils.isNotBlank(brokerSect.get("secondDataPath"))) {
-            this.secondDataPath = brokerSect.get("secondDataPath");
-        }
         if (TStringUtils.isBlank(brokerSect.get("hostName"))) {
             throw new IllegalArgumentException(new StringBuilder(256).append("hostName is null or Blank in ")
                     .append(SECT_TOKEN_BROKER).append(" section!").toString());
@@ -274,12 +258,6 @@ public class BrokerConfig extends AbstractFileConfig {
         if (TStringUtils.isNotBlank(brokerSect.get("maxIndexSegmentSize"))) {
             this.maxIndexSegmentSize = getInt(brokerSect, "maxIndexSegmentSize");
         }
-        if (TStringUtils.isNotBlank(brokerSect.get("maxSSDTotalFileCnt"))) {
-            this.maxSSDTotalFileCnt = getInt(brokerSect, "maxSSDTotalFileCnt");
-        }
-        if (TStringUtils.isNotBlank(brokerSect.get("maxSSDTotalFileSizes"))) {
-            this.maxSSDTotalFileSizes = getLong(brokerSect, "maxSSDTotalFileSizes");
-        }
         if (!TStringUtils.isBlank(brokerSect.get("updateConsumerOffsets"))) {
             this.updateConsumerOffsets = getBoolean(brokerSect, "updateConsumerOffsets");
         }
@@ -457,10 +435,6 @@ public class BrokerConfig extends AbstractFileConfig {
         return this.primaryPath;
     }
 
-    public String getSecondDataPath() {
-        return this.secondDataPath;
-    }
-
     public int getWebPort() {
         return webPort;
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 6cd99b1..5cd6bf8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -378,11 +378,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                         msgResult.lastRdDataOffset,
                         msgResult.totalMsgSize);
                 getCounterGroup.add(msgResult.tmpCounters);
-                if (msgResult.isFromSsdFile) {
-                    builder.setEscFlowCtrl(true);
-                } else {
-                    builder.setEscFlowCtrl(false);
-                }
+                builder.setEscFlowCtrl(false);
                 builder.setRequireSlow(msgResult.isSlowFreq);
                 builder.setSuccess(true);
                 builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -839,16 +835,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             final long reqOffset = request.hasCurrOffset() ? request.getCurrOffset() : -1;
             long reqSessionTime = request.hasSessionTime() ? request.getSessionTime() : -1;
             String reqSessionKey = request.hasSessionKey() ? request.getSessionKey() : null;
-            long reqSsdStoreId = request.hasSsdStoreId()
-                    ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
             int reqQryPriorityId = request.hasQryPriorityId()
                     ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
-            boolean needSsdProc =
-                    ((reqSsdStoreId != TBaseConstants.META_VALUE_UNDEFINED)
-                            && (reqSsdStoreId == this.metadataManager.getFlowCtrlRuleHandler().getSsdTranslateId()));
             consumerRegisterMap.put(partStr, new ConsumerNodeInfo(storeManager, reqQryPriorityId,
-                    clientId, filterCondSet, reqSessionKey, reqSessionTime, reqSsdStoreId, needSsdProc,
-                    request.hasSsdStoreId(), partStr));
+                    clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr));
             heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr);
             MessageStore dataStore = null;
             try {
@@ -876,7 +866,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                     .append(TokenConstants.SEGMENT_SEP).append(partStr)
                     .append(TokenConstants.SEGMENT_SEP).append(offsetInfo)
                     .append(", reqOffset=").append(reqOffset)
-                    .append(", reqSsdStoreId=").append(reqSsdStoreId)
                     .append(", reqQryPriorityId=").append(reqQryPriorityId)
                     .append(", isOverTLS=").append(overtls).toString());
             builder.setSuccess(true);
@@ -1014,8 +1003,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
             return builder.build();
         }
         final String groupName = (String) paramCheckResult.checkData;
-        long reqSsdStoreId = request.hasSsdStoreId()
-                ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
         int reqQryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
         List<Partition> partitions =
@@ -1076,11 +1063,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
                 strBuffer.delete(0, strBuffer.length());
                 continue;
             }
-            if (consumerNodeInfo.getSsdTransId() != reqSsdStoreId) {
-                boolean needSsdProc = ((reqSsdStoreId != TBaseConstants.META_VALUE_UNDEFINED)
-                        && (reqSsdStoreId == metadataManager.getFlowCtrlRuleHandler().getSsdTranslateId()));
-                consumerNodeInfo.setSsdTransId(reqSsdStoreId, needSsdProc);
-            }
             if (consumerNodeInfo.getQryPriorityId() != reqQryPriorityId) {
                 consumerNodeInfo.setQryPriorityId(reqQryPriorityId);
             }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
index a30a41d..6501378 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
@@ -242,7 +242,6 @@ public class TubeBroker implements Stoppable, Runnable {
                             FlowCtrlRuleHandler flowCtrlRuleHandler =
                                 metadataManager.getFlowCtrlRuleHandler();
                             long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
-                            long ssdTranslateId = flowCtrlRuleHandler.getSsdTranslateId();
                             int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
                             ServiceStatusHolder
                                 .setReadWriteServiceStatus(response.getStopRead(),
@@ -252,20 +251,15 @@ public class TubeBroker implements Stoppable, Runnable {
                                     ? response.getQryPriorityId() : qryPriorityId;
                                 if (response.getFlowCheckId() != flowCheckId) {
                                     flowCheckId = response.getFlowCheckId();
-                                    ssdTranslateId = response.getSsdStoreId();
                                     try {
                                         flowCtrlRuleHandler
-                                            .updateDefFlowCtrlInfo(ssdTranslateId,
-                                                qryPriorityId, flowCheckId, response.getFlowControlInfo());
+                                            .updateDefFlowCtrlInfo(qryPriorityId,
+                                                flowCheckId, response.getFlowControlInfo());
                                     } catch (Exception e1) {
                                         logger.warn(
                                             "[HeartBeat response] found parse flowCtrl rules failure", e1);
                                     }
                                 }
-                                if (response.getSsdStoreId() != ssdTranslateId) {
-                                    ssdTranslateId = response.getSsdStoreId();
-                                    flowCtrlRuleHandler.setSsdTranslateId(ssdTranslateId);
-                                }
                                 if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
                                     flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
                                 }
@@ -281,7 +275,6 @@ public class TubeBroker implements Stoppable, Runnable {
                                     .append(",configCheckSumId=").append(response.getConfCheckSumId())
                                     .append(",hasFlowCtrl=").append(response.hasFlowCheckId())
                                     .append(",curFlowCtrlId=").append(flowCheckId)
-                                    .append(",curSsdTranslateId=").append(ssdTranslateId)
                                     .append(",curQryPriorityId=").append(qryPriorityId)
                                     .append(",brokerDefaultConfInfo=")
                                     .append(response.getBrokerDefaultConfInfo())
@@ -425,15 +418,12 @@ public class TubeBroker implements Stoppable, Runnable {
                     if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
                         try {
                             flowCtrlRuleHandler
-                                .updateDefFlowCtrlInfo(response.getSsdStoreId(), response.getQryPriorityId(),
+                                .updateDefFlowCtrlInfo(response.getQryPriorityId(),
                                     response.getFlowCheckId(), response.getFlowControlInfo());
                         } catch (Exception e1) {
                             logger.warn("[Register response] found parse flowCtrl rules failure", e1);
                         }
                     }
-                    if (response.getSsdStoreId() != flowCtrlRuleHandler.getSsdTranslateId()) {
-                        flowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
-                    }
                     if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
                         flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
                     }
@@ -456,7 +446,6 @@ public class TubeBroker implements Stoppable, Runnable {
                     .append(",enableConsumeAuthorize=")
                     .append(serverAuthHandler.isEnableConsumeAuthorize())
                     .append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-                    .append(",curSsdTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
                     .append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
                     .append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
                     .append(",brokerTopicSetConfList=")
@@ -505,7 +494,6 @@ public class TubeBroker implements Stoppable, Runnable {
                 metadataManager.getFlowCtrlRuleHandler();
         builder.setFlowCheckId(flowCtrlRuleHandler.getFlowCtrlId());
         builder.setQryPriorityId(flowCtrlRuleHandler.getQryPriorityId());
-        builder.setSsdStoreId(flowCtrlRuleHandler.getSsdTranslateId());
         String brokerDefaultConfInfo = metadataManager.getBrokerDefMetaConfInfo();
         if (brokerDefaultConfInfo != null) {
             builder.setBrokerDefaultConfInfo(brokerDefaultConfInfo);
@@ -526,7 +514,6 @@ public class TubeBroker implements Stoppable, Runnable {
             .append(",isTlsEnable=").append(tubeConfig.isTlsEnable())
             .append(",TlsPort=").append(tubeConfig.getTlsPort())
             .append(",flowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-            .append(",SSDTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
             .append(",QryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
             .append(",configCheckSumId=").append(metadataManager.getBrokerConfCheckSumId())
             .append(",brokerDefaultConfInfo=").append(brokerDefaultConfInfo)
@@ -551,7 +538,6 @@ public class TubeBroker implements Stoppable, Runnable {
                 metadataManager.getFlowCtrlRuleHandler();
         builder.setFlowCheckId(flowCtrlRuleHandler.getFlowCtrlId());
         builder.setQryPriorityId(flowCtrlRuleHandler.getQryPriorityId());
-        builder.setSsdStoreId(flowCtrlRuleHandler.getSsdTranslateId());
         builder.setTakeConfInfo(false);
         builder.setTakeRemovedTopicInfo(false);
         List<String> removedTopics = this.metadataManager.getHardRemovedTopics();
@@ -573,7 +559,6 @@ public class TubeBroker implements Stoppable, Runnable {
                 .append(",readStatusRpt=").append(builder.getReadStatusRpt())
                 .append(",writeStatusRpt=").append(builder.getWriteStatusRpt())
                 .append(",flowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
-                .append(",SSDTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
                 .append(",QryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
                 .append(",ReadStatusRpt=").append(builder.getReadStatusRpt())
                 .append(",WriteStatusRpt=").append(builder.getWriteStatusRpt())
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 7dbef6d..66f4b5b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -44,7 +44,6 @@ import org.apache.tubemq.server.broker.msgstore.disk.Segment;
 import org.apache.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
 import org.apache.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
 import org.apache.tubemq.server.broker.msgstore.mem.MsgMemStore;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.stats.CountItem;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -53,7 +52,7 @@ import org.slf4j.LoggerFactory;
 
 /***
  * Topic's message storage. It's a logical topic storage. Contains multi types storage: data in memory,
- * data in disk, data in ssd, and statistics of produce and consume.
+ * data in disk, and statistics of produce and consume.
  */
 public class MessageStore implements Closeable {
     private static final Logger logger = LoggerFactory.getLogger(MessageStore.class);
@@ -261,36 +260,27 @@ public class MessageStore implements Closeable {
         indexRecordView.read(indexBuffer, reqNewOffset);
         indexBuffer.flip();
         indexRecordView.relViewRef();
-        // judge whether read from ssd or disk.
-        if (consumerNodeInfo.processFromSsdFile()) {
-            return msgStoreMgr.getSsdMessage(storeKey, consumerNodeInfo.getPartStr(),
-                    consumerNodeInfo.getStartSsdDataOffset(),
-                    consumerNodeInfo.getLastDataRdOffset(),
-                    partitionId, reqNewOffset, indexBuffer,
-                    msgSizeLimit, statisKeyBase);
-        } else {
-            if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset()
-                    >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
-                    && msgSizeLimit > this.maxAllowRdSize) {
-                msgSizeLimit = this.maxAllowRdSize;
-            }
-            GetMessageResult retResult =
-                    msgFileStore.getMessages(partitionId,
-                            consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
-                            indexBuffer, consumerNodeInfo.isFilterConsume(),
-                            consumerNodeInfo.getFilterCondCodeSet(),
-                            statisKeyBase, msgSizeLimit);
-            if (consumerNodeInfo.isFilterConsume()
-                    && retResult.isSuccess
-                    && retResult.getLastReadOffset() > 0) {
-                if ((msgFileStore.getIndexMaxHighOffset()
-                        - reqNewOffset - retResult.getLastReadOffset())
-                        < fileLowReqMaxFilterIndexReadSize.get()) {
-                    retResult.setSlowFreq(true);
-                }
+        if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset()
+            >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
+            && msgSizeLimit > this.maxAllowRdSize) {
+            msgSizeLimit = this.maxAllowRdSize;
+        }
+        GetMessageResult retResult =
+            msgFileStore.getMessages(partitionId,
+                consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
+                indexBuffer, consumerNodeInfo.isFilterConsume(),
+                consumerNodeInfo.getFilterCondCodeSet(),
+                statisKeyBase, msgSizeLimit);
+        if (consumerNodeInfo.isFilterConsume()
+            && retResult.isSuccess
+            && retResult.getLastReadOffset() > 0) {
+            if ((msgFileStore.getIndexMaxHighOffset()
+                - reqNewOffset - retResult.getLastReadOffset())
+                < fileLowReqMaxFilterIndexReadSize.get()) {
+                retResult.setSlowFreq(true);
             }
-            return retResult;
         }
+        return retResult;
     }
 
     /***
@@ -544,11 +534,6 @@ public class MessageStore implements Closeable {
         return totalSize;
     }
 
-    public SSDSegFound getSourceSegment(final long offset,
-                                        final int rate) throws IOException {
-        return this.msgFileStore.getSourceSegment(offset, rate);
-    }
-
     public long getDataStoreSize() {
         long totalSize = 0L;
         this.writeCacheMutex.readLock().lock();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 344c9b6..6275f3a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -21,7 +21,6 @@ import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -50,8 +49,6 @@ import org.apache.tubemq.server.broker.exception.StartupException;
 import org.apache.tubemq.server.broker.metadata.MetadataManager;
 import org.apache.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.msgstore.ssd.MsgSSDStoreManager;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
 import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.common.TStatusConstants;
@@ -71,8 +68,6 @@ public class MessageStoreManager implements StoreService {
     private final ConcurrentHashMap<String/* topic */,
             ConcurrentHashMap<Integer/* storeId */, MessageStore>> dataStores =
             new ConcurrentHashMap<>();
-    // ssd store manager
-    private final MsgSSDStoreManager msgSsdStoreManager;
     // store service status
     private final AtomicBoolean stopped = new AtomicBoolean(false);
     // data expire operation scheduler.
@@ -97,7 +92,6 @@ public class MessageStoreManager implements StoreService {
         this.maxMsgTransferSize =
                 tubeConfig.getTransferSize() > DataStoreUtils.MAX_MSG_TRANSFER_SIZE
                         ? DataStoreUtils.MAX_MSG_TRANSFER_SIZE : tubeConfig.getTransferSize();
-        this.msgSsdStoreManager = new MsgSSDStoreManager(this, this.tubeConfig);
         this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() {
             @Override
             public void propertyChange(final PropertyChangeEvent evt) {
@@ -136,7 +130,6 @@ public class MessageStoreManager implements StoreService {
     @Override
     public void start() {
         try {
-            this.msgSsdStoreManager.loadSSDStores();
             this.loadMessageStores(this.tubeConfig);
         } catch (final IOException e) {
             logger.error("[Store Manager] load message stores failed", e);
@@ -171,7 +164,6 @@ public class MessageStoreManager implements StoreService {
             this.logClearScheduler.shutdownNow();
             this.unFlushDiskScheduler.shutdownNow();
             this.unFlushMemScheduler.shutdownNow();
-            this.msgSsdStoreManager.close();
             for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry :
                     this.dataStores.entrySet()) {
                 if (entry.getValue() != null) {
@@ -345,52 +337,6 @@ public class MessageStoreManager implements StoreService {
         return this.tubeBroker;
     }
 
-    public boolean isSsdServiceStart() {
-        return this.msgSsdStoreManager.isSsdServiceInUse();
-    }
-
-    public boolean putSsdTransferReq(final String partStr,
-                                     final String storeKey, final long startOffset) {
-        if (this.msgSsdStoreManager.isSsdServiceInUse()) {
-            return this.msgSsdStoreManager.requestSsdTransfer(partStr, storeKey, startOffset, 1);
-        }
-        return false;
-    }
-
-    /***
-     * Get messages from ssd.
-     *
-     * @param storeKey
-     * @param partStr
-     * @param ssdStartDataOffset
-     * @param lastRDOffset
-     * @param partitionId
-     * @param reqOffset
-     * @param indexBuffer
-     * @param msgDataSizeLimit
-     * @param statisKeyBase
-     * @return
-     * @throws IOException
-     */
-    public GetMessageResult getSsdMessage(final String storeKey,
-                                           final String partStr,
-                                           final long ssdStartDataOffset,
-                                           final long lastRDOffset,
-                                           final int partitionId,
-                                           final long reqOffset,
-                                           final ByteBuffer indexBuffer,
-                                           int msgDataSizeLimit,
-                                           final String statisKeyBase) throws IOException {
-        if (this.msgSsdStoreManager.isSsdServiceInUse()) {
-            return msgSsdStoreManager.getMessages(storeKey, partStr,
-                    ssdStartDataOffset, lastRDOffset,
-                    partitionId, reqOffset, indexBuffer,
-                    msgDataSizeLimit, statisKeyBase);
-        }
-        return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR,
-                reqOffset, 0, "SSD StoreService not in use!");
-    }
-
     /***
      * Get message from store.
      *
@@ -427,19 +373,6 @@ public class MessageStoreManager implements StoreService {
         }
     }
 
-    public SSDSegFound getSourceSegment(final String topic, final int storeId,
-                                        final long offset, final int rate) throws IOException {
-        ConcurrentHashMap<Integer, MessageStore> map = this.dataStores.get(topic);
-        if (map == null) {
-            return new SSDSegFound(false, 1, null);
-        }
-        MessageStore messageStore = map.get(storeId);
-        if (messageStore == null) {
-            return new SSDSegFound(false, 2, null);
-        }
-        return messageStore.getSourceSegment(offset, rate);
-    }
-
     public MetadataManager getMetadataManager() {
         return tubeBroker.getMetadataManager();
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index cf8efb6..221fcc6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -36,7 +36,6 @@ import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
 import org.apache.tubemq.server.broker.BrokerConfig;
 import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
 import org.apache.tubemq.server.broker.stats.CountItem;
 import org.apache.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.tubemq.server.broker.utils.DiskSamplePrint;
@@ -472,29 +471,6 @@ public class MsgFileStore implements Closeable {
         return this.indexSegments.getMinOffset();
     }
 
-    /***
-     * Read from ssd file.
-     *
-     * @param offset
-     * @param rate
-     * @return
-     * @throws IOException
-     */
-    public SSDSegFound getSourceSegment(final long offset, final int rate) throws IOException {
-
-        final Segment segment = this.dataSegments.findSegment(offset);
-        if (segment == null) {
-            return new SSDSegFound(false, -1, null);
-        }
-        long dataSize = segment.getCachedSize();
-        if ((dataSize - offset + segment.getStart()) < dataSize * rate / 100) {
-            return new SSDSegFound(false, -2, null,
-                    segment.getStart(), segment.getStart() + segment.getCachedSize());
-        }
-        return new SSDSegFound(true, 0, segment.getFile(),
-                segment.getStart(), segment.getStart() + segment.getCachedSize());
-    }
-
     public Segment indexSlice(final long offset, final int maxSize) throws IOException {
         return indexSegments.getRecordSeg(offset);
     }
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
deleted file mode 100644
index c3adb5c..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.tubemq.corebase.TErrCodeConstants;
-import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
-import org.apache.tubemq.server.broker.msgstore.disk.FileSegment;
-import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.msgstore.disk.Segment;
-import org.apache.tubemq.server.broker.msgstore.disk.SegmentType;
-import org.apache.tubemq.server.broker.stats.CountItem;
-import org.apache.tubemq.server.broker.utils.DataStoreUtils;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/***
- * Messages in ssd format, it same to messages in disk.
- */
-public class MsgSSDSegment implements Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(MsgSSDSegment.class);
-    private final Segment dataSegment;
-    private final ConcurrentHashMap<String, SSDVisitInfo> visitMap =
-        new ConcurrentHashMap<>();
-    private final String topic;
-    private final String storeKey;
-    private AtomicLong lastReadTime = new AtomicLong(System.currentTimeMillis());
-    private AtomicInteger curStatus = new AtomicInteger(0);   // 0: invalid; 1: valid; 2: aging
-    private AtomicLong expiredWait = new AtomicLong(System.currentTimeMillis());
-
-
-    public MsgSSDSegment(final String storeKey, final String topic,
-                         final long start, final File file) throws IOException {
-        this.topic = topic;
-        this.storeKey = storeKey;
-        this.dataSegment = new FileSegment(start, file, false, SegmentType.DATA);
-        this.curStatus.set(1);
-        this.lastReadTime.set(System.currentTimeMillis());
-    }
-
-    /***
-     * Get messages from ssd.
-     *
-     * @param partStr
-     * @param partitionId
-     * @param reqOffset
-     * @param lastRDOffset
-     * @param indexBuffer
-     * @param isFilterConsume
-     * @param filterKeys
-     * @param statisKeyBase
-     * @param maxMsgTransferSize
-     * @param strBuffer
-     * @return
-     */
-    public GetMessageResult getMessages(final String partStr, final int partitionId,
-                                        final long reqOffset, final long lastRDOffset,
-                                        final ByteBuffer indexBuffer,
-                                        final boolean isFilterConsume, final List<Integer> filterKeys,
-                                        final String statisKeyBase, final int maxMsgTransferSize,
-                                        final StringBuilder strBuffer) {
-        // #lizard forgives
-        int retCode = 0;
-        int totalSize = 0;
-        String errInfo = "Ok";
-        boolean result = true;
-        int dataRealLimit = 0;
-        int curIndexPartitionId = 0;
-        long curIndexDataOffset = 0L;
-        int curIndexDataSize = 0;
-        int curIndexKeyCode = 0;
-        int readedOffset = 0;
-        long recvTimeInMillsec = 0L;
-        long maxDataLimitOffset = 0L;
-        long lastRdDataOffset = -1L;
-        final long curDataMaxOffset = getDataMaxOffset();
-        final long curDataMinOffset = getDataMinOffset();
-        HashMap<String, CountItem> countMap = new HashMap<>();
-        ByteBuffer dataBuffer =
-            ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
-        List<ClientBroker.TransferedMessage> transferedMessageList =
-            new ArrayList<>();
-        if (this.dataSegment == null) {
-            logger.error(strBuffer.append("[SSD Store] Found SSD fileRecordView is null! storeKey=")
-                .append(this.storeKey).toString());
-            strBuffer.delete(0, strBuffer.length());
-            return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR_MSGSET_NULL,
-                reqOffset, 0, "SSD fileRecordView is null!");
-        }
-        SSDVisitInfo ssdVisitInfo = visitMap.get(partStr);
-        if (ssdVisitInfo == null) {
-            SSDVisitInfo ssdVisitInfo1 = new SSDVisitInfo(partStr, lastRDOffset);
-            ssdVisitInfo =
-                visitMap.putIfAbsent(partStr, ssdVisitInfo1);
-            if (ssdVisitInfo == null) {
-                ssdVisitInfo = ssdVisitInfo1;
-            }
-        }
-        lastReadTime.set(System.currentTimeMillis());
-        ssdVisitInfo.requestVisit(lastRDOffset);
-        // read data by index.
-        for (int curIndexOffset = 0;
-             curIndexOffset < indexBuffer.remaining();
-             curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
-            curIndexPartitionId = indexBuffer.getInt();
-            curIndexDataOffset = indexBuffer.getLong();
-            curIndexDataSize = indexBuffer.getInt();
-            curIndexKeyCode = indexBuffer.getInt();
-            recvTimeInMillsec = indexBuffer.getLong();
-            maxDataLimitOffset = curIndexDataOffset + curIndexDataSize;
-            if (curIndexDataOffset < 0
-                || curIndexPartitionId < 0
-                || curIndexDataSize <= 0
-                || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
-                readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
-                continue;
-            }
-            if (curIndexDataOffset < curDataMinOffset
-                || curIndexDataOffset >= curDataMaxOffset) {
-                lastRdDataOffset = curIndexDataOffset;
-                break;
-            }
-            if (curIndexPartitionId != partitionId
-                || maxDataLimitOffset > curDataMaxOffset
-                || (isFilterConsume && !filterKeys.contains(curIndexKeyCode))) {
-                lastRdDataOffset = maxDataLimitOffset;
-                readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
-                continue;
-            }
-            try {
-                if (dataBuffer.capacity() < curIndexDataSize) {
-                    dataBuffer = ByteBuffer.allocate(curIndexDataSize);
-                }
-                dataBuffer.clear();
-                dataBuffer.limit(curIndexDataSize);
-                dataSegment.read(dataBuffer, curIndexDataOffset);
-                dataBuffer.flip();
-                dataRealLimit = dataBuffer.limit();
-                if (dataRealLimit < curIndexDataSize) {
-                    lastRdDataOffset = curIndexDataOffset;
-                    readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
-                    continue;
-                }
-            } catch (Throwable e2) {
-                strBuffer.delete(0, strBuffer.length());
-                logger.warn(strBuffer
-                    .append("[SSD Store] Get message from file failure,storeKey=")
-                    .append(this.storeKey).append(", partitionId=")
-                    .append(partitionId).toString(), e2);
-                retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
-                strBuffer.delete(0, strBuffer.length());
-                errInfo = strBuffer.append("Get message from file failure : ")
-                    .append(e2.getCause()).toString();
-                strBuffer.delete(0, strBuffer.length());
-                result = false;
-                break;
-            }
-            lastRdDataOffset = maxDataLimitOffset;
-            readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
-            ClientBroker.TransferedMessage transferedMessage =
-                DataStoreUtils.getTransferMsg(dataBuffer,
-                    curIndexDataSize, countMap, statisKeyBase, strBuffer);
-            if (transferedMessage == null) {
-                continue;
-            }
-            transferedMessageList.add(transferedMessage);
-            totalSize += curIndexDataSize;
-            if (totalSize >= maxMsgTransferSize) {
-                break;
-            }
-        }
-        if (retCode != 0) {
-            if (!transferedMessageList.isEmpty()) {
-                retCode = 0;
-                errInfo = "Ok";
-            }
-        }
-        lastReadTime.set(System.currentTimeMillis());
-        ssdVisitInfo.responseVisit(lastRdDataOffset);
-        if (lastRdDataOffset <= 0L) {
-            lastRdDataOffset = lastRDOffset;
-        }
-        return new GetMessageResult(result, retCode, errInfo,
-            reqOffset, readedOffset, lastRdDataOffset,
-            totalSize, countMap, transferedMessageList, true);
-    }
-
-    public void setInUse() {
-        this.curStatus.set(1);
-        this.lastReadTime.set(System.currentTimeMillis());
-        this.expiredWait.set(System.currentTimeMillis());
-    }
-
-    public Segment getDataSegment() {
-        return dataSegment;
-    }
-
-    public ConcurrentHashMap<String, SSDVisitInfo> getVisitMap() {
-        return visitMap;
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.curStatus.set(0);
-        try {
-            if (this.dataSegment != null) {
-                this.dataSegment.close();
-            }
-        } catch (Throwable e1) {
-            logger.error(new StringBuilder(512).append("[SSD Store] Close ")
-                .append(this.dataSegment.getFile().getAbsolutePath())
-                .append("'s file failure").toString(), e1);
-        }
-    }
-
-    public long getDataMaxOffset() {
-        return dataSegment.getLast();
-    }
-
-    public long getDataMinOffset() {
-        return this.dataSegment.getStart();
-    }
-
-    public int getOffsetPos(long dataOffset) {
-        if (dataOffset < dataSegment.getStart()) {
-            return -1;
-        } else if (dataOffset >= dataSegment.getStart() + dataSegment.getCachedSize()) {
-            return 1;
-        }
-        return 0;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof MsgSSDSegment)) {
-            return false;
-        }
-        MsgSSDSegment that = (MsgSSDSegment) o;
-        if (expiredWait.get() != that.expiredWait.get()) {
-            return false;
-        }
-        if (!curStatus.equals(that.curStatus)) {
-            return false;
-        }
-        return storeKey.equals(that.storeKey);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = curStatus.hashCode();
-        result = 31 * result + storeKey.hashCode();
-        result = 31 * result + (int) (expiredWait.get() ^ (expiredWait.get() >>> 32));
-        return result;
-    }
-
-    public void closeConsumeRel(final String partStr) {
-        this.visitMap.remove(partStr);
-    }
-
-    public String getStoreKey() {
-        return this.storeKey;
-    }
-
-    public long getDataSizeInBytes() {
-        return dataSegment.getCachedSize();
-    }
-
-    public boolean isSegmentValid() {
-        long curTime = System.currentTimeMillis();
-        return !(this.curStatus.get() > 1
-            || (this.curStatus.get() > 0
-            && curTime - this.lastReadTime.get() >= 15 * 1000));
-    }
-
-    public int getCurStatus() {
-        return curStatus.get();
-    }
-
-    public boolean setAndGetSegmentExpired() {
-        long curTime = System.currentTimeMillis();
-        if (this.curStatus.get() > 0
-            && this.visitMap.isEmpty()
-            && (curTime - this.lastReadTime.get() >= 15 * 1000)) {
-            if (this.curStatus.get() == 1) {
-                this.curStatus.set(2);
-                this.expiredWait.set(curTime);
-                return false;
-            } else if (curTime - this.expiredWait.get() >= 15 * 1000) {
-                return true;
-            }
-            return false;
-        }
-        return false;
-    }
-
-}
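Note on the removed expiry logic: MsgSSDSegment.setAndGetSegmentExpired() acts as a two-phase check. A valid segment (status 1) that has no visitors and has been idle for 15 seconds is first moved to the aging state (status 2), and is reported as expired only after a further 15 seconds in that state. The following is a minimal sketch of that state machine, not the removed implementation. The class name SegmentExpirySketch, the explicit `now` argument, the `noVisitors` flag (standing in for visitMap.isEmpty()), and the use of compareAndSet in place of the original get/set pair are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the status handling in the removed MsgSSDSegment.
final class SegmentExpirySketch {
    static final int INVALID = 0;
    static final int VALID = 1;
    static final int AGING = 2;
    // Same 15-second window the removed code uses for both phases.
    static final long IDLE_MS = 15 * 1000L;

    private final AtomicInteger status = new AtomicInteger(VALID);
    private final AtomicLong lastReadTime;
    private final AtomicLong agingSince;

    SegmentExpirySketch(long now) {
        this.lastReadTime = new AtomicLong(now);
        this.agingSince = new AtomicLong(now);
    }

    // Record a read; the caller supplies the clock so the sketch is testable.
    void markRead(long now) {
        lastReadTime.set(now);
    }

    int getStatus() {
        return status.get();
    }

    // Returns true only once the segment has aged out and may be deleted.
    boolean setAndGetExpired(long now, boolean noVisitors) {
        if (status.get() == INVALID
            || !noVisitors
            || now - lastReadTime.get() < IDLE_MS) {
            return false;
        }
        // Phase 1: a valid but idle segment starts aging; not expired yet.
        if (status.compareAndSet(VALID, AGING)) {
            agingSince.set(now);
            return false;
        }
        // Phase 2: expired once the aging window has also elapsed.
        return now - agingSince.get() >= IDLE_MS;
    }

    public static void main(String[] args) {
        SegmentExpirySketch seg = new SegmentExpirySketch(0L);
        System.out.println(seg.setAndGetExpired(15_000L, true));
        System.out.println(seg.setAndGetExpired(30_000L, true));
    }
}
```

As in the removed code, a single periodic checker thread is assumed to drive the transitions; the sketch does not try to make the two-field update atomic.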
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
deleted file mode 100644
index 0be8806..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
+++ /dev/null
@@ -1,779 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.tubemq.corebase.TErrCodeConstants;
-import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
-import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.exception.StartupException;
-import org.apache.tubemq.server.broker.metadata.MetadataManager;
-import org.apache.tubemq.server.broker.metadata.TopicMetadata;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
-import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.tubemq.server.broker.utils.DataStoreUtils;
-import org.apache.tubemq.server.common.utils.FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/***
- * Manages message storage on SSD. It copies data files from disk to SSD on request
- * and serves message queries from the copied files, copying a file first if it has not been copied yet.
- * Expired files are deleted when the storage quota is exceeded.
- */
-public class MsgSSDStoreManager implements Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(MsgSSDStoreManager.class);
-    // file suffix
-    private static final String DATA_FILE_SUFFIX = ".tube";
-    // tube config
-    private final BrokerConfig tubeConfig;
-    private final String primStorePath;
-    private final String secdStorePath;
-    // ssd data directory
-    private final File ssdBaseDataDir;
-    private final BlockingQueue<SSDSegEvent> reqSSDEvents
-            = new ArrayBlockingQueue<>(60);
-    private final ExecutorService statusCheckExecutor = Executors.newSingleThreadExecutor();
-    private final ExecutorService reqExecutor = Executors.newSingleThreadExecutor();
-    // total ssd files size
-    private final AtomicLong totalSsdFileSize = new AtomicLong(0L);
-    // total ssd file count
-    private final AtomicInteger totalSsdFileCnt = new AtomicInteger(0);
-    private final MessageStoreManager msgStoreMgr;
-    private volatile boolean closed = false;
-    private volatile boolean isStart = true;
-    private AtomicInteger firstChecked = new AtomicInteger(3);
-    // ssd segments
-    private ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>> ssdSegmentsMap =
-            new ConcurrentHashMap<>(60);
-    private ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>> ssdPartStrMap =
-            new ConcurrentHashMap<>(60);
-    private long startTime = System.currentTimeMillis();
-    private long lastCheckTime = System.currentTimeMillis();
-
-
-    public MsgSSDStoreManager(final MessageStoreManager msgStoreMgr,
-                              final BrokerConfig tubeConfig) throws IOException {
-        this.tubeConfig = tubeConfig;
-        this.msgStoreMgr = msgStoreMgr;
-        this.primStorePath = this.tubeConfig.getPrimaryPath();
-        this.secdStorePath = TStringUtils.isBlank(this.tubeConfig.getSecondDataPath())
-                ? null : this.tubeConfig.getSecondDataPath();
-        if (TStringUtils.isBlank(this.secdStorePath)
-                || this.primStorePath.equals(this.secdStorePath)) {
-            this.isStart = false;
-            this.ssdBaseDataDir = null;
-            logger.info("[SSD Manager] The tube not configure SSD directory, SSD-Store function not start!");
-            return;
-        }
-        this.ssdBaseDataDir = new File(this.secdStorePath);
-        FileUtil.checkDir(this.ssdBaseDataDir);
-        startTime = System.currentTimeMillis();
-    }
-
-    /***
-     * Load ssd stores.
-     *
-     * @throws IOException
-     */
-    public void loadSSDStores() throws IOException {
-        try {
-            this.loadDataDir(tubeConfig);
-            this.statusCheckExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    final StringBuilder strBuffer = new StringBuilder(512);
-                    while (!closed) {
-                        try {
-                            flushCurrSegments(strBuffer);
-                        } catch (Throwable e) {
-                            logger.error("[SSD Manager] Error during SSD File status check", e);
-                        }
-                        ThreadUtils.sleep(30000);
-                    }
-                    logger.warn("[SSD Manager] SSD File scan thread is out!");
-                }
-            });
-
-            this.reqExecutor.execute(new SsdStoreRunner());
-        } catch (final IOException e) {
-            logger.error("[SSD Manager] load SSD data files failed", e);
-            throw new StartupException("Initialize SSD data files failed", e);
-        } catch (Throwable e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    public boolean isSsdServiceInUse() {
-        return (this.isStart && !this.closed);
-    }
-
-    /***
-     * Request ssd transfer to disk.
-     *
-     * @param partStr
-     * @param storeKey
-     * @param startOffset
-     * @param segCnt
-     * @return
-     */
-    public boolean requestSsdTransfer(final String partStr, final String storeKey,
-                                      final long startOffset, final int segCnt) {
-        if (this.isStart && !this.closed) {
-            try {
-                this.reqSSDEvents.put(new SSDSegEvent(partStr, storeKey, startOffset, segCnt));
-                return true;
-            } catch (Throwable e1) {
-                logger.warn("[SSD Store] request SSD Event failure : ", e1);
-            }
-        }
-        return false;
-    }
-
-    /***
-     * Get messages from ssd.
-     *
-     * @param storeKey
-     * @param partStr
-     * @param startDataOffset
-     * @param lastRDOffset
-     * @param partitionId
-     * @param reqOffset
-     * @param indexBuffer
-     * @param msgDataSizeLimit
-     * @param statisKeyBase
-     * @return
-     */
-    public GetMessageResult getMessages(final String storeKey, final String partStr,
-                                        final long startDataOffset, final long lastRDOffset,
-                                        final int partitionId, final long reqOffset,
-                                        final ByteBuffer indexBuffer, int msgDataSizeLimit,
-                                        final String statisKeyBase) {
-        final StringBuilder strBuffer = new StringBuilder(512);
-        ConcurrentHashMap<Long, MsgSSDSegment> msgSSDSegmentMap =
-                ssdSegmentsMap.get(storeKey);
-        Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
-                msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
-        ConsumerNodeInfo consumerNodeInfo = consumerNodeInfoMap.get(partStr);
-        if (consumerNodeInfo == null) {
-            return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR,
-                    reqOffset, 0, "Consumer unregistered!");
-        }
-        if (msgSSDSegmentMap == null) {
-            logger.warn(strBuffer.append("[SSD Store] Found consumer visit but file expired! storeKey=")
-                    .append(storeKey).toString());
-            consumerNodeInfo.resetSSDProcSeg(false);
-            return new GetMessageResult(false,
-                    TErrCodeConstants.INTERNAL_SERVER_ERROR, reqOffset, 0, "SSD file has expired!");
-        }
-        MsgSSDSegment msgSsdSegment =
-                msgSSDSegmentMap.get(startDataOffset);
-        if (msgSsdSegment == null) {
-            consumerNodeInfo.resetSSDProcSeg(false);
-            logger.warn(strBuffer
-                    .append("[SSD Store] Found consumer visit but file expired! storeKey=")
-                    .append(storeKey).append(",startDataOffset=")
-                    .append(startDataOffset).toString());
-            return new GetMessageResult(false,
-                    TErrCodeConstants.INTERNAL_SERVER_ERROR, reqOffset, 0, "SSD file has expired!");
-        }
-        GetMessageResult getMessageResult =
-                msgSsdSegment.getMessages(partStr, partitionId, reqOffset, lastRDOffset,
-                        indexBuffer, false, null, statisKeyBase, msgDataSizeLimit, strBuffer);
-        if (getMessageResult.retCode == TErrCodeConstants.INTERNAL_SERVER_ERROR_MSGSET_NULL) {
-            consumerNodeInfo.resetSSDProcSeg(false);
-            try {
-                msgSsdSegment.close();
-                msgSSDSegmentMap.remove(startDataOffset);
-                logger.warn(strBuffer
-                        .append("[SSD Store] Found SSD file's fileRecordView is null release! storeKey=")
-                        .append(storeKey).append(",startDataOffset=")
-                        .append(startDataOffset).toString());
-            } catch (Throwable ee) {
-                strBuffer.delete(0, strBuffer.length());
-                logger.warn(strBuffer.append("[SSD Store] release SSD FileSegment failure, storeKey=")
-                        .append(storeKey).append(",startDataOffset=")
-                        .append(startDataOffset).toString(), ee);
-            }
-        }
-        if (consumerNodeInfo.getLastDataRdOffset() >= msgSsdSegment.getDataMaxOffset()) {
-            msgSsdSegment.closeConsumeRel(partStr);
-            consumerNodeInfo.resetSSDProcSeg(false);
-        }
-        return getMessageResult;
-    }
-
-    @Override
-    public void close() {
-        this.closed = true;
-        try {
-            this.reqExecutor.shutdown();
-            this.statusCheckExecutor.shutdown();
-            for (Map<Long, MsgSSDSegment> msgSSDSegmentMap : ssdSegmentsMap.values()) {
-                if (msgSSDSegmentMap != null) {
-                    for (MsgSSDSegment msgSsdSegment : msgSSDSegmentMap.values()) {
-                        if (msgSsdSegment != null) {
-                            msgSsdSegment.close();
-                        }
-                    }
-                }
-            }
-        } catch (Throwable e1) {
-            logger.warn("[SSD Store] Close SSD Store manager failure", e1);
-        }
-    }
-
-    private boolean flushCurrSegments(final StringBuilder sb) throws IOException {
-        int totalCnt = 0;
-        long currTime = System.currentTimeMillis();
-        if ((firstChecked.get() > 0 && currTime - startTime <= 2 * 60 * 1000)
-                || (firstChecked.get() == 0
-                && totalSsdFileCnt.get() < (int) (tubeConfig.getMaxSSDTotalFileCnt() * 0.7)
-                && totalSsdFileSize.get() < (long) (tubeConfig.getMaxSSDTotalFileSizes() * 0.7))) {
-            return true;
-        }
-        Set<SSDSegIndex> msgSsdIndexSet = new HashSet<>();
-        for (Map.Entry<String, ConcurrentHashMap<Long, MsgSSDSegment>> entry : ssdSegmentsMap.entrySet()) {
-            if (entry.getKey() == null || entry.getValue() == null) {
-                continue;
-            }
-            ConcurrentHashMap<Long, MsgSSDSegment> msgSegmentMap = entry.getValue();
-            for (Map.Entry<Long, MsgSSDSegment> subEntry : msgSegmentMap.entrySet()) {
-                if (subEntry.getValue() != null && !subEntry.getValue().isSegmentValid()) {
-                    msgSsdIndexSet.add(new SSDSegIndex(entry.getKey(),
-                            subEntry.getValue().getDataSegment().getStart(),
-                            subEntry.getValue().getDataMaxOffset()));
-                }
-            }
-        }
-        if (msgSsdIndexSet.isEmpty()) {
-            if (firstChecked.get() > 0) {
-                firstChecked.set(0);
-            }
-            return true;
-        }
-        for (SSDSegIndex ssdSegIndex : msgSsdIndexSet) {
-            ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSSdSegmentMap =
-                    ssdSegmentsMap.get(ssdSegIndex.storeKey);
-            if (tmpMsgSSdSegmentMap == null) {
-                continue;
-            }
-            MsgSSDSegment msgSsdSegment =
-                    tmpMsgSSdSegmentMap.get(ssdSegIndex.startOffset);
-            if (msgSsdSegment == null) {
-                continue;
-            }
-            if (msgSsdSegment.setAndGetSegmentExpired()) {
-                inProcessExpiredFile(msgSsdSegment, ssdSegIndex, tmpMsgSSdSegmentMap, sb);
-            } else {
-                inProcessInUseFile(msgSsdSegment);
-            }
-        }
-        if (firstChecked.get() > 0) {
-            totalCnt = firstChecked.get();
-            firstChecked.compareAndSet(totalCnt, totalCnt - 1);
-        }
-        return true;
-    }
-
-    private void inProcessInUseFile(MsgSSDSegment msgSsdSegment) {
-        if (msgSsdSegment.getCurStatus() != 1) {
-            return;
-        }
-        try {
-            Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
-                    msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
-            ConcurrentHashMap<String, SSDVisitInfo> ssdVisitInfoMap =
-                    msgSsdSegment.getVisitMap();
-            if (ssdVisitInfoMap != null) {
-                List<String> rmvPartStrList = new ArrayList<>();
-                for (String partStr : ssdVisitInfoMap.keySet()) {
-                    ConsumerNodeInfo consumerNodeInfo =
-                            consumerNodeInfoMap.get(partStr);
-                    if (consumerNodeInfo == null) {
-                        rmvPartStrList.add(partStr);
-                        continue;
-                    }
-                    if (msgSsdSegment
-                            .getOffsetPos(consumerNodeInfo.getLastDataRdOffset()) > 0) {
-                        consumerNodeInfo.resetSSDProcSeg(true);
-                        rmvPartStrList.add(partStr);
-                    }
-                }
-                if (!rmvPartStrList.isEmpty()) {
-                    for (String partStr : rmvPartStrList) {
-                        msgSsdSegment.closeConsumeRel(partStr);
-                    }
-                }
-            }
-        } catch (Throwable e2) {
-            logger.warn("[SSD Manager] Check consumerInfo and SSD file relation failure", e2);
-        }
-    }
-
-    /***
-     * Delete expired files.
-     *
-     * @param msgSsdSegment
-     * @param ssdSegIndex
-     * @param msgSsdSegmentMap
-     * @param strBuffer
-     */
-    private void inProcessExpiredFile(MsgSSDSegment msgSsdSegment, SSDSegIndex ssdSegIndex,
-                                      Map<Long, MsgSSDSegment> msgSsdSegmentMap, StringBuilder strBuffer) {
-        try {
-            File file = msgSsdSegment.getDataSegment().getFile();
-            logger.info(strBuffer.append("[SSD Manager] delete ssd segment : ")
-                    .append(file.getAbsolutePath()).toString());
-            strBuffer.delete(0, strBuffer.length());
-            final long msgSize = msgSsdSegment.getDataSegment().getCachedSize();
-            for (Map.Entry<String, ConcurrentHashSet<SSDSegIndex>> entry : ssdPartStrMap.entrySet()) {
-                if (entry.getValue() != null) {
-                    entry.getValue().remove(ssdSegIndex);
-                }
-            }
-            msgSsdSegmentMap.remove(ssdSegIndex.startOffset);
-            msgSsdSegment.close();
-            file.delete();
-            int exptCnt = totalSsdFileCnt.get();
-            while (!totalSsdFileCnt.compareAndSet(exptCnt, exptCnt - 1)) {
-                exptCnt = totalSsdFileCnt.get();
-            }
-            long exptSize = totalSsdFileSize.get();
-            while (!totalSsdFileSize.compareAndSet(exptSize, exptSize - msgSize)) {
-                exptSize = totalSsdFileSize.get();
-            }
-            logger.info(strBuffer.append("[SSD Manager] rmv SSD FileSegment finished, totalSsdFileCnt=")
-                    .append(totalSsdFileCnt.get()).append(",totalSsdFileSize=")
-                    .append(totalSsdFileSize.get()).toString());
-            strBuffer.delete(0, strBuffer.length());
-        } catch (Throwable e1) {
-            logger.warn("[SSD Manager] Delete SSD expired file failure", e1);
-        }
-    }
-
-    /***
-     * Load data from directory.
-     *
-     * @param tubeConfig
-     * @throws IOException
-     */
-    private void loadDataDir(final BrokerConfig tubeConfig) throws IOException {
-        StringBuilder strBuffer = new StringBuilder(512);
-        final long startTime = System.currentTimeMillis();
-        logger.info(strBuffer.append("[SSD Manager] Begin to scan data path:")
-                .append(this.ssdBaseDataDir.getAbsolutePath()).toString());
-        strBuffer.delete(0, strBuffer.length());
-        MetadataManager metadataManager = msgStoreMgr.getMetadataManager();
-        final File[] ls = this.ssdBaseDataDir.listFiles();
-        if (ls != null) {
-            for (final File subDir : ls) {
-                if (subDir == null) {
-                    continue;
-                }
-                if (!subDir.isDirectory()) {
-                    logger.warn(strBuffer.append("[SSD Manager] Ignore not directory path:")
-                            .append(subDir.getAbsolutePath()).toString());
-                    strBuffer.delete(0, strBuffer.length());
-                } else {
-                    final String name = subDir.getName();
-                    final int index = name.lastIndexOf('-');
-                    if (index < 0) {
-                        logger.warn(strBuffer.append("[SSD Manager] Ignore invalid directory:")
-                                .append(subDir.getAbsolutePath()).toString());
-                        strBuffer.delete(0, strBuffer.length());
-                        continue;
-                    }
-                    final String topic = name.substring(0, index);
-                    TopicMetadata topicMetadata = metadataManager.getTopicMetadata(topic);
-                    if (topicMetadata == null) {
-                        logger.warn(strBuffer
-                                .append("[SSD Manager] No valid topic config for topic data directories:")
-                                .append(topic).toString());
-                        strBuffer.delete(0, strBuffer.length());
-                        continue;
-                    }
-                    final int storeId = Integer.parseInt(name.substring(index + 1));
-                    final String storeKey =
-                            strBuffer.append(topic).append("-").append(storeId).toString();
-                    strBuffer.delete(0, strBuffer.length());
-                    loadSsdFile(storeKey, topic, strBuffer);
-                }
-            }
-        }
-        logger.info(strBuffer.append("[SSD Manager] End to scan SSD data path in ")
-                .append((System.currentTimeMillis() - startTime) / 1000)
-                .append(" secs").toString());
-    }
-
-    /***
-     * Copy files on disk to ssd.
-     *
-     * @param storeKey
-     * @param topic
-     * @param partStr
-     * @param fromFile
-     * @param startOffset
-     * @param endOffset
-     * @param sb
-     * @return
-     */
-    private boolean copyFileToSSD(final String storeKey, final String topic,
-                                  final String partStr, File fromFile,
-                                  final long startOffset, final long endOffset,
-                                  final StringBuilder sb) {
-        // #lizard forgives
-        logger.info(sb.append("[SSD Manager] Begin to copy file to SSD, source data path:")
-                .append(fromFile.getAbsolutePath()).toString());
-        sb.delete(0, sb.length());
-        if (!fromFile.exists()) {
-            logger.warn(sb.append("[SSD Manager] source file not exists, path:")
-                    .append(fromFile.getAbsolutePath()).toString());
-            sb.delete(0, sb.length());
-            return false;
-        }
-        if (!fromFile.isFile()) {
-            logger.warn(sb.append("[SSD Manager] source file not File, path:")
-                    .append(fromFile.getAbsolutePath()).toString());
-            sb.delete(0, sb.length());
-            return false;
-        }
-        if (!fromFile.canRead()) {
-            logger.warn(sb.append("[SSD Manager] source file not canRead, path:")
-                    .append(fromFile.getAbsolutePath()).toString());
-            sb.delete(0, sb.length());
-            return false;
-        }
-        boolean isNew = false;
-        final String filename = fromFile.getName();
-        final long start =
-                Long.parseLong(filename.substring(0, filename.length() - DATA_FILE_SUFFIX.length()));
-        final File toFileDir =
-                new File(sb.append(secdStorePath)
-                        .append(File.separator).append(storeKey).toString());
-        sb.delete(0, sb.length());
-        FileUtil.checkDir(toFileDir);
-        ConcurrentHashMap<Long, MsgSSDSegment> msgSsdSegMap =
-                ssdSegmentsMap.get(storeKey);
-        if (msgSsdSegMap == null) {
-            ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                    new ConcurrentHashMap<>();
-            msgSsdSegMap = ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
-            if (msgSsdSegMap == null) {
-                msgSsdSegMap = tmpMsgSsdSegMap;
-            }
-        }
-        MsgSSDSegment msgSsdSegment1 = msgSsdSegMap.get(start);
-        if (msgSsdSegment1 == null) {
-            boolean isSuccess = false;
-            FileInputStream fosfrom = null;
-            FileOutputStream fosto = null;
-            final File targetFile =
-                    new File(toFileDir, DataStoreUtils.nameFromOffset(start, DATA_FILE_SUFFIX));
-            try {
-                fosfrom = new FileInputStream(fromFile);
-                fosto = new FileOutputStream(targetFile);
-                byte[] bt = new byte[65536];
-                int c;
-                while ((c = fosfrom.read(bt)) > 0) {
-                    fosto.write(bt, 0, c);
-                }
-                isSuccess = true;
-                isNew = true;
-            } catch (FileNotFoundException e) {
-                logger.warn("[SSD Manager] copy File to SSD FileNotFoundException failure ", e);
-            } catch (IOException e) {
-                logger.warn("[SSD Manager] copy File to SSD IOException failure ", e);
-            } finally {
-                if (fosfrom != null) {
-                    try {
-                        fosfrom.close();
-                    } catch (Throwable e2) {
-                        logger.warn("[SSD Manager] Close FileInputStream failure! ", e2);
-                    }
-                }
-                if (fosto != null) {
-                    try {
-                        fosto.close();
-                    } catch (Throwable e2) {
-                        logger.warn("[SSD Manager] Close FileOutputStream failure! ", e2);
-                    }
-                }
-            }
-            if (!isSuccess) {
-                try {
-                    targetFile.delete();
-                } catch (Throwable e2) {
-                    logger.warn("[SSD Manager] remove SSD target file failure ", e2);
-                }
-                return false;
-            }
-            try {
-                MsgSSDSegment tmpMsgSsdSegment =
-                        new MsgSSDSegment(storeKey, topic, start, targetFile);
-                if (tmpMsgSsdSegment.getDataMinOffset() != startOffset
-                        || tmpMsgSsdSegment.getDataMaxOffset() != endOffset) {
-                    logger.warn(sb
-                            .append("[SSD Manager] created SSD file getCachedSize not equal source: sourceFile=")
-                            .append(fromFile.getAbsolutePath()).append(",srcStartOffset=")
-                            .append(startOffset).append(",srcEndOffset=").append(endOffset)
-                            .append(",dstStartOffset=").append(tmpMsgSsdSegment.getDataMinOffset())
-                            .append(",dstEndOffset=").append(tmpMsgSsdSegment.getDataMaxOffset()).toString());
-                    sb.delete(0, sb.length());
-                    tmpMsgSsdSegment.close();
-                    return false;
-                }
-                msgSsdSegment1 = msgSsdSegMap.putIfAbsent(start, tmpMsgSsdSegment);
-                if (msgSsdSegment1 == null) {
-                    msgSsdSegment1 = tmpMsgSsdSegment;
-                    this.totalSsdFileSize.addAndGet(tmpMsgSsdSegment.getDataSizeInBytes());
-                    this.totalSsdFileCnt.incrementAndGet();
-                } else {
-                    tmpMsgSsdSegment.close();
-                }
-            } catch (Throwable e1) {
-                logger.warn("[SSD Manager] create SSD FileSegment failure", e1);
-                return false;
-            }
-        }
-        ConcurrentHashSet<SSDSegIndex> ssdSegIndices = this.ssdPartStrMap.get(partStr);
-        if (ssdSegIndices == null) {
-            ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<>();
-            ssdSegIndices = this.ssdPartStrMap.putIfAbsent(partStr, tmpSsdSegIndices);
-            if (ssdSegIndices == null) {
-                ssdSegIndices = tmpSsdSegIndices;
-            }
-        }
-        SSDSegIndex ssdSegIndex =
-                new SSDSegIndex(storeKey, start, msgSsdSegment1.getDataMaxOffset());
-        if (!ssdSegIndices.contains(ssdSegIndex)) {
-            ssdSegIndices.add(ssdSegIndex);
-        }
-        logger.info(sb.append("[SSD Manager] copy file to SSD success, data segment ")
-                .append(msgSsdSegment1.getStoreKey()).append(",start=").append(start)
-                .append(", isNew=").append(isNew).toString());
-        sb.delete(0, sb.length());
-        return true;
-    }
-
-    private void checkSegmentTransferStatus(final String topicName, final SSDSegEvent ssdSegEvent,
-                                            final SSDSegFound ssdSegFound, final ConsumerNodeInfo consumerNodeInfo,
-                                            final StringBuilder strBuffer) {
-        consumerNodeInfo.setSSDProcing();
-        ConcurrentHashMap<Long, MsgSSDSegment> msgSegmentMap =
-                ssdSegmentsMap.get(ssdSegEvent.storeKey);
-        if (msgSegmentMap == null) {
-            ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                    new ConcurrentHashMap<>();
-            msgSegmentMap =
-                    ssdSegmentsMap.putIfAbsent(ssdSegEvent.storeKey, tmpMsgSsdSegMap);
-            if (msgSegmentMap == null) {
-                msgSegmentMap = tmpMsgSsdSegMap;
-            }
-        }
-        MsgSSDSegment curMsgSsdSegment =
-                msgSegmentMap.get(ssdSegFound.startOffset);
-        if (curMsgSsdSegment != null) {
-            curMsgSsdSegment.setInUse();
-            ConcurrentHashSet<SSDSegIndex> ssdSegIndices =
-                    ssdPartStrMap.get(ssdSegEvent.partStr);
-            if (ssdSegIndices == null) {
-                ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices =
-                        new ConcurrentHashSet<>();
-                ssdSegIndices =
-                        ssdPartStrMap.putIfAbsent(ssdSegEvent.partStr, tmpSsdSegIndices);
-                if (ssdSegIndices == null) {
-                    ssdSegIndices = tmpSsdSegIndices;
-                }
-            }
-            ssdSegIndices.add(new SSDSegIndex(ssdSegEvent.storeKey,
-                    ssdSegFound.startOffset, ssdSegFound.endOffset));
-            consumerNodeInfo.setSSDTransferFinished(true,
-                    ssdSegFound.startOffset, ssdSegFound.endOffset);
-        } else {
-            if (totalSsdFileCnt.get() >= tubeConfig.getMaxSSDTotalFileCnt()
-                    || totalSsdFileSize.get() >= tubeConfig.getMaxSSDTotalFileSizes()) {
-                consumerNodeInfo.setSSDTransferFinished(false, -2, -2);
-                return;
-            }
-            boolean result = copyFileToSSD(ssdSegEvent.storeKey, topicName,
-                    ssdSegEvent.partStr, ssdSegFound.sourceFile,
-                    ssdSegFound.startOffset, ssdSegFound.endOffset, strBuffer);
-            if (result) {
-                consumerNodeInfo.setSSDTransferFinished(true,
-                        ssdSegFound.startOffset, ssdSegFound.endOffset);
-            } else {
-                consumerNodeInfo.setSSDTransferFinished(false,
-                        ssdSegFound.startOffset, ssdSegFound.endOffset);
-            }
-        }
-    }
-
-    private void loadSsdFile(final String storeKey, final String topicName,
-                             final StringBuilder strBuffer) throws IOException {
-        final File storeKeydataDir =
-                new File(strBuffer.append(this.secdStorePath)
-                        .append(File.separator).append(storeKey).toString());
-        strBuffer.delete(0, strBuffer.length());
-        final File[] lsData = storeKeydataDir.listFiles();
-        if (lsData == null) {
-            return;
-        }
-        for (final File fileData : lsData) {
-            if (fileData != null
-                    && fileData.isFile()
-                    && fileData.toString().endsWith(DATA_FILE_SUFFIX)) {
-                if (!fileData.canRead()) {
-                    throw new IOException(new StringBuilder(512)
-                            .append("[SSD Manager] Could not read data file ")
-                            .append(fileData).toString());
-                }
-                final String filename = fileData.getName();
-                final long start =
-                        Long.parseLong(filename.substring(0,
-                                filename.length() - DATA_FILE_SUFFIX.length()));
-                ConcurrentHashMap<Long, MsgSSDSegment> msgSsdSegMap =
-                        this.ssdSegmentsMap.get(storeKey);
-                if (msgSsdSegMap == null) {
-                    ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
-                            new ConcurrentHashMap<>();
-                    msgSsdSegMap =
-                            ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
-                    if (msgSsdSegMap == null) {
-                        msgSsdSegMap = tmpMsgSsdSegMap;
-                    }
-                }
-                MsgSSDSegment msgSsdSegment1 = msgSsdSegMap.get(start);
-                if (msgSsdSegment1 == null) {
-                    MsgSSDSegment tmpMsgSsdSegment =
-                            new MsgSSDSegment(storeKey, topicName, start, fileData);
-                    msgSsdSegment1 =
-                            msgSsdSegMap.putIfAbsent(start, tmpMsgSsdSegment);
-                    if (msgSsdSegment1 == null) {
-                        totalSsdFileSize
-                                .addAndGet(tmpMsgSsdSegment.getDataSizeInBytes());
-                        totalSsdFileCnt.incrementAndGet();
-                    } else {
-                        tmpMsgSsdSegment.close();
-                    }
-                }
-                logger.info(strBuffer.append("[SSD Manager] Loaded data segment ")
-                        .append(fileData.getAbsolutePath()).toString());
-                strBuffer.delete(0, strBuffer.length());
-            }
-        }
-    }
-
-    private class SsdStoreRunner implements Runnable {
-
-        public SsdStoreRunner() {
-            //
-        }
-
-        @Override
-        public void run() {
-            final StringBuilder strBuffer = new StringBuilder(512);
-            logger.info("[SSD Manager] start process SSD  transfer requests");
-            try {
-                MetadataManager metadataManager = msgStoreMgr.getMetadataManager();
-                while (!closed) {
-                    try {
-                        SSDSegEvent ssdSegEvent =
-                                reqSSDEvents.poll(500, TimeUnit.MILLISECONDS);
-                        if (ssdSegEvent == null) {
-                            ThreadUtils.sleep(1000);
-                            continue;
-                        }
-                        Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
-                                msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
-                        ConsumerNodeInfo consumerNodeInfo =
-                                consumerNodeInfoMap.get(ssdSegEvent.partStr);
-                        if (consumerNodeInfo == null) {
-                            continue;
-                        }
-                        final int index = ssdSegEvent.storeKey.lastIndexOf('-');
-                        if (index < 0) {
-                            logger.warn(strBuffer.append("[SSD Manager] Ignore invalid storeKey=")
-                                    .append(ssdSegEvent.storeKey).toString());
-                            strBuffer.delete(0, strBuffer.length());
-                            continue;
-                        }
-                        final String topic = ssdSegEvent.storeKey.substring(0, index);
-                        TopicMetadata topicMetadata =
-                                metadataManager.getTopicMetadata(topic);
-                        if (topicMetadata == null) {
-                            logger.warn(strBuffer
-                                    .append("[SSD Manager] No valid topic config for storeKey=")
-                                    .append(ssdSegEvent.storeKey).toString());
-                            strBuffer.delete(0, strBuffer.length());
-                            continue;
-                        }
-                        final int storeId =
-                                Integer.parseInt(ssdSegEvent.storeKey.substring(index + 1));
-                        SSDSegFound ssdSegFound =
-                                msgStoreMgr.getSourceSegment(topic, storeId, ssdSegEvent.startOffset, 15);
-                        if (!ssdSegFound.isSuccess) {
-                            if (ssdSegFound.reason > 0) {
-                                logger.warn(strBuffer
-                                        .append("[SSD Manager] Found source store error, storeKey=")
-                                        .append(ssdSegEvent.storeKey).append(", reason=")
-                                        .append(ssdSegFound.reason).toString());
-                                strBuffer.delete(0, strBuffer.length());
-                            }
-                            consumerNodeInfo.setSSDTransferFinished(false,
-                                    ssdSegFound.startOffset, ssdSegFound.endOffset);
-                            continue;
-                        }
-                        checkSegmentTransferStatus(topic, ssdSegEvent,
-                                ssdSegFound, consumerNodeInfo, strBuffer);
-                    } catch (InterruptedException e) {
-                        break;
-                    } catch (Throwable e2) {
-                        logger.error("[SSD Manager] process SSD  transfer request failure", e2);
-                    }
-                }
-                logger.info("[SSD Manager] stopped process SSD  transfer requests");
-            } catch (Throwable e) {
-                logger.error("[SSD Manager] Error during process SSD transfer requests", e);
-            }
-        }
-    }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java
deleted file mode 100644
index 1045d1e..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/***
- * Transfer files on disk to ssd event.
- */
-public class SSDSegEvent {
-    public final String storeKey;
-    public final String partStr;
-    public final long startOffset;
-    public final int segCnt;
-
-
-    public SSDSegEvent(final String partStr, final String storeKey,
-                       final long startOffset, final int segCnt) {
-        this.partStr = partStr;
-        this.storeKey = storeKey;
-        this.startOffset = startOffset;
-        this.segCnt = segCnt;
-    }
-
-    @Override
-    public String toString() {
-        return new StringBuilder(512).append("{storeKey=").append(storeKey)
-                .append(",partStr=").append(partStr).append(",startOffset=")
-                .append(startOffset).append(",segCnt=").append(segCnt)
-                .append("}").toString();
-    }
-
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java
deleted file mode 100644
index 0e6d61b..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.File;
-
-/***
- * SSD segment found result.
- */
-public class SSDSegFound {
-    public final File sourceFile;
-    public final long startOffset;
-    public final long endOffset;
-    public boolean isSuccess;
-    public int reason;
-
-    public SSDSegFound(boolean isSuccess, int reason, final File sourceFile) {
-        this.isSuccess = isSuccess;
-        this.reason = reason;
-        this.sourceFile = sourceFile;
-        this.startOffset = -2;
-        this.endOffset = -2;
-    }
-
-    public SSDSegFound(boolean isSuccess, int reason, final File sourceFile,
-                       final long startOffset, final long endOffset) {
-        this.isSuccess = isSuccess;
-        this.reason = reason;
-        this.sourceFile = sourceFile;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
-    }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java
deleted file mode 100644
index a731596..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/**
- * SSD segment index.
- */
-public class SSDSegIndex {
-    public final String storeKey;
-    public final long startOffset;
-    public final long endOffset;
-
-    public SSDSegIndex(final String storeKey,
-                       final long startOffset,
-                       final long endOffset) {
-        this.storeKey = storeKey;
-        this.startOffset = startOffset;
-        this.endOffset = endOffset;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof SSDSegIndex)) {
-            return false;
-        }
-
-        SSDSegIndex that = (SSDSegIndex) o;
-        if (startOffset != that.startOffset) {
-            return false;
-        }
-        if (endOffset != that.endOffset) {
-            return false;
-        }
-        return storeKey.equals(that.storeKey);
-
-    }
-
-    @Override
-    public int hashCode() {
-        int result = storeKey.hashCode();
-        result = 31 * result + (int) (startOffset ^ (startOffset >>> 32));
-        result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
-        return result;
-    }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java
deleted file mode 100644
index 6d06e42..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/***
- * SSD visit record.
- */
-public class SSDVisitInfo {
-    public final String partStr;
-    public long lastOffset;
-    public long lastTime;
-
-    public SSDVisitInfo(final String partStr, final long lastOffset) {
-        this.partStr = partStr;
-        this.lastOffset = lastOffset;
-        this.lastTime = System.currentTimeMillis();
-    }
-
-    public void requestVisit(final long lastOffset) {
-        this.lastOffset = lastOffset;
-        this.lastTime = System.currentTimeMillis();
-    }
-
-    public void responseVisit(final long lastOffset) {
-        this.lastOffset = lastOffset;
-        this.lastTime = System.currentTimeMillis();
-    }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index a66f3af..3b05b06 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -19,13 +19,10 @@ package org.apache.tubemq.server.broker.nodeinfo;
 
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.policies.FlowCtrlResult;
 import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
-import org.apache.tubemq.corebase.policies.SSDCtrlResult;
 import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.tubemq.server.common.TServerConstants;
 
@@ -59,19 +56,7 @@ public class ConsumerNodeInfo {
     private long totalUnitMin = 0L;
     private FlowCtrlResult curFlowCtrlVal =
             new FlowCtrlResult(Long.MAX_VALUE, 0);
-    private SSDCtrlResult curSsdDltLimit =
-            new SSDCtrlResult(Long.MAX_VALUE, 0);
     private long nextLimitUpdateTime = 0;
-    private AtomicBoolean needSsdProc =
-            new AtomicBoolean(false);
-    private AtomicLong ssdTransId =
-            new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-    // -2: not enabled; 0: request initiated; 1: request received, processing; 2: processed, available; 3: processed, unavailable; 4: stopped
-    private AtomicInteger ssdProcStatus =
-            new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
-    private long lastOpTime = 0;
-    private long startSsdDataOffset = -2;
-    private long endSsdDataOffset = -2;
     private AtomicInteger qryPriorityId =
             new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
     private long createTime = System.currentTimeMillis();
@@ -80,31 +65,15 @@ public class ConsumerNodeInfo {
     public ConsumerNodeInfo(final MessageStoreManager storeManager,
                             final String consumerId, Set<String> filterCodes,
                             final String sessionKey, long sessionTime, final String partStr) {
-        setConsumerId(consumerId);
-        if (filterCodes != null) {
-            for (String filterItem : filterCodes) {
-                this.filterCondStrs.add(filterItem);
-                this.filterCondCode.add(filterItem.hashCode());
-            }
-        }
-        this.sessionKey = sessionKey;
-        this.sessionTime = sessionTime;
-        this.isSupportLimit = false;
-        this.needSsdProc.set(false);
-        this.storeManager = storeManager;
-        this.partStr = partStr;
-        this.createTime = System.currentTimeMillis();
-        if (filterCodes != null && !filterCodes.isEmpty()) {
-            this.isFilterConsume = true;
-        }
-        this.ssdTransId.set(TBaseConstants.META_VALUE_UNDEFINED);
+        this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
+            filterCodes, sessionKey, sessionTime, false, partStr);
     }
 
     public ConsumerNodeInfo(final MessageStoreManager storeManager,
                             final int qryPriorityId, final String consumerId,
                             Set<String> filterCodes, final String sessionKey,
-                            long sessionTime, long ssdTransId, boolean needSsdProc,
-                            boolean isSupportLimit, final String partStr) {
+                            long sessionTime, boolean isSupportLimit,
+                            final String partStr) {
         setConsumerId(consumerId);
         if (filterCodes != null) {
             for (String filterItem : filterCodes) {
@@ -115,14 +84,11 @@ public class ConsumerNodeInfo {
         this.sessionKey = sessionKey;
         this.sessionTime = sessionTime;
         this.qryPriorityId.set(qryPriorityId);
-        this.ssdTransId.set(ssdTransId);
-        this.needSsdProc.set(needSsdProc);
         this.storeManager = storeManager;
         this.partStr = partStr;
         this.createTime = System.currentTimeMillis();
         if (filterCodes != null && !filterCodes.isEmpty()) {
             this.isFilterConsume = true;
-            this.needSsdProc.set(false);
         }
         this.isSupportLimit = isSupportLimit;
     }
@@ -137,69 +103,6 @@ public class ConsumerNodeInfo {
             long currTime = System.currentTimeMillis();
             recalcMsgLimitValue(curDataDlt,
                     currTime, maxMsgTransferSize, flowCtrlRuleHandler);
-            if (storeManager.isSsdServiceStart()
-                    && needSsdProc.get()
-                    && (currTime - createTime > 2 * 60 * 1000)) {
-                // get message from ssd.
-                switch (ssdProcStatus.get()) {
-                    case TBaseConstants.META_VALUE_UNDEFINED:
-                    case 4:
-                        // Request ssd sink operation, when finish first fetch operation.
-                        if (curDataDlt >= curSsdDltLimit.dataStartDltInSize
-                                && currTime - this.lastOpTime > 20 * 1000) {
-                            if (this.storeManager.putSsdTransferReq(partStr,
-                                    storeKey, lastDataRdOffset)) {
-                                ssdProcStatus.set(0);
-                            }
-                            this.lastOpTime = System.currentTimeMillis();
-                        }
-                        break;
-                    case 0:
-                        // Request ssd sink operation, when exceed 2 minutes since last time.
-                        if (curDataDlt >= curSsdDltLimit.dataStartDltInSize
-                                && currTime - this.lastOpTime > 2 * 60 * 1000) {
-                            if (this.storeManager.putSsdTransferReq(partStr,
-                                    storeKey, lastDataRdOffset)) {
-                                ssdProcStatus.set(0);
-                            }
-                            this.lastOpTime = System.currentTimeMillis();
-                        }
-                        break;
-                    case 2:
-                        // Set read message size, after finish ssd sink operation.
-                        if (lastDataRdOffset >= startSsdDataOffset
-                                && lastDataRdOffset < endSsdDataOffset) {
-                            return this.sentUnit;
-                        }
-                        if (lastDataRdOffset >= endSsdDataOffset) {
-                            // Request ssd sink operation, after read operation.
-                            if (curDataDlt >= curSsdDltLimit.dataEndDLtInSz) {
-                                resetSSDProcSeg(false);
-                                if (this.storeManager.putSsdTransferReq(partStr,
-                                        storeKey, lastDataRdOffset)) {
-                                    ssdProcStatus.set(0);
-                                }
-                                this.lastOpTime = System.currentTimeMillis();
-                            } else {
-                                resetSSDProcSeg(true);
-                            }
-                        }
-                        break;
-                    case 3:
-                        // Request ssd sink operation, when has been conducted or occur error in response.
-                        if (curDataDlt >= curSsdDltLimit.dataEndDLtInSz
-                                && currTime - this.lastOpTime > 20 * 1000) {
-                            if (this.storeManager.putSsdTransferReq(partStr,
-                                    storeKey, lastDataRdOffset)) {
-                                ssdProcStatus.set(0);
-                            }
-                            this.lastOpTime = System.currentTimeMillis();
-                        }
-                        break;
-                    default:
-                        break;
-                }
-            }
             if (isEscFlowCtrl
                     || (totalUnitSec > sentMsgSize
                     && this.curFlowCtrlVal.dataLtInSize > totalUnitMin)) {
@@ -216,23 +119,10 @@ public class ConsumerNodeInfo {
         }
     }
 
-    public boolean processFromSsdFile() {
-        return (storeManager != null
-                && storeManager.isSsdServiceStart()
-                && needSsdProc.get()
-                && ssdProcStatus.get() == 2
-                && lastDataRdOffset >= startSsdDataOffset
-                && lastDataRdOffset < endSsdDataOffset);
-    }
-
     public String getPartStr() {
         return partStr;
     }
 
-    public long getStartSsdDataOffset() {
-        return startSsdDataOffset;
-    }
-
     public int getSentMsgSize() {
         return sentMsgSize;
     }
@@ -241,38 +131,6 @@ public class ConsumerNodeInfo {
         return isSupportLimit;
     }
 
-    public boolean getNeedSsdProc() {
-        return needSsdProc.get();
-    }
-
-    public void setNeedSsdProc(boolean needSsdProc) {
-        this.needSsdProc.set(needSsdProc);
-    }
-
-    public long getDataStartDltInM() {
-        return this.curSsdDltLimit.dataStartDltInSize / 1024 / 1024;
-    }
-
-    public long getSsdDataEndDltInM() {
-        return this.curSsdDltLimit.dataEndDLtInSz / 1024 / 1024;
-    }
-
-    public long getSsdTransId() {
-        return ssdTransId.get();
-    }
-
-    public void setSsdTransId(long ssdTransId, boolean needSSDProc) {
-        this.ssdTransId.set(ssdTransId);
-        this.needSsdProc.set(needSSDProc);
-    }
-
-    public void setSSDProcing() {
-        if (this.ssdProcStatus.get() == 0) {
-            this.ssdProcStatus.set(1);
-            this.lastOpTime = System.currentTimeMillis();
-        }
-    }
-
     public int getQryPriorityId() {
         return qryPriorityId.get();
     }
@@ -281,32 +139,6 @@ public class ConsumerNodeInfo {
         this.qryPriorityId.set(qryPriorityId);
     }
 
-    public void setSSDTransferFinished(boolean isUse,
-                                       final long startOffset,
-                                       final long endOffset) {
-        if (this.ssdProcStatus.get() == 1) {
-            if (isUse) {
-                this.ssdProcStatus.set(2);
-            } else {
-                this.ssdProcStatus.set(3);
-            }
-            this.startSsdDataOffset = startOffset;
-            this.endSsdDataOffset = endOffset;
-            this.lastOpTime = System.currentTimeMillis();
-        }
-    }
-
-    public void resetSSDProcSeg(boolean finished) {
-        if (finished) {
-            this.ssdProcStatus.set(4);
-        } else {
-            this.ssdProcStatus.set(3);
-        }
-        this.startSsdDataOffset = -2;
-        this.endSsdDataOffset = -2;
-        this.lastOpTime = System.currentTimeMillis();
-    }
-
     public long getNextStatTime() {
         return nextStatTime;
     }
@@ -410,9 +242,6 @@ public class ConsumerNodeInfo {
             if (this.curFlowCtrlVal == null) {
                 this.curFlowCtrlVal = new FlowCtrlResult(Long.MAX_VALUE, 0);
             }
-            if (storeManager.isSsdServiceStart() && needSsdProc.get()) {
-                this.curSsdDltLimit = flowCtrlRuleHandler.getCurSSDStartDltInSZ();
-            }
             currTime = System.currentTimeMillis();
             this.sentMsgSize = 0;
             this.totalUnitMin = 0;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index eb3d586..05428fe 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -165,18 +165,14 @@ public class BrokerAdminServlet extends HttpServlet {
                         .append(regTime).append(",\"isFilterConsume\":")
                         .append(ifFilterConsume);
             }
-            sBuilder.append(",\"needSSDProc\":").append(entry.getValue().getNeedSsdProc())
-                    .append(",\"ssdTransId\":").append(entry.getValue().getSsdTransId())
-                    .append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
+            sBuilder.append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
                     .append(",\"curDataLimitInM\":").append(entry.getValue().getCurFlowCtrlLimitSize())
                     .append(",\"curFreqLimit\":").append(entry.getValue().getCurFlowCtrlFreqLimit())
                     .append(",\"totalSentSec\":").append(entry.getValue().getSentMsgSize())
                     .append(",\"isSupportLimit\":").append(entry.getValue().isSupportLimit())
                     .append(",\"sentUnitSec\":").append(entry.getValue().getTotalUnitSec())
                     .append(",\"totalSentMin\":").append(entry.getValue().getTotalUnitMin())
-                    .append(",\"sentUnit\":").append(entry.getValue().getSentUnit())
-                    .append(",\"SSDDataDltStartInM\":").append(entry.getValue().getDataStartDltInM())
-                    .append(",\"SSDDataDltEndInM\":").append(entry.getValue().getSsdDataEndDltInM());
+                    .append(",\"sentUnit\":").append(entry.getValue().getSentUnit());
             MessageStoreManager storeManager = broker.getStoreManager();
             OffsetService offsetService = broker.getOffsetManager();
             MessageStore store = null;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 0554098..0d93ec7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -551,8 +551,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 ? request.getSessionTime() : System.currentTimeMillis();
         int sourceCount = request.hasTotalCount()
                 ? request.getTotalCount() : -1;
-        long ssdStoreId = request.hasSsdStoreId()
-                ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
         int reqQryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
         boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
@@ -677,12 +675,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 if (request.getGroupFlowCheckId() != bdbGroupFlowCtrlEntity.getSerialId()) {
                     builder.setGroupFlowControlInfo(bdbGroupFlowCtrlEntity.getFlowCtrlInfo());
                 }
-                if (bdbGroupFlowCtrlEntity.isNeedSSDProc()
-                        && defFlowCtrlEntity != null
-                        && defFlowCtrlEntity.isValidStatus()
-                        && defFlowCtrlEntity.isNeedSSDProc()) {
-                    builder.setSsdStoreId(defFlowCtrlEntity.getSsdTranslateId());
-                }
             }
         }
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
@@ -859,12 +851,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 if (request.getGroupFlowCheckId() != bdbGroupFlowCtrlEntity.getSerialId()) {
                     builder.setGroupFlowControlInfo(bdbGroupFlowCtrlEntity.getFlowCtrlInfo());
                 }
-                if (bdbGroupFlowCtrlEntity.isNeedSSDProc()
-                        && defFlowCtrlEntity != null
-                        && defFlowCtrlEntity.isValidStatus()
-                        && defFlowCtrlEntity.isNeedSSDProc()) {
-                    builder.setSsdStoreId(defFlowCtrlEntity.getSsdTranslateId());
-                }
             }
         }
         builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
@@ -1009,8 +995,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         boolean needFastStart = false;
         final long reFlowCtrlId = request.hasFlowCheckId()
                 ? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
-        final long reqSsdTranslateId = request.hasSsdStoreId()
-                ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
         final int qryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
         ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
@@ -1050,7 +1034,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
             .append(",isTlsEnable=").append(brokerInfo.isEnableTLS())
             .append(",TLSport=").append(brokerInfo.getTlsPort())
             .append(",FlowCtrlId=").append(reFlowCtrlId)
-            .append(",ssdTranslateId=").append(reqSsdTranslateId)
             .append(",qryPriorityId=").append(qryPriorityId)
             .append(",checksumId=").append(request.getConfCheckSumId()).toString());
         strBuffer.delete(0, strBuffer.length());
@@ -1077,11 +1060,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         builder.setConfCheckSumId(brokerStatusInfo.getLastPushBrokerCheckSumId());
         builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
         builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
+        builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
         if (request.hasFlowCheckId()) {
             BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
                     defaultBrokerConfManager.getBdbDefFlowCtrl();
             if (bdbGroupFlowCtrlEntity == null) {
-                builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
                 builder.setFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
                 builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
                 if (request.getFlowCheckId() != TBaseConstants.META_VALUE_UNDEFINED) {
@@ -1097,11 +1080,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         builder.setFlowControlInfo(" ");
                     }
                 }
-                if (bdbGroupFlowCtrlEntity.isNeedSSDProc()) {
-                    builder.setSsdStoreId(bdbGroupFlowCtrlEntity.getSsdTranslateId());
-                } else {
-                    builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
-                }
             }
         }
         logger.info(strBuffer.append("[TMaster sync] push broker configure: brokerId = ")
@@ -1225,8 +1203,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 request.getReadStatusRpt(), request.getWriteStatusRpt());
         long reFlowCtrlId = request.hasFlowCheckId()
                 ? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
-        long reqSsdTranslateId = request.hasSsdStoreId()
-                ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
         int qryPriorityId = request.hasQryPriorityId()
                 ? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
         if (request.getTakeConfInfo()) {
@@ -1238,7 +1214,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 .append(",checksumId=").append(request.getConfCheckSumId())
                 .append(",hasFlowCheckId=").append(request.hasFlowCheckId())
                 .append(",reFlowCtrlId=").append(reFlowCtrlId)
-                .append(",reqSsdTranslateId=").append(reqSsdTranslateId)
                 .append(",qryPriorityId=").append(qryPriorityId)
                 .append(",brokerOnline=").append(request.getBrokerOnline())
                 .append(",default broker configure is ").append(request.getBrokerDefaultConfInfo())
@@ -1251,12 +1226,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
         builder.setNeedReportData(brokerSyncStatusInfo.needReportData());
         builder.setCurBrokerConfId(brokerSyncStatusInfo.getLastPushBrokerConfId());
         builder.setConfCheckSumId(brokerSyncStatusInfo.getLastPushBrokerCheckSumId());
+        builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
         if (request.hasFlowCheckId()) {
             BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
                     defaultBrokerConfManager.getBdbDefFlowCtrl();
             if (bdbGroupFlowCtrlEntity == null) {
                 builder.setFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
-                builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
                 builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
                 if (request.getFlowCheckId() != TBaseConstants.META_VALUE_UNDEFINED) {
                     builder.setFlowControlInfo(" ");
@@ -1271,11 +1246,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                         builder.setFlowControlInfo(" ");
                     }
                 }
-                if (bdbGroupFlowCtrlEntity.isNeedSSDProc()) {
-                    builder.setSsdStoreId(bdbGroupFlowCtrlEntity.getSsdTranslateId());
-                } else {
-                    builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
-                }
             }
         }
         brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), builder);
@@ -1295,7 +1265,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
                 .append(",set flowCtrlId=").append(builder.getFlowCheckId())
                 .append(",stopWrite=").append(builder.getStopWrite())
                 .append(",stopRead=").append(builder.getStopRead())
-                .append(",ssdTranslateId=").append(builder.getSsdStoreId())
                 .append(",qryPriorityId=").append(builder.getQryPriorityId())
                 .append(",checksumId=").append(brokerSyncStatusInfo.getLastPushBrokerCheckSumId())
                 .append(",default configure is ")
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index e6a91a1..acc74c0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -79,11 +79,6 @@ public class WebAdminFlowRuleHandler {
                     WebParameterUtils.validIntDataParameter("qryPriorityId",
                             req.getParameter("qryPriorityId"), false, 301, 101);
             checkQryPriorityId(qryPriorityId);
-            // get if enable ssd process function
-            boolean curNeedSSDProc =
-                    WebParameterUtils.validBooleanDataParameter("needSSDProc",
-                            req.getParameter("needSSDProc"),
-                            false, false);
             Set<String> batchGroupNames = new HashSet<>();
             if (opType == 1) {
                 batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
@@ -107,12 +102,12 @@ public class WebAdminFlowRuleHandler {
                     brokerConfManager.confAddBdbGroupFlowCtrl(
                             new BdbGroupFlowCtrlEntity(strBuffer.toString(),
                                     statusId, ruleCnt, qryPriorityId, "",
-                                    curNeedSSDProc, createUser, createDate));
+                                false, createUser, createDate));
                 } else {
                     brokerConfManager.confAddBdbGroupFlowCtrl(
                             new BdbGroupFlowCtrlEntity(groupName,
                                     strBuffer.toString(), statusId, ruleCnt, qryPriorityId, "",
-                                    curNeedSSDProc, createUser, createDate));
+                                false, createUser, createDate));
                 }
             }
             strBuffer.delete(0, strBuffer.length());
@@ -248,16 +243,6 @@ public class WebAdminFlowRuleHandler {
                         newGroupFlowCtrlEntity.setFlowCtrlInfo(newFlowCtrlInfo);
                         newGroupFlowCtrlEntity.setRuleCnt(ruleCnt);
                     }
-                    String inNeedSsdProc = req.getParameter("needSSDProc");
-                    if (TStringUtils.isNotBlank(inNeedSsdProc)) {
-                        boolean curNeedSsdProc =
-                                WebParameterUtils.validBooleanDataParameter("needSSDProc",
-                                        req.getParameter("needSSDProc"), false, false);
-                        if (curNeedSsdProc != oldEntity.isNeedSSDProc()) {
-                            foundChange = true;
-                            newGroupFlowCtrlEntity.setNeedSSDProc(curNeedSsdProc);
-                        }
-                    }
                     // update record if found change
                     if (foundChange) {
                         try {
@@ -361,7 +346,7 @@ public class WebAdminFlowRuleHandler {
         strBuffer.append("[");
         if (TStringUtils.isNotBlank(inFlowCtrlInfo)) {
             List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
-            inFlowCtrlInfo = String.valueOf(inFlowCtrlInfo).trim();
+            inFlowCtrlInfo = inFlowCtrlInfo.trim();
             FlowCtrlRuleHandler flowCtrlRuleHandler =
                 new FlowCtrlRuleHandler(true);
             Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap =
@@ -371,10 +356,6 @@ public class WebAdminFlowRuleHandler {
                     int rules = 0;
                     List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
                     if (flowCtrlItems != null) {
-                        if (opType != 1 && typeId == 2) {
-                            throw new Exception(
-                                    "Illegal value: SSD limit rule only set in default flow control set!");
-                        }
                         if (ruleCnt++ > 0) {
                             strBuffer.append(",");
                         }