You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/05/22 02:21:51 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-140] Remove the
SSD auxiliary consumption function (#89)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new bccceba [TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)
bccceba is described below
commit bccceba11ca84b6258dbc4a5d2a5679bf1c292f7
Author: gosonzhang <46...@qq.com>
AuthorDate: Fri May 22 02:21:45 2020 +0000
[TUBEMQ-140] Remove the SSD auxiliary consumption function (#89)
Co-authored-by: gosonzhang <go...@tencent.com>
---
docs/http_access_API_definition.md | 12 +-
docs/http_access_API_definition_cn.xls | Bin 193024 -> 193536 bytes
docs/tubemq_config_introduction.md | 3 -
docs/tubemq_config_introduction_cn.doc | Bin 182272 -> 179712 bytes
.../client/consumer/BaseMessageConsumer.java | 16 +-
.../corebase/policies/FlowCtrlRuleHandler.java | 167 +----
tubemq-core/src/main/protobuf/BrokerService.proto | 4 +-
tubemq-core/src/main/protobuf/MasterService.proto | 16 +-
.../corebase/policies/TestFlowCtrlRuleHandler.java | 13 +-
.../apache/tubemq/server/broker/BrokerConfig.java | 26 -
.../tubemq/server/broker/BrokerServiceServer.java | 22 +-
.../apache/tubemq/server/broker/TubeBroker.java | 21 +-
.../server/broker/msgstore/MessageStore.java | 55 +-
.../broker/msgstore/MessageStoreManager.java | 67 --
.../server/broker/msgstore/disk/MsgFileStore.java | 24 -
.../server/broker/msgstore/ssd/MsgSSDSegment.java | 325 ---------
.../broker/msgstore/ssd/MsgSSDStoreManager.java | 779 ---------------------
.../server/broker/msgstore/ssd/SSDSegEvent.java | 47 --
.../server/broker/msgstore/ssd/SSDSegFound.java | 49 --
.../server/broker/msgstore/ssd/SSDSegIndex.java | 64 --
.../server/broker/msgstore/ssd/SSDVisitInfo.java | 44 --
.../server/broker/nodeinfo/ConsumerNodeInfo.java | 179 +----
.../server/broker/web/BrokerAdminServlet.java | 8 +-
.../org/apache/tubemq/server/master/TMaster.java | 35 +-
.../web/handler/WebAdminFlowRuleHandler.java | 25 +-
25 files changed, 57 insertions(+), 1944 deletions(-)
diff --git a/docs/http_access_API_definition.md b/docs/http_access_API_definition.md
index 3d44c0c..569b0b8 100644
--- a/docs/http_access_API_definition.md
+++ b/docs/http_access_API_definition.md
@@ -627,11 +627,10 @@ The flow control info is described in JSON format, for example:
```json
[{"type":0,"rule":[{"start":"08:00","end":"17:59","dltInM":1024,"limitInM":20,"freqInMs":1000},{"start":"18:00","end":"22:00","dltInM":1024,"limitInM":20,"freqInMs":5000}]},{"type":2,"rule":[{"start":"18:00","end":"23:59","dltStInM":20480,"dltEdInMM":2048}]},{"type":1,"rule":[{"zeroCnt":3,"freqInMs":300},{"zeroCnt":8,"freqInMs":1000}]},{"type":3,"rule":[{"normFreqInMs":0,"filterFreqInMs":100,"minDataFilterFreqInMs":400}]}]
```
-The `type` has four values [0, 1, 2, 3]. 0: flow control, 1: frequency control, 2: delay to SSD storage control, 3: filter consumer frequency control,<br>
- `[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `dltStInM` is consuming data delta when enabling SSD to storage <br>
-`dltEdInM` is consuming data delta when terminating SSD to storage, `limitInM` is the flow control each minute, `freqInMs` is the interval for sending request
-after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data, `normFreqInMs` is the interval of sequential pulling,<br>
-`filterFreqInMs` is the interval of pulling filtered request.
+The `type` has three values [0, 1, 3]. 0: flow control, 1: frequency control, 3: filter consumer frequency control,<br>
+ `[start, end]` is an inclusive range of time, `dltInM` is the consuming delta in MB, `limitInM` is the flow control each minute, <br>
+ `freqInMs` is the interval for sending request after exceeding the flow or freq limit, `zeroCnt` is the count of how many times occurs zero data, <br>
+ `normFreqInMs` is the interval of sequential pulling, `filterFreqInMs` is the interval of pulling filtered request.
__Request__
@@ -658,7 +657,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no| the consuming priority Id. It is a composed field `A0B` with default value 301, <br>the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|flowCtrlInfo|yes|the flow control info in JSON format|String|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
@@ -688,7 +686,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
### `admin_upd_group_flow_control_rule`
@@ -705,7 +702,6 @@ __Request__
|StatusId|no| the strategy status Id, default 0|int|
|qryPriorityId|no|the consuming priority Id. It is a composed field `A0B` with default value 301,<br> the value of A,B is [1, 2, 3] which means file, backup memory, and main memory respectively|int|
|createUser|yes|the creator|String|
-|needSSDProc|no|whether to enable SSD to handle, default false|Boolean|
|createDate|yes|the creating date in format `yyyyMMddHHmmss`|String|
diff --git a/docs/http_access_API_definition_cn.xls b/docs/http_access_API_definition_cn.xls
index e090109..3373687 100644
Binary files a/docs/http_access_API_definition_cn.xls and b/docs/http_access_API_definition_cn.xls differ
diff --git a/docs/tubemq_config_introduction.md b/docs/tubemq_config_introduction.md
index 61026e4..8a05f10 100644
--- a/docs/tubemq_config_introduction.md
+++ b/docs/tubemq_config_introduction.md
@@ -107,9 +107,6 @@ In addition to the back-end system configuration file, the Master also stores th
| consumerRegTimeoutMs | no | long | Consumer heartbeat timeout, optional, in milliseconds, default 30 seconds |
| socketRecvBuffer | no | long | Socket receives the size of the Buffer buffer SO_RCVBUF, the unit byte, the negative number is not set, the default value is |
| socketSendBuffer | no | long | Socket sends Buffer buffer SO_SNDBUF size, unit byte, negative number is not set, the default value is |
-| secondDataPath | no | string | The SSD to storage location where the broker is located, optional field. The default is blank to indicate that the machine has no SSD. |
-| maxSSDTotalFileCnt | no | int | The maximum number of Data files allowed by the SSD where the Broker is located, optional field, default 70 |
-| maxSSDTotalFileSizes | no | long | The SSD where the Broker is located allows the maximum size of the data file to be saved. The optional field is 32G by default. |
| tcpWriteServiceThread | no | int | Broker supports the number of socket worker threads for TCP production services, optional fields, and defaults to 2 times the number of CPUs of the machine. |
| tcpReadServiceThread | no | int | Broker supports the number of socket worker threads for TCP consumer services, optional fields, defaults to 2 times the number of CPUs of the machine |
| logClearupDurationMs | no | long | The aging cleanup period of the message file, in milliseconds. The default is 3 minutes for a log cleanup operation. The minimum is 1 minutes. |
diff --git a/docs/tubemq_config_introduction_cn.doc b/docs/tubemq_config_introduction_cn.doc
index 89fd19c..216de22 100644
Binary files a/docs/tubemq_config_introduction_cn.doc and b/docs/tubemq_config_introduction_cn.doc differ
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index 997320f..b9aa993 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -886,7 +886,6 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setSessionTime(this.consumeSubInfo.getSubscribedTime());
builder.addAllTopicList(this.consumeSubInfo.getSubscribedTopics());
builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
- builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
List<SubscribeInfo> subInfoList =
@@ -945,7 +944,6 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReportSubscribeInfo(reportSubscribeInfo);
builder.setDefFlowCheckId(defFlowCtrlRuleHandler.getFlowCtrlId());
- builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.setGroupFlowCheckId(groupFlowCtrlRuleHandler.getFlowCtrlId());
if (event != null) {
@@ -989,7 +987,6 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setOpType(RpcConstants.MSG_OPTYPE_REGISTER);
builder.setTopicName(partition.getTopic());
builder.setPartitionId(partition.getPartitionId());
- builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.setReadStatus(getGroupInitReadStatus(rmtDataCache.bookPartition(partition.getPartitionKey())));
TopicProcessor topicProcessor =
@@ -1042,7 +1039,6 @@ public class BaseMessageConsumer implements MessageConsumer {
builder.setClientId(consumerId);
builder.setGroupName(this.consumerConfig.getConsumerGroup());
builder.setReadStatus(getGroupInitReadStatus(false));
- builder.setSsdStoreId(groupFlowCtrlRuleHandler.getSsdTranslateId());
builder.setQryPriorityId(groupFlowCtrlRuleHandler.getQryPriorityId());
builder.addAllPartitionInfo(partitionList);
ClientBroker.AuthorizedInfo.Builder authInfoBuilder =
@@ -1062,16 +1058,12 @@ public class BaseMessageConsumer implements MessageConsumer {
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
- groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
- TBaseConstants.META_VALUE_UNDEFINED,
+ groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse group flowCtrl rules failure", e1);
}
}
- if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
- groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
- }
if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
@@ -1093,17 +1085,13 @@ public class BaseMessageConsumer implements MessageConsumer {
? response.getQryPriorityId() : groupFlowCtrlRuleHandler.getQryPriorityId();
if (response.getGroupFlowCheckId() != groupFlowCtrlRuleHandler.getFlowCtrlId()) {
try {
- groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(response.getSsdStoreId(),
- TBaseConstants.META_VALUE_UNDEFINED,
+ groupFlowCtrlRuleHandler.updateDefFlowCtrlInfo(TBaseConstants.META_VALUE_UNDEFINED,
response.getGroupFlowCheckId(), response.getGroupFlowControlInfo());
} catch (Exception e1) {
logger.warn(
"[Heartbeat response] found parse group flowCtrl rules failure", e1);
}
}
- if (response.getSsdStoreId() != groupFlowCtrlRuleHandler.getSsdTranslateId()) {
- groupFlowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
- }
if (qryPriorityId != groupFlowCtrlRuleHandler.getQryPriorityId()) {
groupFlowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
diff --git a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
index ade5c55..0df1625 100644
--- a/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
+++ b/tubemq-core/src/main/java/org/apache/tubemq/corebase/policies/FlowCtrlRuleHandler.java
@@ -53,8 +53,6 @@ public class FlowCtrlRuleHandler {
// Flow control ID and string information obtained from the server
private AtomicLong flowCtrlId =
new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- private AtomicLong ssdTranslateId =
- new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
private AtomicInteger qryPriorityId =
new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
private String strFlowCtrlInfo;
@@ -62,12 +60,6 @@ public class FlowCtrlRuleHandler {
//improving the efficiency of the search return in the range
private AtomicInteger minZeroCnt =
new AtomicInteger(Integer.MAX_VALUE);
- private AtomicLong minSSDProcDlt =
- new AtomicLong(Long.MAX_VALUE);
- private AtomicInteger ssdLimitStartTime =
- new AtomicInteger(2500);
- private AtomicInteger ssdLimitEndTime =
- new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
private AtomicLong minDataLimitDlt =
new AtomicLong(Long.MAX_VALUE);
private AtomicInteger dataLimitStartTime =
@@ -94,14 +86,12 @@ public class FlowCtrlRuleHandler {
}
/**
- * @param ssdTranslateId
* @param qyrPriorityId
* @param flowCtrlId
* @param flowCtrlInfo
* @throws Exception
*/
- public void updateDefFlowCtrlInfo(final long ssdTranslateId,
- final int qyrPriorityId,
+ public void updateDefFlowCtrlInfo(final int qyrPriorityId,
final long flowCtrlId,
final String flowCtrlInfo) throws Exception {
if (flowCtrlId == this.flowCtrlId.get()) {
@@ -118,9 +108,7 @@ public class FlowCtrlRuleHandler {
logger.info(new StringBuilder(512)
.append("[Flow Ctrl] Updated ").append(flowCtrlName)
.append(" to flowId=").append(flowCtrlId)
- .append(",ssdTranslateId=").append(ssdTranslateId)
.append(",qyrPriorityId=").append(qyrPriorityId).toString());
- this.ssdTranslateId.set(ssdTranslateId);
this.qryPriorityId.set(qyrPriorityId);
clearStatisData();
if (flowCtrlItemsMap == null
@@ -188,7 +176,6 @@ public class FlowCtrlRuleHandler {
private void initialStatisData() {
initialDataLimitStatisInfo();
initialFreqLimitStatisInfo();
- initialSSDProcLimitStatisInfo();
initialLowFetchLimitStatisInfo();
}
@@ -237,29 +224,6 @@ public class FlowCtrlRuleHandler {
}
}
- private void initialSSDProcLimitStatisInfo() {
- List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(2);
- if (flowCtrlItemList != null && !flowCtrlItemList.isEmpty()) {
- for (FlowCtrlItem flowCtrlItem : flowCtrlItemList) {
- if (flowCtrlItem == null) {
- continue;
- }
- if (flowCtrlItem.getType() != 2) {
- continue;
- }
- if (flowCtrlItem.getDltInM() < this.minSSDProcDlt.get()) {
- this.minSSDProcDlt.set(flowCtrlItem.getDltInM());
- }
- if (flowCtrlItem.getStartTime() < this.ssdLimitStartTime.get()) {
- this.ssdLimitStartTime.set(flowCtrlItem.getStartTime());
- }
- if (flowCtrlItem.getEndTime() > this.ssdLimitEndTime.get()) {
- this.ssdLimitEndTime.set(flowCtrlItem.getEndTime());
- }
- }
- }
- }
-
private void initialLowFetchLimitStatisInfo() {
List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(3);
if (flowCtrlItemList != null && !flowCtrlItemList.isEmpty()) {
@@ -280,43 +244,13 @@ public class FlowCtrlRuleHandler {
private void clearStatisData() {
this.minZeroCnt.set(Integer.MAX_VALUE);
- this.minSSDProcDlt.set(Long.MAX_VALUE);
this.minDataLimitDlt.set(Long.MAX_VALUE);
- this.ssdLimitStartTime.set(2500);
- this.ssdLimitEndTime.set(TBaseConstants.META_VALUE_UNDEFINED);
this.dataLimitStartTime.set(2500);
this.dataLimitEndTime.set(TBaseConstants.META_VALUE_UNDEFINED);
this.filterCtrlItem = new FlowCtrlItem(3, TBaseConstants.META_VALUE_UNDEFINED,
TBaseConstants.META_VALUE_UNDEFINED, TBaseConstants.META_VALUE_UNDEFINED);
}
-
- public SSDCtrlResult getCurSSDStartDltInSZ() {
- Calendar rightNow = Calendar.getInstance(timeZone);
- int hour = rightNow.get(Calendar.HOUR_OF_DAY);
- int minu = rightNow.get(Calendar.MINUTE);
- int curTime = hour * 100 + minu;
- if (curTime < this.ssdLimitStartTime.get()
- || curTime > this.ssdLimitEndTime.get()) {
- return new SSDCtrlResult(Long.MAX_VALUE, 0);
- }
- List<FlowCtrlItem> flowCtrlItemList = flowCtrlRuleSet.get(2);
- if (flowCtrlItemList == null
- || flowCtrlItemList.isEmpty()) {
- return new SSDCtrlResult(Long.MAX_VALUE, 0);
- }
- for (FlowCtrlItem flowCtrlItem : flowCtrlItemList) {
- if (flowCtrlItem == null) {
- continue;
- }
- SSDCtrlResult ruleVal = flowCtrlItem.getSSDStartDltProc(hour, minu);
- if (ruleVal != null && ruleVal.dataStartDltInSize > 0) {
- return ruleVal;
- }
- }
- return new SSDCtrlResult(Long.MAX_VALUE, 0);
- }
-
public int getMinZeroCnt() {
return minZeroCnt.get();
}
@@ -348,18 +282,6 @@ public class FlowCtrlRuleHandler {
return rcmVal;
}
- public long getSsdTranslateId() {
- return ssdTranslateId.get();
- }
-
-
- /**
- * @param ssdTranslateId
- */
- public void setSsdTranslateId(long ssdTranslateId) {
- this.ssdTranslateId.set(ssdTranslateId);
- }
-
public int getQryPriorityId() {
return qryPriorityId.get();
}
@@ -382,7 +304,6 @@ public class FlowCtrlRuleHandler {
this.strFlowCtrlInfo = "";
this.flowCtrlRuleSet.clear();
this.flowCtrlId.set(TBaseConstants.META_VALUE_UNDEFINED);
- this.ssdTranslateId.set(TBaseConstants.META_VALUE_UNDEFINED);
this.qryPriorityId.set(TBaseConstants.META_VALUE_UNDEFINED);
} finally {
writeLock.unlock();
@@ -420,7 +341,7 @@ public class FlowCtrlRuleHandler {
int typeVal = jsonObject.get("type").getAsInt();
if (typeVal < 0 || typeVal > 3) {
throw new Exception(new StringBuilder(512)
- .append("type value must in [0,1,2,3] in index(")
+ .append("type value must in [0,1,3] in index(")
.append(i).append(") of flowCtrlInfo value!").toString());
}
switch (typeVal) {
@@ -428,8 +349,8 @@ public class FlowCtrlRuleHandler {
flowCtrlItemList = parseFreqLimit(typeVal, jsonObject);
break;
- case 2:
- flowCtrlItemList = parseSSDProcLimit(typeVal, jsonObject);
+ case 2: /* Deprecated */
+ flowCtrlItemList = null;
break;
case 3:
@@ -670,86 +591,6 @@ public class FlowCtrlRuleHandler {
return flowCtrlItems;
}
- /**
- * @param typeVal
- * @param jsonObject
- * @return
- * @throws Exception
- */
- private List<FlowCtrlItem> parseSSDProcLimit(int typeVal,
- JsonObject jsonObject) throws Exception {
- if (jsonObject == null || jsonObject.get("type").getAsInt() != 2) {
- throw new Exception("parse SSD limit rule failure!");
- }
- JsonArray ruleArray = jsonObject.get("rule").getAsJsonArray();
- if (ruleArray == null) {
- throw new Exception("not found rule list in SSD limit!");
- }
- ArrayList<FlowCtrlItem> flowCtrlItems = new ArrayList<>();
- for (int index = 0; index < ruleArray.size(); index++) {
- JsonObject ruleObject = ruleArray.get(index).getAsJsonObject();
- int startTime = validAndGetTimeValue("start",
- ruleObject.get("start").getAsString(), index, "SSD");
- int endTime = validAndGetTimeValue("end",
- ruleObject.get("end").getAsString(), index, "SSD");
- if (startTime >= endTime) {
- throw new Exception(new StringBuilder(512)
- .append("start value must be less than End value in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- if (!ruleObject.has("dltStInM")) {
- throw new Exception(new StringBuilder(512)
- .append("dltStInM key is required in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- long dltStInM = ruleObject.get("dltStInM").getAsLong();
- if (dltStInM < 512) {
- throw new Exception(new StringBuilder(512)
- .append("dltStInM value must be greater than or equal to 512 in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- if (!ruleObject.has("dltEdInM")) {
- throw new Exception(new StringBuilder(512)
- .append("dltEdInM key is required in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- long dataEndInM = ruleObject.get("dltEdInM").getAsLong();
- if (dataEndInM < 0) {
- throw new Exception(new StringBuilder(512)
- .append("dltEdInM value must be greater than or equal to zero in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- if (dataEndInM < 512) {
- throw new Exception(new StringBuilder(512)
- .append("dltStInM value must be greater than or equal to 512 in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- if (dltStInM < dataEndInM) {
- throw new Exception(new StringBuilder(512)
- .append("dltStInM value must be greater than ")
- .append("or equal to dltEdInM value in index(")
- .append(index).append(") of SSD limit rule!").toString());
- }
- dltStInM = (long) (dltStInM * 1024 * 1024);
- dataEndInM = (long) (dataEndInM * 1024 * 1024);
- flowCtrlItems.add(new FlowCtrlItem(typeVal, startTime, endTime, dltStInM, dataEndInM));
- }
-
- Collections.sort(flowCtrlItems, new Comparator<FlowCtrlItem>() {
- @Override
- public int compare(final FlowCtrlItem o1, final FlowCtrlItem o2) {
- if (o1.getStartTime() > o2.getStartTime()) {
- return 1;
- } else if (o1.getStartTime() < o2.getStartTime()) {
- return -1;
- } else {
- return 0;
- }
- }
- });
- return flowCtrlItems;
- }
-
@Override
public String toString() {
return this.strFlowCtrlInfo;
diff --git a/tubemq-core/src/main/protobuf/BrokerService.proto b/tubemq-core/src/main/protobuf/BrokerService.proto
index af6608f..707e1ca 100644
--- a/tubemq-core/src/main/protobuf/BrokerService.proto
+++ b/tubemq-core/src/main/protobuf/BrokerService.proto
@@ -65,7 +65,7 @@ message RegisterRequestC2B {
optional int64 currOffset = 8;
optional string sessionKey = 9;
optional int64 sessionTime = 10;
- optional int64 ssdStoreId = 11;
+ optional int64 ssdStoreId = 11; /* Deprecated */
optional int32 qryPriorityId = 12;
optional AuthorizedInfo authInfo = 13;
}
@@ -83,7 +83,7 @@ message HeartBeatRequestC2B {
required int32 readStatus = 3;
/* brokerId:host:port:topic:partitionId:delayTimeStamp */
repeated string partitionInfo = 4;
- optional int64 ssdStoreId = 5;
+ optional int64 ssdStoreId = 5; /* Deprecated */
optional int32 qryPriorityId = 6;
optional AuthorizedInfo authInfo = 7;
}
diff --git a/tubemq-core/src/main/protobuf/MasterService.proto b/tubemq-core/src/main/protobuf/MasterService.proto
index cea6838..a95e794 100644
--- a/tubemq-core/src/main/protobuf/MasterService.proto
+++ b/tubemq-core/src/main/protobuf/MasterService.proto
@@ -126,7 +126,7 @@ message RegisterRequestC2M {
optional bool selectBig = 13;
optional int64 groupFlowCheckId = 14;
optional int64 defFlowCheckId = 15;
- optional int64 ssdStoreId = 16;
+ optional int64 ssdStoreId = 16; /* Deprecated */
optional int32 qryPriorityId = 17;
optional MasterCertificateInfo authInfo = 18;
}
@@ -142,7 +142,7 @@ message RegisterResponseM2C {
optional string defFlowControlInfo = 7;
optional int64 groupFlowCheckId = 8;
optional string groupFlowControlInfo = 9;
- optional int64 ssdStoreId = 10;
+ optional int64 ssdStoreId = 10; /* Deprecated */
optional int32 qryPriorityId = 11;
optional MasterAuthorizedInfo authorizedInfo = 12;
}
@@ -155,7 +155,7 @@ message HeartRequestC2M {
optional EventProto event = 5;
optional int64 defFlowCheckId = 6;
optional int64 groupFlowCheckId = 7;
- optional int64 ssdStoreId = 8;
+ optional int64 ssdStoreId = 8; /* Deprecated */
optional int32 qryPriorityId = 9;
optional MasterCertificateInfo authInfo = 10;
}
@@ -170,7 +170,7 @@ message HeartResponseM2C {
optional string defFlowControlInfo = 7;
optional int64 groupFlowCheckId = 8;
optional string groupFlowControlInfo = 9;
- optional int64 ssdStoreId = 10;
+ optional int64 ssdStoreId = 10; /* Deprecated */
optional int32 qryPriorityId = 11;
optional bool requireAuth = 12;
optional MasterAuthorizedInfo authorizedInfo = 13;
@@ -201,7 +201,7 @@ message RegisterRequestB2M {
required string brokerDefaultConfInfo = 8;
/* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
repeated string brokerTopicSetConfInfo = 9;
- optional int64 ssdStoreId = 10;
+ optional int64 ssdStoreId = 10; /* Deprecated */
optional int64 flowCheckId = 11;
optional int32 qryPriorityId = 12;
optional int32 tlsPort = 13;
@@ -222,7 +222,7 @@ message RegisterResponseM2B {
optional string brokerDefaultConfInfo = 10;
/* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
repeated string brokerTopicSetConfInfo = 11;
- optional int64 ssdStoreId = 12;
+ optional int64 ssdStoreId = 12; /* Deprecated */
optional int64 flowCheckId = 13;
optional string flowControlInfo = 14;
optional int32 qryPriorityId = 15;
@@ -244,7 +244,7 @@ message HeartRequestB2M {
/* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
repeated string brokerTopicSetConfInfo = 10;
repeated string removedTopicsInfo = 11;
- optional int64 ssdStoreId = 12;
+ optional int64 ssdStoreId = 12; /* Deprecated */
optional int64 flowCheckId = 13;
optional int32 qryPriorityId = 14;
optional MasterCertificateInfo authInfo = 15;
@@ -267,7 +267,7 @@ message HeartResponseM2B {
repeated string brokerTopicSetConfInfo = 12;
/* topic:partNum:acceptPublish:acceptSubscribe:unflushThreshold:unflushInterval:deleteWhen:deletePolicy:filterStatusId:statusId:attributes */
repeated string removeTopicConfInfo = 13;
- optional int64 ssdStoreId = 14;
+ optional int64 ssdStoreId = 14; /* Deprecated */
optional int64 flowCheckId = 15;
optional string flowControlInfo = 16;
optional int32 qryPriorityId = 17;
diff --git a/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java b/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
index ec14d81..5b84efa 100644
--- a/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
+++ b/tubemq-core/src/test/java/org/apache/tubemq/corebase/policies/TestFlowCtrlRuleHandler.java
@@ -41,7 +41,7 @@ public class TestFlowCtrlRuleHandler {
public void testFlowCtrlRuleHandler() {
try {
FlowCtrlRuleHandler handler = new FlowCtrlRuleHandler(true);
- handler.updateDefFlowCtrlInfo(1001, 2, 10, mockFlowCtrlInfo());
+ handler.updateDefFlowCtrlInfo(2, 10, mockFlowCtrlInfo());
TimeZone timeZone = TimeZone.getTimeZone("GMT+8:00");
Calendar rightNow = Calendar.getInstance(timeZone);
int hour = rightNow.get(Calendar.HOUR_OF_DAY);
@@ -68,23 +68,12 @@ public class TestFlowCtrlRuleHandler {
assertEquals(item.getZeroCnt(), 400);
assertEquals(item.getFreqLtInMs(), 100);
- // check ssd ctrl
- SSDCtrlResult ssdResult = handler.getCurSSDStartDltInSZ();
- if (curTime >= 1200) {
- assertEquals(ssdResult.dataEndDLtInSz, 2048L * 1024L * 1024L);
- assertEquals(ssdResult.dataStartDltInSize, 20480L * 1024L * 1024L);
- } else {
- assertEquals(ssdResult.dataEndDLtInSz, 0L);
- assertEquals(ssdResult.dataStartDltInSize, Long.MAX_VALUE);
- }
-
//check values
assertEquals(handler.getNormFreqInMs(), 100);
assertEquals(handler.getFlowCtrlId(), 10);
assertEquals(handler.getMinDataFreqInMs(), 400);
assertEquals(handler.getMinZeroCnt(), 3);
assertEquals(handler.getQryPriorityId(), 2);
- assertEquals(handler.getSsdTranslateId(), 1001);
assertEquals(handler.getFlowCtrlId(), 10);
System.out.println();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
index 7644aed..c40e915 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerConfig.java
@@ -50,7 +50,6 @@ public class BrokerConfig extends AbstractFileConfig {
// master service address
private String masterAddressList;
private String primaryPath;
- private String secondDataPath;
// tcp write service thread count
private int tcpWriteServiceThread =
Runtime.getRuntime().availableProcessors() * 2;
@@ -75,10 +74,6 @@ public class BrokerConfig extends AbstractFileConfig {
private int indexTransCount = 1000;
// rpc read timeout in milliseconds
private long rpcReadTimeoutMs = 10 * 1000;
- // max ssd file count
- private int maxSSDTotalFileCnt = 70;
- // max ssd file size
- private long maxSSDTotalFileSizes = 32212254720L;
// consumer register timeout in milliseconds
private int consumerRegTimeoutMs = 30000;
private boolean updateConsumerOffsets = true;
@@ -164,14 +159,6 @@ public class BrokerConfig extends AbstractFileConfig {
return maxIndexSegmentSize;
}
- public int getMaxSSDTotalFileCnt() {
- return maxSSDTotalFileCnt;
- }
-
- public long getMaxSSDTotalFileSizes() {
- return maxSSDTotalFileSizes;
- }
-
public long getAuthValidTimeStampPeriodMs() {
return authValidTimeStampPeriodMs;
}
@@ -210,9 +197,6 @@ public class BrokerConfig extends AbstractFileConfig {
throw new IllegalArgumentException("Require primaryPath not Blank!");
}
this.primaryPath = brokerSect.get("primaryPath").trim();
- if (TStringUtils.isNotBlank(brokerSect.get("secondDataPath"))) {
- this.secondDataPath = brokerSect.get("secondDataPath");
- }
if (TStringUtils.isBlank(brokerSect.get("hostName"))) {
throw new IllegalArgumentException(new StringBuilder(256).append("hostName is null or Blank in ")
.append(SECT_TOKEN_BROKER).append(" section!").toString());
@@ -274,12 +258,6 @@ public class BrokerConfig extends AbstractFileConfig {
if (TStringUtils.isNotBlank(brokerSect.get("maxIndexSegmentSize"))) {
this.maxIndexSegmentSize = getInt(brokerSect, "maxIndexSegmentSize");
}
- if (TStringUtils.isNotBlank(brokerSect.get("maxSSDTotalFileCnt"))) {
- this.maxSSDTotalFileCnt = getInt(brokerSect, "maxSSDTotalFileCnt");
- }
- if (TStringUtils.isNotBlank(brokerSect.get("maxSSDTotalFileSizes"))) {
- this.maxSSDTotalFileSizes = getLong(brokerSect, "maxSSDTotalFileSizes");
- }
if (!TStringUtils.isBlank(brokerSect.get("updateConsumerOffsets"))) {
this.updateConsumerOffsets = getBoolean(brokerSect, "updateConsumerOffsets");
}
@@ -457,10 +435,6 @@ public class BrokerConfig extends AbstractFileConfig {
return this.primaryPath;
}
- public String getSecondDataPath() {
- return this.secondDataPath;
- }
-
public int getWebPort() {
return webPort;
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
index 6cd99b1..5cd6bf8 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/BrokerServiceServer.java
@@ -378,11 +378,7 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
msgResult.lastRdDataOffset,
msgResult.totalMsgSize);
getCounterGroup.add(msgResult.tmpCounters);
- if (msgResult.isFromSsdFile) {
- builder.setEscFlowCtrl(true);
- } else {
- builder.setEscFlowCtrl(false);
- }
+ builder.setEscFlowCtrl(false);
builder.setRequireSlow(msgResult.isSlowFreq);
builder.setSuccess(true);
builder.setErrCode(TErrCodeConstants.SUCCESS);
@@ -839,16 +835,10 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
final long reqOffset = request.hasCurrOffset() ? request.getCurrOffset() : -1;
long reqSessionTime = request.hasSessionTime() ? request.getSessionTime() : -1;
String reqSessionKey = request.hasSessionKey() ? request.getSessionKey() : null;
- long reqSsdStoreId = request.hasSsdStoreId()
- ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
- boolean needSsdProc =
- ((reqSsdStoreId != TBaseConstants.META_VALUE_UNDEFINED)
- && (reqSsdStoreId == this.metadataManager.getFlowCtrlRuleHandler().getSsdTranslateId()));
consumerRegisterMap.put(partStr, new ConsumerNodeInfo(storeManager, reqQryPriorityId,
- clientId, filterCondSet, reqSessionKey, reqSessionTime, reqSsdStoreId, needSsdProc,
- request.hasSsdStoreId(), partStr));
+ clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr));
heartbeatManager.regConsumerNode(getHeartbeatNodeId(clientId, partStr), clientId, partStr);
MessageStore dataStore = null;
try {
@@ -876,7 +866,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
.append(TokenConstants.SEGMENT_SEP).append(partStr)
.append(TokenConstants.SEGMENT_SEP).append(offsetInfo)
.append(", reqOffset=").append(reqOffset)
- .append(", reqSsdStoreId=").append(reqSsdStoreId)
.append(", reqQryPriorityId=").append(reqQryPriorityId)
.append(", isOverTLS=").append(overtls).toString());
builder.setSuccess(true);
@@ -1014,8 +1003,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
return builder.build();
}
final String groupName = (String) paramCheckResult.checkData;
- long reqSsdStoreId = request.hasSsdStoreId()
- ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
List<Partition> partitions =
@@ -1076,11 +1063,6 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic
strBuffer.delete(0, strBuffer.length());
continue;
}
- if (consumerNodeInfo.getSsdTransId() != reqSsdStoreId) {
- boolean needSsdProc = ((reqSsdStoreId != TBaseConstants.META_VALUE_UNDEFINED)
- && (reqSsdStoreId == metadataManager.getFlowCtrlRuleHandler().getSsdTranslateId()));
- consumerNodeInfo.setSsdTransId(reqSsdStoreId, needSsdProc);
- }
if (consumerNodeInfo.getQryPriorityId() != reqQryPriorityId) {
consumerNodeInfo.setQryPriorityId(reqQryPriorityId);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
index a30a41d..6501378 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/TubeBroker.java
@@ -242,7 +242,6 @@ public class TubeBroker implements Stoppable, Runnable {
FlowCtrlRuleHandler flowCtrlRuleHandler =
metadataManager.getFlowCtrlRuleHandler();
long flowCheckId = flowCtrlRuleHandler.getFlowCtrlId();
- long ssdTranslateId = flowCtrlRuleHandler.getSsdTranslateId();
int qryPriorityId = flowCtrlRuleHandler.getQryPriorityId();
ServiceStatusHolder
.setReadWriteServiceStatus(response.getStopRead(),
@@ -252,20 +251,15 @@ public class TubeBroker implements Stoppable, Runnable {
? response.getQryPriorityId() : qryPriorityId;
if (response.getFlowCheckId() != flowCheckId) {
flowCheckId = response.getFlowCheckId();
- ssdTranslateId = response.getSsdStoreId();
try {
flowCtrlRuleHandler
- .updateDefFlowCtrlInfo(ssdTranslateId,
- qryPriorityId, flowCheckId, response.getFlowControlInfo());
+ .updateDefFlowCtrlInfo(qryPriorityId,
+ flowCheckId, response.getFlowControlInfo());
} catch (Exception e1) {
logger.warn(
"[HeartBeat response] found parse flowCtrl rules failure", e1);
}
}
- if (response.getSsdStoreId() != ssdTranslateId) {
- ssdTranslateId = response.getSsdStoreId();
- flowCtrlRuleHandler.setSsdTranslateId(ssdTranslateId);
- }
if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
@@ -281,7 +275,6 @@ public class TubeBroker implements Stoppable, Runnable {
.append(",configCheckSumId=").append(response.getConfCheckSumId())
.append(",hasFlowCtrl=").append(response.hasFlowCheckId())
.append(",curFlowCtrlId=").append(flowCheckId)
- .append(",curSsdTranslateId=").append(ssdTranslateId)
.append(",curQryPriorityId=").append(qryPriorityId)
.append(",brokerDefaultConfInfo=")
.append(response.getBrokerDefaultConfInfo())
@@ -425,15 +418,12 @@ public class TubeBroker implements Stoppable, Runnable {
if (response.getFlowCheckId() != flowCtrlRuleHandler.getFlowCtrlId()) {
try {
flowCtrlRuleHandler
- .updateDefFlowCtrlInfo(response.getSsdStoreId(), response.getQryPriorityId(),
+ .updateDefFlowCtrlInfo(response.getQryPriorityId(),
response.getFlowCheckId(), response.getFlowControlInfo());
} catch (Exception e1) {
logger.warn("[Register response] found parse flowCtrl rules failure", e1);
}
}
- if (response.getSsdStoreId() != flowCtrlRuleHandler.getSsdTranslateId()) {
- flowCtrlRuleHandler.setSsdTranslateId(response.getSsdStoreId());
- }
if (qryPriorityId != flowCtrlRuleHandler.getQryPriorityId()) {
flowCtrlRuleHandler.setQryPriorityId(qryPriorityId);
}
@@ -456,7 +446,6 @@ public class TubeBroker implements Stoppable, Runnable {
.append(",enableConsumeAuthorize=")
.append(serverAuthHandler.isEnableConsumeAuthorize())
.append(",curFlowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
- .append(",curSsdTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
.append(",curQryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
.append(",brokerDefaultConfInfo=").append(response.getBrokerDefaultConfInfo())
.append(",brokerTopicSetConfList=")
@@ -505,7 +494,6 @@ public class TubeBroker implements Stoppable, Runnable {
metadataManager.getFlowCtrlRuleHandler();
builder.setFlowCheckId(flowCtrlRuleHandler.getFlowCtrlId());
builder.setQryPriorityId(flowCtrlRuleHandler.getQryPriorityId());
- builder.setSsdStoreId(flowCtrlRuleHandler.getSsdTranslateId());
String brokerDefaultConfInfo = metadataManager.getBrokerDefMetaConfInfo();
if (brokerDefaultConfInfo != null) {
builder.setBrokerDefaultConfInfo(brokerDefaultConfInfo);
@@ -526,7 +514,6 @@ public class TubeBroker implements Stoppable, Runnable {
.append(",isTlsEnable=").append(tubeConfig.isTlsEnable())
.append(",TlsPort=").append(tubeConfig.getTlsPort())
.append(",flowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
- .append(",SSDTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
.append(",QryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
.append(",configCheckSumId=").append(metadataManager.getBrokerConfCheckSumId())
.append(",brokerDefaultConfInfo=").append(brokerDefaultConfInfo)
@@ -551,7 +538,6 @@ public class TubeBroker implements Stoppable, Runnable {
metadataManager.getFlowCtrlRuleHandler();
builder.setFlowCheckId(flowCtrlRuleHandler.getFlowCtrlId());
builder.setQryPriorityId(flowCtrlRuleHandler.getQryPriorityId());
- builder.setSsdStoreId(flowCtrlRuleHandler.getSsdTranslateId());
builder.setTakeConfInfo(false);
builder.setTakeRemovedTopicInfo(false);
List<String> removedTopics = this.metadataManager.getHardRemovedTopics();
@@ -573,7 +559,6 @@ public class TubeBroker implements Stoppable, Runnable {
.append(",readStatusRpt=").append(builder.getReadStatusRpt())
.append(",writeStatusRpt=").append(builder.getWriteStatusRpt())
.append(",flowCtrlId=").append(flowCtrlRuleHandler.getFlowCtrlId())
- .append(",SSDTranslateId=").append(flowCtrlRuleHandler.getSsdTranslateId())
.append(",QryPriorityId=").append(flowCtrlRuleHandler.getQryPriorityId())
.append(",ReadStatusRpt=").append(builder.getReadStatusRpt())
.append(",WriteStatusRpt=").append(builder.getWriteStatusRpt())
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
index 7dbef6d..66f4b5b 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStore.java
@@ -44,7 +44,6 @@ import org.apache.tubemq.server.broker.msgstore.disk.Segment;
import org.apache.tubemq.server.broker.msgstore.mem.GetCacheMsgResult;
import org.apache.tubemq.server.broker.msgstore.mem.MsgMemStatisInfo;
import org.apache.tubemq.server.broker.msgstore.mem.MsgMemStore;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.stats.CountItem;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
@@ -53,7 +52,7 @@ import org.slf4j.LoggerFactory;
/***
* Topic's message storage. It's a logical topic storage. Contains multi types storage: data in memory,
- * data in disk, data in ssd, and statistics of produce and consume.
+ * data in disk, and statistics of produce and consume.
*/
public class MessageStore implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(MessageStore.class);
@@ -261,36 +260,27 @@ public class MessageStore implements Closeable {
indexRecordView.read(indexBuffer, reqNewOffset);
indexBuffer.flip();
indexRecordView.relViewRef();
- // judge whether read from ssd or disk.
- if (consumerNodeInfo.processFromSsdFile()) {
- return msgStoreMgr.getSsdMessage(storeKey, consumerNodeInfo.getPartStr(),
- consumerNodeInfo.getStartSsdDataOffset(),
- consumerNodeInfo.getLastDataRdOffset(),
- partitionId, reqNewOffset, indexBuffer,
- msgSizeLimit, statisKeyBase);
- } else {
- if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset()
- >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
- && msgSizeLimit > this.maxAllowRdSize) {
- msgSizeLimit = this.maxAllowRdSize;
- }
- GetMessageResult retResult =
- msgFileStore.getMessages(partitionId,
- consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
- indexBuffer, consumerNodeInfo.isFilterConsume(),
- consumerNodeInfo.getFilterCondCodeSet(),
- statisKeyBase, msgSizeLimit);
- if (consumerNodeInfo.isFilterConsume()
- && retResult.isSuccess
- && retResult.getLastReadOffset() > 0) {
- if ((msgFileStore.getIndexMaxHighOffset()
- - reqNewOffset - retResult.getLastReadOffset())
- < fileLowReqMaxFilterIndexReadSize.get()) {
- retResult.setSlowFreq(true);
- }
+ if ((msgFileStore.getDataHighMaxOffset() - consumerNodeInfo.getLastDataRdOffset()
+ >= this.tubeConfig.getDoubleDefaultDeduceReadSize())
+ && msgSizeLimit > this.maxAllowRdSize) {
+ msgSizeLimit = this.maxAllowRdSize;
+ }
+ GetMessageResult retResult =
+ msgFileStore.getMessages(partitionId,
+ consumerNodeInfo.getLastDataRdOffset(), reqNewOffset,
+ indexBuffer, consumerNodeInfo.isFilterConsume(),
+ consumerNodeInfo.getFilterCondCodeSet(),
+ statisKeyBase, msgSizeLimit);
+ if (consumerNodeInfo.isFilterConsume()
+ && retResult.isSuccess
+ && retResult.getLastReadOffset() > 0) {
+ if ((msgFileStore.getIndexMaxHighOffset()
+ - reqNewOffset - retResult.getLastReadOffset())
+ < fileLowReqMaxFilterIndexReadSize.get()) {
+ retResult.setSlowFreq(true);
}
- return retResult;
}
+ return retResult;
}
/***
@@ -544,11 +534,6 @@ public class MessageStore implements Closeable {
return totalSize;
}
- public SSDSegFound getSourceSegment(final long offset,
- final int rate) throws IOException {
- return this.msgFileStore.getSourceSegment(offset, rate);
- }
-
public long getDataStoreSize() {
long totalSize = 0L;
this.writeCacheMutex.readLock().lock();
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
index 344c9b6..6275f3a 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -21,7 +21,6 @@ import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -50,8 +49,6 @@ import org.apache.tubemq.server.broker.exception.StartupException;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.msgstore.ssd.MsgSSDStoreManager;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.common.TStatusConstants;
@@ -71,8 +68,6 @@ public class MessageStoreManager implements StoreService {
private final ConcurrentHashMap<String/* topic */,
ConcurrentHashMap<Integer/* storeId */, MessageStore>> dataStores =
new ConcurrentHashMap<>();
- // ssd store manager
- private final MsgSSDStoreManager msgSsdStoreManager;
// store service status
private final AtomicBoolean stopped = new AtomicBoolean(false);
// data expire operation scheduler.
@@ -97,7 +92,6 @@ public class MessageStoreManager implements StoreService {
this.maxMsgTransferSize =
tubeConfig.getTransferSize() > DataStoreUtils.MAX_MSG_TRANSFER_SIZE
? DataStoreUtils.MAX_MSG_TRANSFER_SIZE : tubeConfig.getTransferSize();
- this.msgSsdStoreManager = new MsgSSDStoreManager(this, this.tubeConfig);
this.metadataManager.addPropertyChangeListener("topicConfigMap", new PropertyChangeListener() {
@Override
public void propertyChange(final PropertyChangeEvent evt) {
@@ -136,7 +130,6 @@ public class MessageStoreManager implements StoreService {
@Override
public void start() {
try {
- this.msgSsdStoreManager.loadSSDStores();
this.loadMessageStores(this.tubeConfig);
} catch (final IOException e) {
logger.error("[Store Manager] load message stores failed", e);
@@ -171,7 +164,6 @@ public class MessageStoreManager implements StoreService {
this.logClearScheduler.shutdownNow();
this.unFlushDiskScheduler.shutdownNow();
this.unFlushMemScheduler.shutdownNow();
- this.msgSsdStoreManager.close();
for (Map.Entry<String, ConcurrentHashMap<Integer, MessageStore>> entry :
this.dataStores.entrySet()) {
if (entry.getValue() != null) {
@@ -345,52 +337,6 @@ public class MessageStoreManager implements StoreService {
return this.tubeBroker;
}
- public boolean isSsdServiceStart() {
- return this.msgSsdStoreManager.isSsdServiceInUse();
- }
-
- public boolean putSsdTransferReq(final String partStr,
- final String storeKey, final long startOffset) {
- if (this.msgSsdStoreManager.isSsdServiceInUse()) {
- return this.msgSsdStoreManager.requestSsdTransfer(partStr, storeKey, startOffset, 1);
- }
- return false;
- }
-
- /***
- * Get messages from ssd.
- *
- * @param storeKey
- * @param partStr
- * @param ssdStartDataOffset
- * @param lastRDOffset
- * @param partitionId
- * @param reqOffset
- * @param indexBuffer
- * @param msgDataSizeLimit
- * @param statisKeyBase
- * @return
- * @throws IOException
- */
- public GetMessageResult getSsdMessage(final String storeKey,
- final String partStr,
- final long ssdStartDataOffset,
- final long lastRDOffset,
- final int partitionId,
- final long reqOffset,
- final ByteBuffer indexBuffer,
- int msgDataSizeLimit,
- final String statisKeyBase) throws IOException {
- if (this.msgSsdStoreManager.isSsdServiceInUse()) {
- return msgSsdStoreManager.getMessages(storeKey, partStr,
- ssdStartDataOffset, lastRDOffset,
- partitionId, reqOffset, indexBuffer,
- msgDataSizeLimit, statisKeyBase);
- }
- return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR,
- reqOffset, 0, "SSD StoreService not in use!");
- }
-
/***
* Get message from store.
*
@@ -427,19 +373,6 @@ public class MessageStoreManager implements StoreService {
}
}
- public SSDSegFound getSourceSegment(final String topic, final int storeId,
- final long offset, final int rate) throws IOException {
- ConcurrentHashMap<Integer, MessageStore> map = this.dataStores.get(topic);
- if (map == null) {
- return new SSDSegFound(false, 1, null);
- }
- MessageStore messageStore = map.get(storeId);
- if (messageStore == null) {
- return new SSDSegFound(false, 2, null);
- }
- return messageStore.getSourceSegment(offset, rate);
- }
-
public MetadataManager getMetadataManager() {
return tubeBroker.getMetadataManager();
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
index cf8efb6..221fcc6 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/disk/MsgFileStore.java
@@ -36,7 +36,6 @@ import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.tubemq.server.broker.BrokerConfig;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
-import org.apache.tubemq.server.broker.msgstore.ssd.SSDSegFound;
import org.apache.tubemq.server.broker.stats.CountItem;
import org.apache.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.tubemq.server.broker.utils.DiskSamplePrint;
@@ -472,29 +471,6 @@ public class MsgFileStore implements Closeable {
return this.indexSegments.getMinOffset();
}
- /***
- * Read from ssd file.
- *
- * @param offset
- * @param rate
- * @return
- * @throws IOException
- */
- public SSDSegFound getSourceSegment(final long offset, final int rate) throws IOException {
-
- final Segment segment = this.dataSegments.findSegment(offset);
- if (segment == null) {
- return new SSDSegFound(false, -1, null);
- }
- long dataSize = segment.getCachedSize();
- if ((dataSize - offset + segment.getStart()) < dataSize * rate / 100) {
- return new SSDSegFound(false, -2, null,
- segment.getStart(), segment.getStart() + segment.getCachedSize());
- }
- return new SSDSegFound(true, 0, segment.getFile(),
- segment.getStart(), segment.getStart() + segment.getCachedSize());
- }
-
public Segment indexSlice(final long offset, final int maxSize) throws IOException {
return indexSegments.getRecordSeg(offset);
}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
deleted file mode 100644
index c3adb5c..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDSegment.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.tubemq.corebase.TErrCodeConstants;
-import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
-import org.apache.tubemq.server.broker.msgstore.disk.FileSegment;
-import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.msgstore.disk.Segment;
-import org.apache.tubemq.server.broker.msgstore.disk.SegmentType;
-import org.apache.tubemq.server.broker.stats.CountItem;
-import org.apache.tubemq.server.broker.utils.DataStoreUtils;
-import org.apache.tubemq.server.common.TServerConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/***
- * Messages in ssd format, it same to messages in disk.
- */
-public class MsgSSDSegment implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(MsgSSDSegment.class);
- private final Segment dataSegment;
- private final ConcurrentHashMap<String, SSDVisitInfo> visitMap =
- new ConcurrentHashMap<>();
- private final String topic;
- private final String storeKey;
- private AtomicLong lastReadTime = new AtomicLong(System.currentTimeMillis());
- private AtomicInteger curStatus = new AtomicInteger(0); // 0:无效 1:有效; 2:老化中
- private AtomicLong expiredWait = new AtomicLong(System.currentTimeMillis());
-
-
- public MsgSSDSegment(final String storeKey, final String topic,
- final long start, final File file) throws IOException {
- this.topic = topic;
- this.storeKey = storeKey;
- this.dataSegment = new FileSegment(start, file, false, SegmentType.DATA);
- this.curStatus.set(1);
- this.lastReadTime.set(System.currentTimeMillis());
- }
-
- /***
- * Get messages from ssd.
- *
- * @param partStr
- * @param partitionId
- * @param reqOffset
- * @param lastRDOffset
- * @param indexBuffer
- * @param isFilterConsume
- * @param filterKeys
- * @param statisKeyBase
- * @param maxMsgTransferSize
- * @param strBuffer
- * @return
- */
- public GetMessageResult getMessages(final String partStr, final int partitionId,
- final long reqOffset, final long lastRDOffset,
- final ByteBuffer indexBuffer,
- final boolean isFilterConsume, final List<Integer> filterKeys,
- final String statisKeyBase, final int maxMsgTransferSize,
- final StringBuilder strBuffer) {
- // #lizard forgives
- int retCode = 0;
- int totalSize = 0;
- String errInfo = "Ok";
- boolean result = true;
- int dataRealLimit = 0;
- int curIndexPartitionId = 0;
- long curIndexDataOffset = 0L;
- int curIndexDataSize = 0;
- int curIndexKeyCode = 0;
- int readedOffset = 0;
- long recvTimeInMillsec = 0L;
- long maxDataLimitOffset = 0L;
- long lastRdDataOffset = -1L;
- final long curDataMaxOffset = getDataMaxOffset();
- final long curDataMinOffset = getDataMinOffset();
- HashMap<String, CountItem> countMap = new HashMap<>();
- ByteBuffer dataBuffer =
- ByteBuffer.allocate(TServerConstants.CFG_STORE_DEFAULT_MSG_READ_UNIT);
- List<ClientBroker.TransferedMessage> transferedMessageList =
- new ArrayList<>();
- if (this.dataSegment == null) {
- logger.error(strBuffer.append("[SSD Store] Found SSD fileRecordView is null! storeKey=")
- .append(this.storeKey).toString());
- strBuffer.delete(0, strBuffer.length());
- return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR_MSGSET_NULL,
- reqOffset, 0, "SSD fileRecordView is null!");
- }
- SSDVisitInfo ssdVisitInfo = visitMap.get(partStr);
- if (ssdVisitInfo == null) {
- SSDVisitInfo ssdVisitInfo1 = new SSDVisitInfo(partStr, lastRDOffset);
- ssdVisitInfo =
- visitMap.putIfAbsent(partStr, ssdVisitInfo1);
- if (ssdVisitInfo == null) {
- ssdVisitInfo = ssdVisitInfo1;
- }
- }
- lastReadTime.set(System.currentTimeMillis());
- ssdVisitInfo.requestVisit(lastRDOffset);
- // read data by index.
- for (int curIndexOffset = 0;
- curIndexOffset < indexBuffer.remaining();
- curIndexOffset += DataStoreUtils.STORE_INDEX_HEAD_LEN) {
- curIndexPartitionId = indexBuffer.getInt();
- curIndexDataOffset = indexBuffer.getLong();
- curIndexDataSize = indexBuffer.getInt();
- curIndexKeyCode = indexBuffer.getInt();
- recvTimeInMillsec = indexBuffer.getLong();
- maxDataLimitOffset = curIndexDataOffset + curIndexDataSize;
- if (curIndexDataOffset < 0
- || curIndexPartitionId < 0
- || curIndexDataSize <= 0
- || curIndexDataSize > DataStoreUtils.STORE_MAX_MESSAGE_STORE_LEN) {
- readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
- continue;
- }
- if (curIndexDataOffset < curDataMinOffset
- || curIndexDataOffset >= curDataMaxOffset) {
- lastRdDataOffset = curIndexDataOffset;
- break;
- }
- if (curIndexPartitionId != partitionId
- || maxDataLimitOffset > curDataMaxOffset
- || (isFilterConsume && !filterKeys.contains(curIndexKeyCode))) {
- lastRdDataOffset = maxDataLimitOffset;
- readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
- continue;
- }
- try {
- if (dataBuffer.capacity() < curIndexDataSize) {
- dataBuffer = ByteBuffer.allocate(curIndexDataSize);
- }
- dataBuffer.clear();
- dataBuffer.limit(curIndexDataSize);
- dataSegment.read(dataBuffer, curIndexDataOffset);
- dataBuffer.flip();
- dataRealLimit = dataBuffer.limit();
- if (dataRealLimit < curIndexDataSize) {
- lastRdDataOffset = curIndexDataOffset;
- readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
- continue;
- }
- } catch (Throwable e2) {
- strBuffer.delete(0, strBuffer.length());
- logger.warn(strBuffer
- .append("[SSD Store] Get message from file failure,storeKey=")
- .append(this.storeKey).append(", partitionId=")
- .append(partitionId).toString(), e2);
- retCode = TErrCodeConstants.INTERNAL_SERVER_ERROR;
- strBuffer.delete(0, strBuffer.length());
- errInfo = strBuffer.append("Get message from file failure : ")
- .append(e2.getCause()).toString();
- strBuffer.delete(0, strBuffer.length());
- result = false;
- break;
- }
- lastRdDataOffset = maxDataLimitOffset;
- readedOffset = curIndexOffset + DataStoreUtils.STORE_INDEX_HEAD_LEN;
- ClientBroker.TransferedMessage transferedMessage =
- DataStoreUtils.getTransferMsg(dataBuffer,
- curIndexDataSize, countMap, statisKeyBase, strBuffer);
- if (transferedMessage == null) {
- continue;
- }
- transferedMessageList.add(transferedMessage);
- totalSize += curIndexDataSize;
- if (totalSize >= maxMsgTransferSize) {
- break;
- }
- }
- if (retCode != 0) {
- if (!transferedMessageList.isEmpty()) {
- retCode = 0;
- errInfo = "Ok";
- }
- }
- lastReadTime.set(System.currentTimeMillis());
- ssdVisitInfo.responseVisit(lastRdDataOffset);
- if (lastRdDataOffset <= 0L) {
- lastRdDataOffset = lastRDOffset;
- }
- return new GetMessageResult(result, retCode, errInfo,
- reqOffset, readedOffset, lastRdDataOffset,
- totalSize, countMap, transferedMessageList, true);
- }
-
- public void setInUse() {
- this.curStatus.set(1);
- this.lastReadTime.set(System.currentTimeMillis());
- this.expiredWait.set(System.currentTimeMillis());
- }
-
- public Segment getDataSegment() {
- return dataSegment;
- }
-
- public ConcurrentHashMap<String, SSDVisitInfo> getVisitMap() {
- return visitMap;
- }
-
- @Override
- public void close() throws IOException {
- this.curStatus.set(0);
- try {
- if (this.dataSegment != null) {
- this.dataSegment.close();
- }
- } catch (Throwable e1) {
- logger.error(new StringBuilder(512).append("[SSD Store] Close ")
- .append(this.dataSegment.getFile().getAbsolutePath())
- .append("'s file failure").toString(), e1);
- }
- }
-
- public long getDataMaxOffset() {
- return dataSegment.getLast();
- }
-
- public long getDataMinOffset() {
- return this.dataSegment.getStart();
- }
-
- public int getOffsetPos(long dataOffset) {
- if (dataOffset < dataSegment.getStart()) {
- return -1;
- } else if (dataOffset >= dataSegment.getStart() + dataSegment.getCachedSize()) {
- return 1;
- }
- return 0;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof MsgSSDSegment)) {
- return false;
- }
- MsgSSDSegment that = (MsgSSDSegment) o;
- if (expiredWait.get() != that.expiredWait.get()) {
- return false;
- }
- if (!curStatus.equals(that.curStatus)) {
- return false;
- }
- return storeKey.equals(that.storeKey);
-
- }
-
- @Override
- public int hashCode() {
- int result = curStatus.hashCode();
- result = 31 * result + storeKey.hashCode();
- result = 31 * result + (int) (expiredWait.get() ^ (expiredWait.get() >>> 32));
- return result;
- }
-
- public void closeConsumeRel(final String partStr) {
- this.visitMap.remove(partStr);
- }
-
- public String getStoreKey() {
- return this.storeKey;
- }
-
- public long getDataSizeInBytes() {
- return dataSegment.getCachedSize();
- }
-
- public boolean isSegmentValid() {
- long curTime = System.currentTimeMillis();
- return !(this.curStatus.get() > 1
- || (this.curStatus.get() > 0
- && curTime - this.lastReadTime.get() >= 15 * 1000));
- }
-
- public int getCurStatus() {
- return curStatus.get();
- }
-
- public boolean setAndGetSegmentExpired() {
- long curTime = System.currentTimeMillis();
- if (this.curStatus.get() > 0
- && this.visitMap.isEmpty()
- && (curTime - this.lastReadTime.get() >= 15 * 1000)) {
- if (this.curStatus.get() == 1) {
- this.curStatus.set(2);
- this.expiredWait.set(curTime);
- return false;
- } else if (curTime - this.expiredWait.get() >= 15 * 1000) {
- return true;
- }
- return false;
- }
- return false;
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
deleted file mode 100644
index 0be8806..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/MsgSSDStoreManager.java
+++ /dev/null
@@ -1,779 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.tubemq.corebase.TErrCodeConstants;
-import org.apache.tubemq.corebase.utils.ConcurrentHashSet;
-import org.apache.tubemq.corebase.utils.TStringUtils;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
-import org.apache.tubemq.server.broker.BrokerConfig;
-import org.apache.tubemq.server.broker.exception.StartupException;
-import org.apache.tubemq.server.broker.metadata.MetadataManager;
-import org.apache.tubemq.server.broker.metadata.TopicMetadata;
-import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
-import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
-import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.tubemq.server.broker.utils.DataStoreUtils;
-import org.apache.tubemq.server.common.utils.FileUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/***
- * Message on ssd storage management. It mainly store message on ssd, and copy to disk.
- * Query messages in copied file, copy files if has not copied.
- * Expired files will be delete when exceed storage quota.
- */
-public class MsgSSDStoreManager implements Closeable {
- private static final Logger logger = LoggerFactory.getLogger(MsgSSDStoreManager.class);
- // file suffix
- private static final String DATA_FILE_SUFFIX = ".tube";
- // tube config
- private final BrokerConfig tubeConfig;
- private final String primStorePath;
- private final String secdStorePath;
- // ssd data directory
- private final File ssdBaseDataDir;
- private final BlockingQueue<SSDSegEvent> reqSSDEvents
- = new ArrayBlockingQueue<>(60);
- private final ExecutorService statusCheckExecutor = Executors.newSingleThreadExecutor();
- private final ExecutorService reqExecutor = Executors.newSingleThreadExecutor();
- // total ssd files size
- private final AtomicLong totalSsdFileSize = new AtomicLong(0L);
- // total ssd file count
- private final AtomicInteger totalSsdFileCnt = new AtomicInteger(0);
- private final MessageStoreManager msgStoreMgr;
- private volatile boolean closed = false;
- private volatile boolean isStart = true;
- private AtomicInteger firstChecked = new AtomicInteger(3);
- // ssd segments
- private ConcurrentHashMap<String, ConcurrentHashMap<Long, MsgSSDSegment>> ssdSegmentsMap =
- new ConcurrentHashMap<>(60);
- private ConcurrentHashMap<String, ConcurrentHashSet<SSDSegIndex>> ssdPartStrMap =
- new ConcurrentHashMap<>(60);
- private long startTime = System.currentTimeMillis();
- private long lastCheckTime = System.currentTimeMillis();
-
-
- public MsgSSDStoreManager(final MessageStoreManager msgStoreMgr,
- final BrokerConfig tubeConfig) throws IOException {
- this.tubeConfig = tubeConfig;
- this.msgStoreMgr = msgStoreMgr;
- this.primStorePath = this.tubeConfig.getPrimaryPath();
- this.secdStorePath = TStringUtils.isBlank(this.tubeConfig.getSecondDataPath())
- ? null : this.tubeConfig.getSecondDataPath();
- if (TStringUtils.isBlank(this.secdStorePath)
- || this.primStorePath.equals(this.secdStorePath)) {
- this.isStart = false;
- this.ssdBaseDataDir = null;
- logger.info("[SSD Manager] The tube not configure SSD directory, SSD-Store function not start!");
- return;
- }
- this.ssdBaseDataDir = new File(this.secdStorePath);
- FileUtil.checkDir(this.ssdBaseDataDir);
- startTime = System.currentTimeMillis();
- }
-
- /***
- * Load ssd stores.
- *
- * @throws IOException
- */
- public void loadSSDStores() throws IOException {
- try {
- this.loadDataDir(tubeConfig);
- this.statusCheckExecutor.execute(new Runnable() {
- @Override
- public void run() {
- final StringBuilder strBuffer = new StringBuilder(512);
- while (!closed) {
- try {
- flushCurrSegments(strBuffer);
- } catch (Throwable e) {
- logger.error("[SSD Manager] Error during SSD File status check", e);
- }
- ThreadUtils.sleep(30000);
- }
- logger.warn("[SSD Manager] SSD File scan thread is out!");
- }
- });
-
- this.reqExecutor.execute(new SsdStoreRunner());
- } catch (final IOException e) {
- logger.error("[SSD Manager] load SSD data files failed", e);
- throw new StartupException("Initialize SSD data files failed", e);
- } catch (Throwable e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public boolean isSsdServiceInUse() {
- return (this.isStart && !this.closed);
- }
-
- /***
- * Request ssd transfer to disk.
- *
- * @param partStr
- * @param storeKey
- * @param startOffset
- * @param segCnt
- * @return
- */
- public boolean requestSsdTransfer(final String partStr, final String storeKey,
- final long startOffset, final int segCnt) {
- if (this.isStart && !this.closed) {
- try {
- this.reqSSDEvents.put(new SSDSegEvent(partStr, storeKey, startOffset, segCnt));
- return true;
- } catch (Throwable e1) {
- logger.warn("[SSD Store] request SSD Event failure : ", e1);
- }
- }
- return false;
- }
-
- /***
- * Get messages from ssd.
- *
- * @param storeKey
- * @param partStr
- * @param startDataOffset
- * @param lastRDOffset
- * @param partitionId
- * @param reqOffset
- * @param indexBuffer
- * @param msgDataSizeLimit
- * @param statisKeyBase
- * @return
- */
- public GetMessageResult getMessages(final String storeKey, final String partStr,
- final long startDataOffset, final long lastRDOffset,
- final int partitionId, final long reqOffset,
- final ByteBuffer indexBuffer, int msgDataSizeLimit,
- final String statisKeyBase) {
- final StringBuilder strBuffer = new StringBuilder(512);
- ConcurrentHashMap<Long, MsgSSDSegment> msgSSDSegmentMap =
- ssdSegmentsMap.get(storeKey);
- Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
- msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
- ConsumerNodeInfo consumerNodeInfo = consumerNodeInfoMap.get(partStr);
- if (consumerNodeInfo == null) {
- return new GetMessageResult(false, TErrCodeConstants.INTERNAL_SERVER_ERROR,
- reqOffset, 0, "Consumer unregistered!");
- }
- if (msgSSDSegmentMap == null) {
- logger.warn(strBuffer.append("[SSD Store] Found consumer visit but file expired! storeKey=")
- .append(storeKey).toString());
- consumerNodeInfo.resetSSDProcSeg(false);
- return new GetMessageResult(false,
- TErrCodeConstants.INTERNAL_SERVER_ERROR, reqOffset, 0, "SSD file has expired!");
- }
- MsgSSDSegment msgSsdSegment =
- msgSSDSegmentMap.get(startDataOffset);
- if (msgSsdSegment == null) {
- consumerNodeInfo.resetSSDProcSeg(false);
- logger.warn(strBuffer
- .append("[SSD Store] Found consumer visit but file expired! storeKey=")
- .append(storeKey).append(",startDataOffset=")
- .append(startDataOffset).toString());
- return new GetMessageResult(false,
- TErrCodeConstants.INTERNAL_SERVER_ERROR, reqOffset, 0, "SSD file has expired!");
- }
- GetMessageResult getMessageResult =
- msgSsdSegment.getMessages(partStr, partitionId, reqOffset, lastRDOffset,
- indexBuffer, false, null, statisKeyBase, msgDataSizeLimit, strBuffer);
- if (getMessageResult.retCode == TErrCodeConstants.INTERNAL_SERVER_ERROR_MSGSET_NULL) {
- consumerNodeInfo.resetSSDProcSeg(false);
- try {
- msgSsdSegment.close();
- msgSSDSegmentMap.remove(startDataOffset);
- logger.warn(strBuffer
- .append("[SSD Store] Found SSD file's fileRecordView is null release! storeKey=")
- .append(storeKey).append(",startDataOffset=")
- .append(startDataOffset).toString());
- } catch (Throwable ee) {
- strBuffer.delete(0, strBuffer.length());
- logger.warn(strBuffer.append("[SSD Store] release SSD FileSegment failure, storeKey=")
- .append(storeKey).append(",startDataOffset=")
- .append(startDataOffset).toString(), ee);
- }
- }
- if (consumerNodeInfo.getLastDataRdOffset() >= msgSsdSegment.getDataMaxOffset()) {
- msgSsdSegment.closeConsumeRel(partStr);
- consumerNodeInfo.resetSSDProcSeg(false);
- }
- return getMessageResult;
- }
-
- @Override
- public void close() {
- this.closed = true;
- try {
- this.reqExecutor.shutdown();
- this.statusCheckExecutor.shutdown();
- for (Map<Long, MsgSSDSegment> msgSSDSegmentMap : ssdSegmentsMap.values()) {
- if (msgSSDSegmentMap != null) {
- for (MsgSSDSegment msgSsdSegment : msgSSDSegmentMap.values()) {
- if (msgSsdSegment != null) {
- msgSsdSegment.close();
- }
- }
- }
- }
- } catch (Throwable e1) {
- logger.warn("[SSD Store] Close SSD Store manager failure", e1);
- }
- }
-
- private boolean flushCurrSegments(final StringBuilder sb) throws IOException {
- int totalCnt = 0;
- long currTime = System.currentTimeMillis();
- if ((firstChecked.get() > 0 && currTime - startTime <= 2 * 60 * 1000)
- || (firstChecked.get() == 0
- && totalSsdFileCnt.get() < (int) (tubeConfig.getMaxSSDTotalFileCnt() * 0.7)
- && totalSsdFileSize.get() < (long) (tubeConfig.getMaxSSDTotalFileSizes() * 0.7))) {
- return true;
- }
- Set<SSDSegIndex> msgSsdIndexSet = new HashSet<>();
- for (Map.Entry<String, ConcurrentHashMap<Long, MsgSSDSegment>> entry : ssdSegmentsMap.entrySet()) {
- if (entry.getKey() == null || entry.getValue() == null) {
- continue;
- }
- ConcurrentHashMap<Long, MsgSSDSegment> msgSegmentMap = entry.getValue();
- for (Map.Entry<Long, MsgSSDSegment> subEntry : msgSegmentMap.entrySet()) {
- if (subEntry.getValue() != null && !subEntry.getValue().isSegmentValid()) {
- msgSsdIndexSet.add(new SSDSegIndex(entry.getKey(),
- subEntry.getValue().getDataSegment().getStart(),
- subEntry.getValue().getDataMaxOffset()));
- }
- }
- }
- if (msgSsdIndexSet.isEmpty()) {
- if (firstChecked.get() > 0) {
- firstChecked.set(0);
- }
- return true;
- }
- for (SSDSegIndex ssdSegIndex : msgSsdIndexSet) {
- ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSSdSegmentMap =
- ssdSegmentsMap.get(ssdSegIndex.storeKey);
- if (tmpMsgSSdSegmentMap == null) {
- continue;
- }
- MsgSSDSegment msgSsdSegment =
- tmpMsgSSdSegmentMap.get(ssdSegIndex.startOffset);
- if (msgSsdSegment == null) {
- continue;
- }
- if (msgSsdSegment.setAndGetSegmentExpired()) {
- inProcessExpiredFile(msgSsdSegment, ssdSegIndex, tmpMsgSSdSegmentMap, sb);
- } else {
- inProcessInUseFile(msgSsdSegment);
- }
- }
- if (firstChecked.get() > 0) {
- totalCnt = firstChecked.get();
- firstChecked.compareAndSet(totalCnt, totalCnt - 1);
- }
- return true;
- }
-
- private void inProcessInUseFile(MsgSSDSegment msgSsdSegment) {
- if (msgSsdSegment.getCurStatus() != 1) {
- return;
- }
- try {
- Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
- msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
- ConcurrentHashMap<String, SSDVisitInfo> ssdVisitInfoMap =
- msgSsdSegment.getVisitMap();
- if (ssdVisitInfoMap != null) {
- List<String> rmvPartStrList = new ArrayList<>();
- for (String partStr : ssdVisitInfoMap.keySet()) {
- ConsumerNodeInfo consumerNodeInfo =
- consumerNodeInfoMap.get(partStr);
- if (consumerNodeInfo == null) {
- rmvPartStrList.add(partStr);
- continue;
- }
- if (msgSsdSegment
- .getOffsetPos(consumerNodeInfo.getLastDataRdOffset()) > 0) {
- consumerNodeInfo.resetSSDProcSeg(true);
- rmvPartStrList.add(partStr);
- }
- }
- if (!rmvPartStrList.isEmpty()) {
- for (String partStr : rmvPartStrList) {
- msgSsdSegment.closeConsumeRel(partStr);
- }
- }
- }
- } catch (Throwable e2) {
- logger.warn("[SSD Manager] Check consumerInfo and SSD file relation failure", e2);
- }
- }
-
- /***
- * Delete expired files.
- *
- * @param msgSsdSegment
- * @param ssdSegIndex
- * @param msgSsdSegmentMap
- * @param strBuffer
- */
- private void inProcessExpiredFile(MsgSSDSegment msgSsdSegment, SSDSegIndex ssdSegIndex,
- Map<Long, MsgSSDSegment> msgSsdSegmentMap, StringBuilder strBuffer) {
- try {
- File file = msgSsdSegment.getDataSegment().getFile();
- logger.info(strBuffer.append("[SSD Manager] delete ssd segment : ")
- .append(file.getAbsolutePath()).toString());
- strBuffer.delete(0, strBuffer.length());
- final long msgSize = msgSsdSegment.getDataSegment().getCachedSize();
- for (Map.Entry<String, ConcurrentHashSet<SSDSegIndex>> entry : ssdPartStrMap.entrySet()) {
- if (entry.getValue() != null) {
- entry.getValue().remove(ssdSegIndex);
- }
- }
- msgSsdSegmentMap.remove(ssdSegIndex.startOffset);
- msgSsdSegment.close();
- file.delete();
- int exptCnt = totalSsdFileCnt.get();
- while (!totalSsdFileCnt.compareAndSet(exptCnt, exptCnt - 1)) {
- exptCnt = totalSsdFileCnt.get();
- }
- long exptSize = totalSsdFileSize.get();
- while (!totalSsdFileSize.compareAndSet(exptSize, exptSize - msgSize)) {
- exptSize = totalSsdFileSize.get();
- }
- logger.info(strBuffer.append("[SSD Manager] rmv SSD FileSegment finished, totalSsdFileCnt=")
- .append(totalSsdFileCnt.get()).append(",totalSsdFileSize=")
- .append(totalSsdFileSize.get()).toString());
- strBuffer.delete(0, strBuffer.length());
- } catch (Throwable e1) {
- logger.warn("[SSD Manager] Delete SSD expired file failure", e1);
- }
- }
-
- /***
- * Load data from directory.
- *
- * @param tubeConfig
- * @throws IOException
- */
- private void loadDataDir(final BrokerConfig tubeConfig) throws IOException {
- StringBuilder strBuffer = new StringBuilder(512);
- final long startTime = System.currentTimeMillis();
- logger.info(strBuffer.append("[SSD Manager] Begin to scan data path:")
- .append(this.ssdBaseDataDir.getAbsolutePath()).toString());
- strBuffer.delete(0, strBuffer.length());
- MetadataManager metadataManager = msgStoreMgr.getMetadataManager();
- final File[] ls = this.ssdBaseDataDir.listFiles();
- if (ls != null) {
- for (final File subDir : ls) {
- if (subDir == null) {
- continue;
- }
- if (!subDir.isDirectory()) {
- logger.warn(strBuffer.append("[SSD Manager] Ignore not directory path:")
- .append(subDir.getAbsolutePath()).toString());
- strBuffer.delete(0, strBuffer.length());
- } else {
- final String name = subDir.getName();
- final int index = name.lastIndexOf('-');
- if (index < 0) {
- logger.warn(strBuffer.append("[SSD Manager] Ignore invalid directory:")
- .append(subDir.getAbsolutePath()).toString());
- strBuffer.delete(0, strBuffer.length());
- continue;
- }
- final String topic = name.substring(0, index);
- TopicMetadata topicMetadata = metadataManager.getTopicMetadata(topic);
- if (topicMetadata == null) {
- logger.warn(strBuffer
- .append("[SSD Manager] No valid topic config for topic data directories:")
- .append(topic).toString());
- strBuffer.delete(0, strBuffer.length());
- continue;
- }
- final int storeId = Integer.parseInt(name.substring(index + 1));
- final String storeKey =
- strBuffer.append(topic).append("-").append(storeId).toString();
- strBuffer.delete(0, strBuffer.length());
- loadSsdFile(storeKey, topic, strBuffer);
- }
- }
- }
- logger.info(strBuffer.append("[SSD Manager] End to scan SSD data path in ")
- .append((System.currentTimeMillis() - startTime) / 1000)
- .append(" secs").toString());
- }
-
- /***
- * Copy files on disk to ssd.
- *
- * @param storeKey
- * @param topic
- * @param partStr
- * @param fromFile
- * @param startOffset
- * @param endOffset
- * @param sb
- * @return
- */
- private boolean copyFileToSSD(final String storeKey, final String topic,
- final String partStr, File fromFile,
- final long startOffset, final long endOffset,
- final StringBuilder sb) {
- // #lizard forgives
- logger.info(sb.append("[SSD Manager] Begin to copy file to SSD, source data path:")
- .append(fromFile.getAbsolutePath()).toString());
- sb.delete(0, sb.length());
- if (!fromFile.exists()) {
- logger.warn(sb.append("[SSD Manager] source file not exists, path:")
- .append(fromFile.getAbsolutePath()).toString());
- sb.delete(0, sb.length());
- return false;
- }
- if (!fromFile.isFile()) {
- logger.warn(sb.append("[SSD Manager] source file not File, path:")
- .append(fromFile.getAbsolutePath()).toString());
- sb.delete(0, sb.length());
- return false;
- }
- if (!fromFile.canRead()) {
- logger.warn(sb.append("[SSD Manager] source file not canRead, path:")
- .append(fromFile.getAbsolutePath()).toString());
- sb.delete(0, sb.length());
- return false;
- }
- boolean isNew = false;
- final String filename = fromFile.getName();
- final long start =
- Long.parseLong(filename.substring(0, filename.length() - DATA_FILE_SUFFIX.length()));
- final File toFileDir =
- new File(sb.append(secdStorePath)
- .append(File.separator).append(storeKey).toString());
- sb.delete(0, sb.length());
- FileUtil.checkDir(toFileDir);
- ConcurrentHashMap<Long, MsgSSDSegment> msgSsdSegMap =
- ssdSegmentsMap.get(storeKey);
- if (msgSsdSegMap == null) {
- ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<>();
- msgSsdSegMap = ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
- if (msgSsdSegMap == null) {
- msgSsdSegMap = tmpMsgSsdSegMap;
- }
- }
- MsgSSDSegment msgSsdSegment1 = msgSsdSegMap.get(start);
- if (msgSsdSegment1 == null) {
- boolean isSuccess = false;
- FileInputStream fosfrom = null;
- FileOutputStream fosto = null;
- final File targetFile =
- new File(toFileDir, DataStoreUtils.nameFromOffset(start, DATA_FILE_SUFFIX));
- try {
- fosfrom = new FileInputStream(fromFile);
- fosto = new FileOutputStream(targetFile);
- byte[] bt = new byte[65536];
- int c;
- while ((c = fosfrom.read(bt)) > 0) {
- fosto.write(bt, 0, c);
- }
- isSuccess = true;
- isNew = true;
- } catch (FileNotFoundException e) {
- logger.warn("[SSD Manager] copy File to SSD FileNotFoundException failure ", e);
- } catch (IOException e) {
- logger.warn("[SSD Manager] copy File to SSD IOException failure ", e);
- } finally {
- if (fosfrom != null) {
- try {
- fosfrom.close();
- } catch (Throwable e2) {
- logger.warn("[SSD Manager] Close FileInputStream failure! ", e2);
- }
- }
- if (fosto != null) {
- try {
- fosto.close();
- } catch (Throwable e2) {
- logger.warn("[SSD Manager] Close FileOutputStream failure! ", e2);
- }
- }
- }
- if (!isSuccess) {
- try {
- targetFile.delete();
- } catch (Throwable e2) {
- logger.warn("[SSD Manager] remove SSD target file failure ", e2);
- }
- return false;
- }
- try {
- MsgSSDSegment tmpMsgSsdSegment =
- new MsgSSDSegment(storeKey, topic, start, targetFile);
- if (tmpMsgSsdSegment.getDataMinOffset() != startOffset
- || tmpMsgSsdSegment.getDataMaxOffset() != endOffset) {
- logger.warn(sb
- .append("[SSD Manager] created SSD file getCachedSize not equal source: sourceFile=")
- .append(fromFile.getAbsolutePath()).append(",srcStartOffset=")
- .append(startOffset).append(",srcEndOffset=").append(endOffset)
- .append(",dstStartOffset=").append(tmpMsgSsdSegment.getDataMinOffset())
- .append(",dstEndOffset=").append(tmpMsgSsdSegment.getDataMaxOffset()).toString());
- sb.delete(0, sb.length());
- tmpMsgSsdSegment.close();
- return false;
- }
- msgSsdSegment1 = msgSsdSegMap.putIfAbsent(start, tmpMsgSsdSegment);
- if (msgSsdSegment1 == null) {
- msgSsdSegment1 = tmpMsgSsdSegment;
- this.totalSsdFileSize.addAndGet(tmpMsgSsdSegment.getDataSizeInBytes());
- this.totalSsdFileCnt.incrementAndGet();
- } else {
- tmpMsgSsdSegment.close();
- }
- } catch (Throwable e1) {
- logger.warn("[SSD Manager] create SSD FileSegment failure", e1);
- return false;
- }
- }
- ConcurrentHashSet<SSDSegIndex> ssdSegIndices = this.ssdPartStrMap.get(partStr);
- if (ssdSegIndices == null) {
- ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices = new ConcurrentHashSet<>();
- ssdSegIndices = this.ssdPartStrMap.putIfAbsent(partStr, tmpSsdSegIndices);
- if (ssdSegIndices == null) {
- ssdSegIndices = tmpSsdSegIndices;
- }
- }
- SSDSegIndex ssdSegIndex =
- new SSDSegIndex(storeKey, start, msgSsdSegment1.getDataMaxOffset());
- if (!ssdSegIndices.contains(ssdSegIndex)) {
- ssdSegIndices.add(ssdSegIndex);
- }
- logger.info(sb.append("[SSD Manager] copy file to SSD success, data segment ")
- .append(msgSsdSegment1.getStoreKey()).append(",start=").append(start)
- .append(", isNew=").append(isNew).toString());
- sb.delete(0, sb.length());
- return true;
- }
-
- private void checkSegmentTransferStatus(final String topicName, final SSDSegEvent ssdSegEvent,
- final SSDSegFound ssdSegFound, final ConsumerNodeInfo consumerNodeInfo,
- final StringBuilder strBuffer) {
- consumerNodeInfo.setSSDProcing();
- ConcurrentHashMap<Long, MsgSSDSegment> msgSegmentMap =
- ssdSegmentsMap.get(ssdSegEvent.storeKey);
- if (msgSegmentMap == null) {
- ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<>();
- msgSegmentMap =
- ssdSegmentsMap.putIfAbsent(ssdSegEvent.storeKey, tmpMsgSsdSegMap);
- if (msgSegmentMap == null) {
- msgSegmentMap = tmpMsgSsdSegMap;
- }
- }
- MsgSSDSegment curMsgSsdSegment =
- msgSegmentMap.get(ssdSegFound.startOffset);
- if (curMsgSsdSegment != null) {
- curMsgSsdSegment.setInUse();
- ConcurrentHashSet<SSDSegIndex> ssdSegIndices =
- ssdPartStrMap.get(ssdSegEvent.partStr);
- if (ssdSegIndices == null) {
- ConcurrentHashSet<SSDSegIndex> tmpSsdSegIndices =
- new ConcurrentHashSet<>();
- ssdSegIndices =
- ssdPartStrMap.putIfAbsent(ssdSegEvent.partStr, tmpSsdSegIndices);
- if (ssdSegIndices == null) {
- ssdSegIndices = tmpSsdSegIndices;
- }
- }
- ssdSegIndices.add(new SSDSegIndex(ssdSegEvent.storeKey,
- ssdSegFound.startOffset, ssdSegFound.endOffset));
- consumerNodeInfo.setSSDTransferFinished(true,
- ssdSegFound.startOffset, ssdSegFound.endOffset);
- } else {
- if (totalSsdFileCnt.get() >= tubeConfig.getMaxSSDTotalFileCnt()
- || totalSsdFileSize.get() >= tubeConfig.getMaxSSDTotalFileSizes()) {
- consumerNodeInfo.setSSDTransferFinished(false, -2, -2);
- return;
- }
- boolean result = copyFileToSSD(ssdSegEvent.storeKey, topicName,
- ssdSegEvent.partStr, ssdSegFound.sourceFile,
- ssdSegFound.startOffset, ssdSegFound.endOffset, strBuffer);
- if (result) {
- consumerNodeInfo.setSSDTransferFinished(true,
- ssdSegFound.startOffset, ssdSegFound.endOffset);
- } else {
- consumerNodeInfo.setSSDTransferFinished(false,
- ssdSegFound.startOffset, ssdSegFound.endOffset);
- }
- }
- }
-
- private void loadSsdFile(final String storeKey, final String topicName,
- final StringBuilder strBuffer) throws IOException {
- final File storeKeydataDir =
- new File(strBuffer.append(this.secdStorePath)
- .append(File.separator).append(storeKey).toString());
- strBuffer.delete(0, strBuffer.length());
- final File[] lsData = storeKeydataDir.listFiles();
- if (lsData == null) {
- return;
- }
- for (final File fileData : lsData) {
- if (fileData != null
- && fileData.isFile()
- && fileData.toString().endsWith(DATA_FILE_SUFFIX)) {
- if (!fileData.canRead()) {
- throw new IOException(new StringBuilder(512)
- .append("[SSD Manager] Could not read data file ")
- .append(fileData).toString());
- }
- final String filename = fileData.getName();
- final long start =
- Long.parseLong(filename.substring(0,
- filename.length() - DATA_FILE_SUFFIX.length()));
- ConcurrentHashMap<Long, MsgSSDSegment> msgSsdSegMap =
- this.ssdSegmentsMap.get(storeKey);
- if (msgSsdSegMap == null) {
- ConcurrentHashMap<Long, MsgSSDSegment> tmpMsgSsdSegMap =
- new ConcurrentHashMap<>();
- msgSsdSegMap =
- ssdSegmentsMap.putIfAbsent(storeKey, tmpMsgSsdSegMap);
- if (msgSsdSegMap == null) {
- msgSsdSegMap = tmpMsgSsdSegMap;
- }
- }
- MsgSSDSegment msgSsdSegment1 = msgSsdSegMap.get(start);
- if (msgSsdSegment1 == null) {
- MsgSSDSegment tmpMsgSsdSegment =
- new MsgSSDSegment(storeKey, topicName, start, fileData);
- msgSsdSegment1 =
- msgSsdSegMap.putIfAbsent(start, tmpMsgSsdSegment);
- if (msgSsdSegment1 == null) {
- totalSsdFileSize
- .addAndGet(tmpMsgSsdSegment.getDataSizeInBytes());
- totalSsdFileCnt.incrementAndGet();
- } else {
- tmpMsgSsdSegment.close();
- }
- }
- logger.info(strBuffer.append("[SSD Manager] Loaded data segment ")
- .append(fileData.getAbsolutePath()).toString());
- strBuffer.delete(0, strBuffer.length());
- }
- }
- }
-
- private class SsdStoreRunner implements Runnable {
-
- public SsdStoreRunner() {
- //
- }
-
- @Override
- public void run() {
- final StringBuilder strBuffer = new StringBuilder(512);
- logger.info("[SSD Manager] start process SSD transfer requests");
- try {
- MetadataManager metadataManager = msgStoreMgr.getMetadataManager();
- while (!closed) {
- try {
- SSDSegEvent ssdSegEvent =
- reqSSDEvents.poll(500, TimeUnit.MILLISECONDS);
- if (ssdSegEvent == null) {
- ThreadUtils.sleep(1000);
- continue;
- }
- Map<String, ConsumerNodeInfo> consumerNodeInfoMap =
- msgStoreMgr.getTubeBroker().getBrokerServiceServer().getConsumerRegisterMap();
- ConsumerNodeInfo consumerNodeInfo =
- consumerNodeInfoMap.get(ssdSegEvent.partStr);
- if (consumerNodeInfo == null) {
- continue;
- }
- final int index = ssdSegEvent.storeKey.lastIndexOf('-');
- if (index < 0) {
- logger.warn(strBuffer.append("[SSD Manager] Ignore invalid storeKey=")
- .append(ssdSegEvent.storeKey).toString());
- strBuffer.delete(0, strBuffer.length());
- continue;
- }
- final String topic = ssdSegEvent.storeKey.substring(0, index);
- TopicMetadata topicMetadata =
- metadataManager.getTopicMetadata(topic);
- if (topicMetadata == null) {
- logger.warn(strBuffer
- .append("[SSD Manager] No valid topic config for storeKey=")
- .append(ssdSegEvent.storeKey).toString());
- strBuffer.delete(0, strBuffer.length());
- continue;
- }
- final int storeId =
- Integer.parseInt(ssdSegEvent.storeKey.substring(index + 1));
- SSDSegFound ssdSegFound =
- msgStoreMgr.getSourceSegment(topic, storeId, ssdSegEvent.startOffset, 15);
- if (!ssdSegFound.isSuccess) {
- if (ssdSegFound.reason > 0) {
- logger.warn(strBuffer
- .append("[SSD Manager] Found source store error, storeKey=")
- .append(ssdSegEvent.storeKey).append(", reason=")
- .append(ssdSegFound.reason).toString());
- strBuffer.delete(0, strBuffer.length());
- }
- consumerNodeInfo.setSSDTransferFinished(false,
- ssdSegFound.startOffset, ssdSegFound.endOffset);
- continue;
- }
- checkSegmentTransferStatus(topic, ssdSegEvent,
- ssdSegFound, consumerNodeInfo, strBuffer);
- } catch (InterruptedException e) {
- break;
- } catch (Throwable e2) {
- logger.error("[SSD Manager] process SSD transfer request failure", e2);
- }
- }
- logger.info("[SSD Manager] stopped process SSD transfer requests");
- } catch (Throwable e) {
- logger.error("[SSD Manager] Error during process SSD transfer requests", e);
- }
- }
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java
deleted file mode 100644
index 1045d1e..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegEvent.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/***
- * Transfer files on disk to ssd event.
- */
-public class SSDSegEvent {
- public final String storeKey;
- public final String partStr;
- public final long startOffset;
- public final int segCnt;
-
-
- public SSDSegEvent(final String partStr, final String storeKey,
- final long startOffset, final int segCnt) {
- this.partStr = partStr;
- this.storeKey = storeKey;
- this.startOffset = startOffset;
- this.segCnt = segCnt;
- }
-
- @Override
- public String toString() {
- return new StringBuilder(512).append("{storeKey=").append(storeKey)
- .append(",partStr=").append(partStr).append(",startOffset=")
- .append(startOffset).append(",segCnt=").append(segCnt)
- .append("}").toString();
- }
-
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java
deleted file mode 100644
index 0e6d61b..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegFound.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-import java.io.File;
-
-/***
- * SSD segment found result.
- */
-public class SSDSegFound {
- public final File sourceFile;
- public final long startOffset;
- public final long endOffset;
- public boolean isSuccess;
- public int reason;
-
- public SSDSegFound(boolean isSuccess, int reason, final File sourceFile) {
- this.isSuccess = isSuccess;
- this.reason = reason;
- this.sourceFile = sourceFile;
- this.startOffset = -2;
- this.endOffset = -2;
- }
-
- public SSDSegFound(boolean isSuccess, int reason, final File sourceFile,
- final long startOffset, final long endOffset) {
- this.isSuccess = isSuccess;
- this.reason = reason;
- this.sourceFile = sourceFile;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java
deleted file mode 100644
index a731596..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDSegIndex.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/**
- * SSD segment index.
- */
-public class SSDSegIndex {
- public final String storeKey;
- public final long startOffset;
- public final long endOffset;
-
- public SSDSegIndex(final String storeKey,
- final long startOffset,
- final long endOffset) {
- this.storeKey = storeKey;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof SSDSegIndex)) {
- return false;
- }
-
- SSDSegIndex that = (SSDSegIndex) o;
- if (startOffset != that.startOffset) {
- return false;
- }
- if (endOffset != that.endOffset) {
- return false;
- }
- return storeKey.equals(that.storeKey);
-
- }
-
- @Override
- public int hashCode() {
- int result = storeKey.hashCode();
- result = 31 * result + (int) (startOffset ^ (startOffset >>> 32));
- result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
- return result;
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java
deleted file mode 100644
index 6d06e42..0000000
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/msgstore/ssd/SSDVisitInfo.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tubemq.server.broker.msgstore.ssd;
-
-/***
- * SSD visit record.
- */
-public class SSDVisitInfo {
- public final String partStr;
- public long lastOffset;
- public long lastTime;
-
- public SSDVisitInfo(final String partStr, final long lastOffset) {
- this.partStr = partStr;
- this.lastOffset = lastOffset;
- this.lastTime = System.currentTimeMillis();
- }
-
- public void requestVisit(final long lastOffset) {
- this.lastOffset = lastOffset;
- this.lastTime = System.currentTimeMillis();
- }
-
- public void responseVisit(final long lastOffset) {
- this.lastOffset = lastOffset;
- this.lastTime = System.currentTimeMillis();
- }
-
-}
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index a66f3af..3b05b06 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
@@ -19,13 +19,10 @@ package org.apache.tubemq.server.broker.nodeinfo;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.policies.FlowCtrlResult;
import org.apache.tubemq.corebase.policies.FlowCtrlRuleHandler;
-import org.apache.tubemq.corebase.policies.SSDCtrlResult;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.common.TServerConstants;
@@ -59,19 +56,7 @@ public class ConsumerNodeInfo {
private long totalUnitMin = 0L;
private FlowCtrlResult curFlowCtrlVal =
new FlowCtrlResult(Long.MAX_VALUE, 0);
- private SSDCtrlResult curSsdDltLimit =
- new SSDCtrlResult(Long.MAX_VALUE, 0);
private long nextLimitUpdateTime = 0;
- private AtomicBoolean needSsdProc =
- new AtomicBoolean(false);
- private AtomicLong ssdTransId =
- new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
- // -2 : 未启用 0:已发起请求 1://已接收请求正在处理 2:已处理完可用 3:已处理不可用 4:已停止
- private AtomicInteger ssdProcStatus =
- new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
- private long lastOpTime = 0;
- private long startSsdDataOffset = -2;
- private long endSsdDataOffset = -2;
private AtomicInteger qryPriorityId =
new AtomicInteger(TBaseConstants.META_VALUE_UNDEFINED);
private long createTime = System.currentTimeMillis();
@@ -80,31 +65,15 @@ public class ConsumerNodeInfo {
public ConsumerNodeInfo(final MessageStoreManager storeManager,
final String consumerId, Set<String> filterCodes,
final String sessionKey, long sessionTime, final String partStr) {
- setConsumerId(consumerId);
- if (filterCodes != null) {
- for (String filterItem : filterCodes) {
- this.filterCondStrs.add(filterItem);
- this.filterCondCode.add(filterItem.hashCode());
- }
- }
- this.sessionKey = sessionKey;
- this.sessionTime = sessionTime;
- this.isSupportLimit = false;
- this.needSsdProc.set(false);
- this.storeManager = storeManager;
- this.partStr = partStr;
- this.createTime = System.currentTimeMillis();
- if (filterCodes != null && !filterCodes.isEmpty()) {
- this.isFilterConsume = true;
- }
- this.ssdTransId.set(TBaseConstants.META_VALUE_UNDEFINED);
+ this(storeManager, TBaseConstants.META_VALUE_UNDEFINED, consumerId,
+ filterCodes, sessionKey, sessionTime, false, partStr);
}
public ConsumerNodeInfo(final MessageStoreManager storeManager,
final int qryPriorityId, final String consumerId,
Set<String> filterCodes, final String sessionKey,
- long sessionTime, long ssdTransId, boolean needSsdProc,
- boolean isSupportLimit, final String partStr) {
+ long sessionTime, boolean isSupportLimit,
+ final String partStr) {
setConsumerId(consumerId);
if (filterCodes != null) {
for (String filterItem : filterCodes) {
@@ -115,14 +84,11 @@ public class ConsumerNodeInfo {
this.sessionKey = sessionKey;
this.sessionTime = sessionTime;
this.qryPriorityId.set(qryPriorityId);
- this.ssdTransId.set(ssdTransId);
- this.needSsdProc.set(needSsdProc);
this.storeManager = storeManager;
this.partStr = partStr;
this.createTime = System.currentTimeMillis();
if (filterCodes != null && !filterCodes.isEmpty()) {
this.isFilterConsume = true;
- this.needSsdProc.set(false);
}
this.isSupportLimit = isSupportLimit;
}
@@ -137,69 +103,6 @@ public class ConsumerNodeInfo {
long currTime = System.currentTimeMillis();
recalcMsgLimitValue(curDataDlt,
currTime, maxMsgTransferSize, flowCtrlRuleHandler);
- if (storeManager.isSsdServiceStart()
- && needSsdProc.get()
- && (currTime - createTime > 2 * 60 * 1000)) {
- // get message from ssd.
- switch (ssdProcStatus.get()) {
- case TBaseConstants.META_VALUE_UNDEFINED:
- case 4:
- // Request ssd sink operation, when finish first fetch operation.
- if (curDataDlt >= curSsdDltLimit.dataStartDltInSize
- && currTime - this.lastOpTime > 20 * 1000) {
- if (this.storeManager.putSsdTransferReq(partStr,
- storeKey, lastDataRdOffset)) {
- ssdProcStatus.set(0);
- }
- this.lastOpTime = System.currentTimeMillis();
- }
- break;
- case 0:
- // Request ssd sink operation, when exceed 2 minutes since last time.
- if (curDataDlt >= curSsdDltLimit.dataStartDltInSize
- && currTime - this.lastOpTime > 2 * 60 * 1000) {
- if (this.storeManager.putSsdTransferReq(partStr,
- storeKey, lastDataRdOffset)) {
- ssdProcStatus.set(0);
- }
- this.lastOpTime = System.currentTimeMillis();
- }
- break;
- case 2:
- // Set read message size, after finish ssd sink operation.
- if (lastDataRdOffset >= startSsdDataOffset
- && lastDataRdOffset < endSsdDataOffset) {
- return this.sentUnit;
- }
- if (lastDataRdOffset >= endSsdDataOffset) {
- // Request ssd sink operation, after read operation.
- if (curDataDlt >= curSsdDltLimit.dataEndDLtInSz) {
- resetSSDProcSeg(false);
- if (this.storeManager.putSsdTransferReq(partStr,
- storeKey, lastDataRdOffset)) {
- ssdProcStatus.set(0);
- }
- this.lastOpTime = System.currentTimeMillis();
- } else {
- resetSSDProcSeg(true);
- }
- }
- break;
- case 3:
- // Request ssd sink operation, when has been conducted or occur error in response.
- if (curDataDlt >= curSsdDltLimit.dataEndDLtInSz
- && currTime - this.lastOpTime > 20 * 1000) {
- if (this.storeManager.putSsdTransferReq(partStr,
- storeKey, lastDataRdOffset)) {
- ssdProcStatus.set(0);
- }
- this.lastOpTime = System.currentTimeMillis();
- }
- break;
- default:
- break;
- }
- }
if (isEscFlowCtrl
|| (totalUnitSec > sentMsgSize
&& this.curFlowCtrlVal.dataLtInSize > totalUnitMin)) {
@@ -216,23 +119,10 @@ public class ConsumerNodeInfo {
}
}
- public boolean processFromSsdFile() {
- return (storeManager != null
- && storeManager.isSsdServiceStart()
- && needSsdProc.get()
- && ssdProcStatus.get() == 2
- && lastDataRdOffset >= startSsdDataOffset
- && lastDataRdOffset < endSsdDataOffset);
- }
-
public String getPartStr() {
return partStr;
}
- public long getStartSsdDataOffset() {
- return startSsdDataOffset;
- }
-
public int getSentMsgSize() {
return sentMsgSize;
}
@@ -241,38 +131,6 @@ public class ConsumerNodeInfo {
return isSupportLimit;
}
- public boolean getNeedSsdProc() {
- return needSsdProc.get();
- }
-
- public void setNeedSsdProc(boolean needSsdProc) {
- this.needSsdProc.set(needSsdProc);
- }
-
- public long getDataStartDltInM() {
- return this.curSsdDltLimit.dataStartDltInSize / 1024 / 1024;
- }
-
- public long getSsdDataEndDltInM() {
- return this.curSsdDltLimit.dataEndDLtInSz / 1024 / 1024;
- }
-
- public long getSsdTransId() {
- return ssdTransId.get();
- }
-
- public void setSsdTransId(long ssdTransId, boolean needSSDProc) {
- this.ssdTransId.set(ssdTransId);
- this.needSsdProc.set(needSSDProc);
- }
-
- public void setSSDProcing() {
- if (this.ssdProcStatus.get() == 0) {
- this.ssdProcStatus.set(1);
- this.lastOpTime = System.currentTimeMillis();
- }
- }
-
public int getQryPriorityId() {
return qryPriorityId.get();
}
@@ -281,32 +139,6 @@ public class ConsumerNodeInfo {
this.qryPriorityId.set(qryPriorityId);
}
- public void setSSDTransferFinished(boolean isUse,
- final long startOffset,
- final long endOffset) {
- if (this.ssdProcStatus.get() == 1) {
- if (isUse) {
- this.ssdProcStatus.set(2);
- } else {
- this.ssdProcStatus.set(3);
- }
- this.startSsdDataOffset = startOffset;
- this.endSsdDataOffset = endOffset;
- this.lastOpTime = System.currentTimeMillis();
- }
- }
-
- public void resetSSDProcSeg(boolean finished) {
- if (finished) {
- this.ssdProcStatus.set(4);
- } else {
- this.ssdProcStatus.set(3);
- }
- this.startSsdDataOffset = -2;
- this.endSsdDataOffset = -2;
- this.lastOpTime = System.currentTimeMillis();
- }
-
public long getNextStatTime() {
return nextStatTime;
}
@@ -410,9 +242,6 @@ public class ConsumerNodeInfo {
if (this.curFlowCtrlVal == null) {
this.curFlowCtrlVal = new FlowCtrlResult(Long.MAX_VALUE, 0);
}
- if (storeManager.isSsdServiceStart() && needSsdProc.get()) {
- this.curSsdDltLimit = flowCtrlRuleHandler.getCurSSDStartDltInSZ();
- }
currTime = System.currentTimeMillis();
this.sentMsgSize = 0;
this.totalUnitMin = 0;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
index eb3d586..05428fe 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/broker/web/BrokerAdminServlet.java
@@ -165,18 +165,14 @@ public class BrokerAdminServlet extends HttpServlet {
.append(regTime).append(",\"isFilterConsume\":")
.append(ifFilterConsume);
}
- sBuilder.append(",\"needSSDProc\":").append(entry.getValue().getNeedSsdProc())
- .append(",\"ssdTransId\":").append(entry.getValue().getSsdTransId())
- .append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
+ sBuilder.append(",\"qryPriorityId\":").append(entry.getValue().getQryPriorityId())
.append(",\"curDataLimitInM\":").append(entry.getValue().getCurFlowCtrlLimitSize())
.append(",\"curFreqLimit\":").append(entry.getValue().getCurFlowCtrlFreqLimit())
.append(",\"totalSentSec\":").append(entry.getValue().getSentMsgSize())
.append(",\"isSupportLimit\":").append(entry.getValue().isSupportLimit())
.append(",\"sentUnitSec\":").append(entry.getValue().getTotalUnitSec())
.append(",\"totalSentMin\":").append(entry.getValue().getTotalUnitMin())
- .append(",\"sentUnit\":").append(entry.getValue().getSentUnit())
- .append(",\"SSDDataDltStartInM\":").append(entry.getValue().getDataStartDltInM())
- .append(",\"SSDDataDltEndInM\":").append(entry.getValue().getSsdDataEndDltInM());
+ .append(",\"sentUnit\":").append(entry.getValue().getSentUnit());
MessageStoreManager storeManager = broker.getStoreManager();
OffsetService offsetService = broker.getOffsetManager();
MessageStore store = null;
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
index 0554098..0d93ec7 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/TMaster.java
@@ -551,8 +551,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
? request.getSessionTime() : System.currentTimeMillis();
int sourceCount = request.hasTotalCount()
? request.getTotalCount() : -1;
- long ssdStoreId = request.hasSsdStoreId()
- ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
int reqQryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
boolean isSelectBig = (!request.hasSelectBig() || request.getSelectBig());
@@ -677,12 +675,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (request.getGroupFlowCheckId() != bdbGroupFlowCtrlEntity.getSerialId()) {
builder.setGroupFlowControlInfo(bdbGroupFlowCtrlEntity.getFlowCtrlInfo());
}
- if (bdbGroupFlowCtrlEntity.isNeedSSDProc()
- && defFlowCtrlEntity != null
- && defFlowCtrlEntity.isValidStatus()
- && defFlowCtrlEntity.isNeedSSDProc()) {
- builder.setSsdStoreId(defFlowCtrlEntity.getSsdTranslateId());
- }
}
}
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
@@ -859,12 +851,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
if (request.getGroupFlowCheckId() != bdbGroupFlowCtrlEntity.getSerialId()) {
builder.setGroupFlowControlInfo(bdbGroupFlowCtrlEntity.getFlowCtrlInfo());
}
- if (bdbGroupFlowCtrlEntity.isNeedSSDProc()
- && defFlowCtrlEntity != null
- && defFlowCtrlEntity.isValidStatus()
- && defFlowCtrlEntity.isNeedSSDProc()) {
- builder.setSsdStoreId(defFlowCtrlEntity.getSsdTranslateId());
- }
}
}
builder.setAuthorizedInfo(genAuthorizedInfo(certResult.authorizedToken, false));
@@ -1009,8 +995,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
boolean needFastStart = false;
final long reFlowCtrlId = request.hasFlowCheckId()
? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
- final long reqSsdTranslateId = request.hasSsdStoreId()
- ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
final int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
ConcurrentHashMap<Integer, BrokerSyncStatusInfo> brokerSyncStatusMap =
@@ -1050,7 +1034,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",isTlsEnable=").append(brokerInfo.isEnableTLS())
.append(",TLSport=").append(brokerInfo.getTlsPort())
.append(",FlowCtrlId=").append(reFlowCtrlId)
- .append(",ssdTranslateId=").append(reqSsdTranslateId)
.append(",qryPriorityId=").append(qryPriorityId)
.append(",checksumId=").append(request.getConfCheckSumId()).toString());
strBuffer.delete(0, strBuffer.length());
@@ -1077,11 +1060,11 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setConfCheckSumId(brokerStatusInfo.getLastPushBrokerCheckSumId());
builder.setBrokerDefaultConfInfo(brokerStatusInfo.getLastPushBrokerDefaultConfInfo());
builder.addAllBrokerTopicSetConfInfo(brokerStatusInfo.getLastPushBrokerTopicSetConfInfo());
+ builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.hasFlowCheckId()) {
BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
defaultBrokerConfManager.getBdbDefFlowCtrl();
if (bdbGroupFlowCtrlEntity == null) {
- builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.getFlowCheckId() != TBaseConstants.META_VALUE_UNDEFINED) {
@@ -1097,11 +1080,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setFlowControlInfo(" ");
}
}
- if (bdbGroupFlowCtrlEntity.isNeedSSDProc()) {
- builder.setSsdStoreId(bdbGroupFlowCtrlEntity.getSsdTranslateId());
- } else {
- builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
- }
}
}
logger.info(strBuffer.append("[TMaster sync] push broker configure: brokerId = ")
@@ -1225,8 +1203,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
request.getReadStatusRpt(), request.getWriteStatusRpt());
long reFlowCtrlId = request.hasFlowCheckId()
? request.getFlowCheckId() : TBaseConstants.META_VALUE_UNDEFINED;
- long reqSsdTranslateId = request.hasSsdStoreId()
- ? request.getSsdStoreId() : TBaseConstants.META_VALUE_UNDEFINED;
int qryPriorityId = request.hasQryPriorityId()
? request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED;
if (request.getTakeConfInfo()) {
@@ -1238,7 +1214,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",checksumId=").append(request.getConfCheckSumId())
.append(",hasFlowCheckId=").append(request.hasFlowCheckId())
.append(",reFlowCtrlId=").append(reFlowCtrlId)
- .append(",reqSsdTranslateId=").append(reqSsdTranslateId)
.append(",qryPriorityId=").append(qryPriorityId)
.append(",brokerOnline=").append(request.getBrokerOnline())
.append(",default broker configure is ").append(request.getBrokerDefaultConfInfo())
@@ -1251,12 +1226,12 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setNeedReportData(brokerSyncStatusInfo.needReportData());
builder.setCurBrokerConfId(brokerSyncStatusInfo.getLastPushBrokerConfId());
builder.setConfCheckSumId(brokerSyncStatusInfo.getLastPushBrokerCheckSumId());
+ builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.hasFlowCheckId()) {
BdbGroupFlowCtrlEntity bdbGroupFlowCtrlEntity =
defaultBrokerConfManager.getBdbDefFlowCtrl();
if (bdbGroupFlowCtrlEntity == null) {
builder.setFlowCheckId(TBaseConstants.META_VALUE_UNDEFINED);
- builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
builder.setQryPriorityId(TBaseConstants.META_VALUE_UNDEFINED);
if (request.getFlowCheckId() != TBaseConstants.META_VALUE_UNDEFINED) {
builder.setFlowControlInfo(" ");
@@ -1271,11 +1246,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
builder.setFlowControlInfo(" ");
}
}
- if (bdbGroupFlowCtrlEntity.isNeedSSDProc()) {
- builder.setSsdStoreId(bdbGroupFlowCtrlEntity.getSsdTranslateId());
- } else {
- builder.setSsdStoreId(TBaseConstants.META_VALUE_UNDEFINED);
- }
}
}
brokerHolder.setBrokerHeartBeatReqStatus(brokerInfo.getBrokerId(), builder);
@@ -1295,7 +1265,6 @@ public class TMaster extends HasThread implements MasterService, Stoppable {
.append(",set flowCtrlId=").append(builder.getFlowCheckId())
.append(",stopWrite=").append(builder.getStopWrite())
.append(",stopRead=").append(builder.getStopRead())
- .append(",ssdTranslateId=").append(builder.getSsdStoreId())
.append(",qryPriorityId=").append(builder.getQryPriorityId())
.append(",checksumId=").append(brokerSyncStatusInfo.getLastPushBrokerCheckSumId())
.append(",default configure is ")
diff --git a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
index e6a91a1..acc74c0 100644
--- a/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
+++ b/tubemq-server/src/main/java/org/apache/tubemq/server/master/web/handler/WebAdminFlowRuleHandler.java
@@ -79,11 +79,6 @@ public class WebAdminFlowRuleHandler {
WebParameterUtils.validIntDataParameter("qryPriorityId",
req.getParameter("qryPriorityId"), false, 301, 101);
checkQryPriorityId(qryPriorityId);
- // get if enable ssd process function
- boolean curNeedSSDProc =
- WebParameterUtils.validBooleanDataParameter("needSSDProc",
- req.getParameter("needSSDProc"),
- false, false);
Set<String> batchGroupNames = new HashSet<>();
if (opType == 1) {
batchGroupNames.add(TServerConstants.TOKEN_DEFAULT_FLOW_CONTROL);
@@ -107,12 +102,12 @@ public class WebAdminFlowRuleHandler {
brokerConfManager.confAddBdbGroupFlowCtrl(
new BdbGroupFlowCtrlEntity(strBuffer.toString(),
statusId, ruleCnt, qryPriorityId, "",
- curNeedSSDProc, createUser, createDate));
+ false, createUser, createDate));
} else {
brokerConfManager.confAddBdbGroupFlowCtrl(
new BdbGroupFlowCtrlEntity(groupName,
strBuffer.toString(), statusId, ruleCnt, qryPriorityId, "",
- curNeedSSDProc, createUser, createDate));
+ false, createUser, createDate));
}
}
strBuffer.delete(0, strBuffer.length());
@@ -248,16 +243,6 @@ public class WebAdminFlowRuleHandler {
newGroupFlowCtrlEntity.setFlowCtrlInfo(newFlowCtrlInfo);
newGroupFlowCtrlEntity.setRuleCnt(ruleCnt);
}
- String inNeedSsdProc = req.getParameter("needSSDProc");
- if (TStringUtils.isNotBlank(inNeedSsdProc)) {
- boolean curNeedSsdProc =
- WebParameterUtils.validBooleanDataParameter("needSSDProc",
- req.getParameter("needSSDProc"), false, false);
- if (curNeedSsdProc != oldEntity.isNeedSSDProc()) {
- foundChange = true;
- newGroupFlowCtrlEntity.setNeedSSDProc(curNeedSsdProc);
- }
- }
// update record if found change
if (foundChange) {
try {
@@ -361,7 +346,7 @@ public class WebAdminFlowRuleHandler {
strBuffer.append("[");
if (TStringUtils.isNotBlank(inFlowCtrlInfo)) {
List<Integer> ruleTypes = Arrays.asList(0, 1, 2, 3);
- inFlowCtrlInfo = String.valueOf(inFlowCtrlInfo).trim();
+ inFlowCtrlInfo = inFlowCtrlInfo.trim();
FlowCtrlRuleHandler flowCtrlRuleHandler =
new FlowCtrlRuleHandler(true);
Map<Integer, List<FlowCtrlItem>> flowCtrlItemMap =
@@ -371,10 +356,6 @@ public class WebAdminFlowRuleHandler {
int rules = 0;
List<FlowCtrlItem> flowCtrlItems = flowCtrlItemMap.get(typeId);
if (flowCtrlItems != null) {
- if (opType != 1 && typeId == 2) {
- throw new Exception(
- "Illegal value: SSD limit rule only set in default flow control set!");
- }
if (ruleCnt++ > 0) {
strBuffer.append(",");
}