You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/16 08:31:50 UTC
[incubator-inlong] branch master updated: [INLONG-1803]Add the client interface that supports partition allocation (#1804)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 14fb846 [INLONG-1803]Add the client interface that supports partition allocation (#1804)
14fb846 is described below
commit 14fb8465533505440014abc3726f75add9938f5e
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Nov 16 16:31:44 2021 +0800
[INLONG-1803]Add the client interface that supports partition allocation (#1804)
* [INLONG-1803]Add the client interface that supports partition allocation
---
.../inlong/tubemq/client/common/ConfirmResult.java | 70 ++++++++++
.../inlong/tubemq/client/common/ConsumeResult.java | 75 +++++++++++
.../inlong/tubemq/client/common/PeerInfo.java | 9 ++
.../tubemq/client/common/QueryMetaResult.java | 60 +++++++++
.../tubemq/client/common/TClientConstants.java | 3 +
.../tubemq/client/config/ConsumerConfig.java | 17 ++-
.../client/consumer/BaseMessageConsumer.java | 4 +-
.../client/consumer/ClientBalanceConsumer.java | 149 +++++++++++++++++++++
.../client/factory/MessageSessionFactory.java | 3 +
.../client/factory/TubeBaseSessionFactory.java | 15 ++-
.../client/factory/TubeMultiSessionFactory.java | 6 +
.../client/factory/TubeSingleSessionFactory.java | 7 +
.../inlong/tubemq/corebase/TBaseConstants.java | 2 +
13 files changed, 413 insertions(+), 7 deletions(-)
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java
new file mode 100644
index 0000000..dfa57e8
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+/**
+ * Outcome of a consume confirmation: the inherited {@link RetValue} status together
+ * with the topic name and the partition/offset details kept in a {@link PeerInfo}.
+ */
+public class ConfirmResult extends RetValue {
+    private final PeerInfo peerInfo = new PeerInfo();
+    private String topicName = "";
+    // Not assigned anywhere in this class, so it keeps its initial empty value.
+    private String confirmContext = "";
+
+    public ConfirmResult() {
+        // the superclass no-arg constructor is invoked implicitly
+    }
+
+    /**
+     * Record a successful confirmation.
+     *
+     * @param topicName   the topic name to store
+     * @param partition   the partition handed to the peer info
+     * @param currOffset  the current offset handed to the peer info
+     * @param maxOffset   the max offset handed to the peer info
+     */
+    public void setSuccResult(String topicName, Partition partition,
+                              long currOffset, long maxOffset) {
+        super.setSuccResult();
+        this.topicName = topicName;
+        peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
+    }
+
+    /**
+     * Record a confirmation outcome with an explicit status.
+     *
+     * @param isSuccess   whether the call succeeded
+     * @param errCode     the result code
+     * @param errMsg      the result message
+     * @param topicName   the topic name to store
+     * @param partition   the partition handed to the peer info
+     * @param currOffset  the current offset handed to the peer info
+     * @param maxOffset   the max offset handed to the peer info
+     */
+    public void setProcessResult(boolean isSuccess, int errCode, String errMsg,
+                                 String topicName, Partition partition,
+                                 long currOffset, long maxOffset) {
+        super.setFullInfo(isSuccess, errCode, errMsg);
+        this.topicName = topicName;
+        peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);
+    }
+
+    public String getTopicName() {
+        return this.topicName;
+    }
+
+    public String getPartitionKey() {
+        return this.peerInfo.getPartitionKey();
+    }
+
+    public PeerInfo getPeerInfo() {
+        return this.peerInfo;
+    }
+
+    public long getCurrOffset() {
+        return this.peerInfo.getCurrOffset();
+    }
+
+    public long getMaxOffset() {
+        return this.peerInfo.getMaxOffset();
+    }
+
+    public final String getConfirmContext() {
+        return this.confirmContext;
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java
new file mode 100644
index 0000000..b044316
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.inlong.tubemq.client.consumer.FetchContext;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+/**
+ * Outcome of a message fetch: the inherited {@link RetValue} status together with
+ * the topic name, partition/offset details, the confirm context and the fetched
+ * messages.
+ */
+public class ConsumeResult extends RetValue {
+    private String topicName = "";
+    private PeerInfo peerInfo = new PeerInfo();
+    private String confirmContext = "";
+    private List<Message> messageList = new ArrayList<>();
+
+    public ConsumeResult() {
+        super();
+    }
+
+    /**
+     * Copy the outcome of a fetch task into this result.
+     *
+     * The status, topic and offset details are always taken from the context.
+     * The message list and confirm context are taken only on success; on
+     * failure they are reset, so a reused result object never exposes the
+     * payload of an earlier successful call.
+     *
+     * @param taskContext  the fetch context to read from
+     */
+    public void setProcessResult(FetchContext taskContext) {
+        setFullInfo(taskContext.isSuccess(),
+                taskContext.getErrCode(), taskContext.getErrMsg());
+        this.topicName = taskContext.getPartition().getTopic();
+        peerInfo.setMsgSourceInfo(taskContext.getPartition(),
+                taskContext.getCurrOffset(), taskContext.getMaxOffset());
+        if (this.isSuccess()) {
+            this.messageList = taskContext.getMessageList();
+            this.confirmContext = taskContext.getConfirmContext();
+        } else {
+            // drop whatever a previous successful call left behind
+            this.messageList = new ArrayList<>();
+            this.confirmContext = "";
+        }
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public PeerInfo getPeerInfo() {
+        return peerInfo;
+    }
+
+    public String getPartitionKey() {
+        return peerInfo.getPartitionKey();
+    }
+
+    public final String getConfirmContext() {
+        return confirmContext;
+    }
+
+    public long getCurrOffset() {
+        return peerInfo.getCurrOffset();
+    }
+
+    public final List<Message> getMessageList() {
+        return messageList;
+    }
+
+    public long getMaxOffset() {
+        return peerInfo.getMaxOffset();
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
index ac6fecc..3628beb 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
@@ -26,6 +26,7 @@ public class PeerInfo {
private String partitionKey = "";
private long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
private long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;
+ private long msgLagCount = TBaseConstants.META_VALUE_UNDEFINED;
public PeerInfo() {
@@ -59,9 +60,17 @@ public class PeerInfo {
}
this.currOffset = newOffset;
this.maxOffset = maxOffset;
+ if (this.currOffset >= 0 && this.maxOffset >= 0) {
+ this.msgLagCount =
+ (this.maxOffset - this.currOffset) / TBaseConstants.INDEX_MSG_UNIT_SIZE;
+ }
}
public long getMaxOffset() {
return maxOffset;
}
+
+    /**
+     * @return the stored message lag count
+     */
+    public long getMsgLagCount() {
+        return this.msgLagCount;
+    }
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java
new file mode 100644
index 0000000..996fd48
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+/**
+ * Outcome of a partition meta-info query: the inherited {@link RetValue} status
+ * plus a map from partition key to a boolean status flag.
+ */
+public class QueryMetaResult extends RetValue {
+
+    Map<String, Boolean> partStatusMap = new HashMap<>();
+
+    public QueryMetaResult() {
+        super();
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * Copies the status map as well as the base status, so the copy does not
+     * start with an empty map while reporting the source's result.
+     *
+     * @param other  the result to copy from
+     */
+    public QueryMetaResult(QueryMetaResult other) {
+        super(other);
+        if (other.partStatusMap != null) {
+            // independent copy: later changes to either map do not leak across
+            this.partStatusMap = new HashMap<>(other.partStatusMap);
+        } else {
+            this.partStatusMap = null;
+        }
+    }
+
+    public QueryMetaResult(int errCode, String errInfo) {
+        super(errCode, errInfo);
+    }
+
+    public void setFailResult(int errCode, final String errMsg) {
+        super.setFailResult(errCode, errMsg);
+    }
+
+    public void setFailResult(final String errMsg) {
+        super.setFailResult(errMsg);
+    }
+
+    /**
+     * Record a successful query and store the given status map by reference.
+     *
+     * @param partStatusMap  the partition status map to expose
+     */
+    public void setSuccResult(Map<String, Boolean> partStatusMap) {
+        super.setSuccResult();
+        this.partStatusMap = partStatusMap;
+    }
+
+    public Map<String, Boolean> getPartStatusMap() {
+        return partStatusMap;
+    }
+
+    /**
+     * Reset the base status and the status map.
+     */
+    public void clear() {
+        super.clear();
+        // A fresh map rather than partStatusMap.clear(): the current map may be
+        // the caller's own instance handed in through setSuccResult.
+        this.partStatusMap = new HashMap<>();
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index bd6039e..06b8293 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -37,4 +37,7 @@ public class TClientConstants {
public static final int MAX_CONNECTION_FAILURE_LOG_TIMES = 10;
public static final int MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES = 6;
+ public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
+ public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
+
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index ea6bd5a..6dd19b8 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -28,7 +28,7 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
*/
public class ConsumerConfig extends TubeClientConfig {
- private String consumerGroup;
+ private final String consumerGroup;
/* consumeModel
* Set the start position of the consumer group. The value can be [-1, 0, 1]. Default value is 0.
@@ -47,6 +47,8 @@ public class ConsumerConfig extends TubeClientConfig {
TClientConstants.CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS;
private long shutDownRebalanceWaitPeriodMs =
TClientConstants.CFG_DEFAULT_SHUTDOWN_REBALANCE_WAIT_PERIOD_MS;
+ private long partMetaInfoCheckPeriodMs =
+ TClientConstants.CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS;
private int pushFetchThreadCnt =
TClientConstants.CFG_DEFAULT_CLIENT_PUSH_FETCH_THREAD_CNT;
private boolean pushListenerWaitTimeoutRollBack = true;
@@ -159,6 +161,17 @@ public class ConsumerConfig extends TubeClientConfig {
this.shutDownRebalanceWaitPeriodMs = shutDownRebalanceWaitPeriodMs;
}
+    /**
+     * @return the configured partition meta-info check period
+     */
+    public long getPartMetaInfoCheckPeriodMs() {
+        return this.partMetaInfoCheckPeriodMs;
+    }
+
+    /**
+     * Set the partition meta-info check period.
+     *
+     * Values below TClientConstants.CFG_MIN_META_QUERY_WAIT_PERIOD_MS are raised
+     * to that minimum instead of being stored as given.
+     *
+     * @param partMetaInfoCheckPeriodMs  the requested period
+     */
+    public void setPartMetaInfoCheckPeriodMs(long partMetaInfoCheckPeriodMs) {
+        // Math.max applies the lower bound. The earlier form assigned the raw
+        // argument unconditionally after the bounds check, undoing the clamp.
+        this.partMetaInfoCheckPeriodMs = Math.max(
+                partMetaInfoCheckPeriodMs, TClientConstants.CFG_MIN_META_QUERY_WAIT_PERIOD_MS);
+    }
+
public int getPushFetchThreadCnt() {
return pushFetchThreadCnt;
}
@@ -270,6 +283,8 @@ public class ConsumerConfig extends TubeClientConfig {
.append(",\"pullConfirmWaitPeriodMs\":").append(this.pullRebConfirmWaitPeriodMs)
.append(",\"pullProtectConfirmTimeoutPeriodMs\":").append(this.pullProtectConfirmTimeoutMs)
.append(",\"pullConfirmInLocal\":").append(this.pullConfirmInLocal)
+ .append(",\"maxSubInfoReportIntvlTimes\":").append(this.maxSubInfoReportIntvlTimes)
+ .append(",\"partMetaInfoCheckPeriodMs\":").append(this.partMetaInfoCheckPeriodMs)
.append(",\"ClientConfig\":").append(toJsonString())
.append("}").toString();
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index c0cfaef..4386004 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1058,7 +1058,7 @@ public class BaseMessageConsumer implements MessageConsumer {
&& consumeSubInfo.isRequireBound()
&& consumeSubInfo.getIsNotAllocated()) {
Long currOffset = consumeSubInfo.getAssignedPartOffset(partition.getPartitionKey());
- if (currOffset != null && currOffset != -1) {
+ if (currOffset != null && currOffset >= 0) {
builder.setCurrOffset(currOffset);
}
}
@@ -1720,7 +1720,7 @@ public class BaseMessageConsumer implements MessageConsumer {
logger.warn(strBuffer
.append("[heart2broker error] partition:")
.append(failPartition.toString())
- .append(" errorCode")
+ .append(", errorCode=")
.append(errorCode).toString());
strBuffer.delete(0, strBuffer.length());
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java
new file mode 100644
index 0000000..4a781be
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.consumer;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.corebase.Shutdownable;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+
+/**
+ * Consumer interface in which the client itself decides which partitions to
+ * connect to and release.
+ */
+public interface ClientBalanceConsumer extends Shutdownable {
+
+    /** @return the client version string */
+    String getClientVersion();
+
+    /** @return the consumer id */
+    String getConsumerId();
+
+    /** @return whether this consumer has been shut down */
+    boolean isShutdown();
+
+    /**
+     * @param topic  topic name
+     * @return whether the topic is consumed with filter conditions
+     */
+    boolean isFilterConsume(String topic);
+
+    /** @return the consumer configuration */
+    ConsumerConfig getConsumerConfig();
+
+    /** @return the source count; see {@code sourceCount} of {@code start} */
+    int getSourceCount();
+
+    /** @return the node id; see {@code nodeId} of {@code start} */
+    int getNodeId();
+
+    /**
+     * Start the client-balance service.
+     *
+     * The sourceCount and nodeId parameters are used when the client performs
+     * modular allocation of partitions, to avoid allocation conflicts in
+     * advance and to avoid repeated allocation of the same partition.
+     *
+     * @param topicAndFilterCondMap  subscribed topics and the filter condition items
+     *                               of each topic; use an empty or null condition set
+     *                               when a topic is consumed without filtering
+     * @param sourceCount  the total count of clients that the consumer group starts
+     *                     this time. If set, the system checks that the value carried
+     *                     by every client is consistent and that the corresponding
+     *                     nodeId is unique in the consumer group; set a negative
+     *                     number when the value is not used
+     * @param nodeId       the unique ID of this node in the consumer group
+     * @param result       call result, must not be null
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean start(Map<String, TreeSet<String>> topicAndFilterCondMap,
+                  int sourceCount, int nodeId, ProcessResult result) throws TubeClientException;
+
+    /**
+     * Query partition configuration information from the Master.
+     *
+     * @param result  call result, must not be null
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean getPartitionMetaInfo(QueryMetaResult result) throws TubeClientException;
+
+    /**
+     * Check whether partitions are ready.
+     *
+     * @param maxWaitTime  the longest time to wait (unit not stated here; confirm
+     *                     against the implementation)
+     * @return whether partitions are ready
+     */
+    boolean isPartitionsReady(long maxWaitTime);
+
+    /**
+     * Get the set of partition keys that are currently registered.
+     *
+     * @return the registered partition key set
+     */
+    Set<String> getCurRegisteredPartSet();
+
+    /**
+     * Connect to the broker of a partition for consumption.
+     *
+     * @param partitionKey    partition key
+     * @param boostrapOffset  bootstrap offset for consumption:
+     *                        {@code < 0}, the broker keeps its stored offset;
+     *                        {@code >= 0}, the broker replaces the stored offset
+     *                        with the specified value
+     * @param result          call result, must not be null
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean connect2Partition(String partitionKey, long boostrapOffset,
+                              ProcessResult result) throws TubeClientException;
+
+    /**
+     * Disconnect from a registered partition to release it.
+     *
+     * @param partitionKey  partition key
+     * @param result        call result, must not be null
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean disconnectFromPartition(String partitionKey,
+                                    ProcessResult result) throws TubeClientException;
+
+    /**
+     * Get the consume offset information of the currently registered partitions.
+     *
+     * @return consume offset information
+     */
+    Map<String, ConsumeOffsetInfo> getCurPartitionOffsetInfos();
+
+    /**
+     * Consume messages from the server.
+     *
+     * @param result  call result, must not be null
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean getMessage(ConsumeResult result) throws TubeClientException;
+
+    /**
+     * Confirm whether the fetched messages have been consumed.
+     *
+     * @param confirmContext  confirm context, taken from the corresponding field of
+     *                        the result returned when getting the messages
+     * @param isConsumed      whether the data has been processed successfully:
+     *                        true tells the server to continue with the next batch;
+     *                        false tells the server that the data has not been
+     *                        processed and needs to be pulled again
+     * @param result          call result
+     * @return true if the call succeeds, false otherwise
+     * @throws TubeClientException  if a parameter is abnormal
+     */
+    boolean confirmConsume(String confirmContext, boolean isConsumed,
+                           ConfirmResult result) throws TubeClientException;
+
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
index 04359aa..2295464 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.tubemq.client.factory;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -39,4 +40,6 @@ public interface MessageSessionFactory extends Shutdownable {
PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
throws TubeClientException;
+ ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+ throws TubeClientException;
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
index 198cd81..b9b1c81 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.SimplePullMessageConsumer;
@@ -164,9 +165,9 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
throw new TubeClientException(new StringBuilder(512)
.append("consumerConfig's masterInfo not equal!")
.append(" SessionFactory's masterInfo is ")
- .append(tubeClientConfig.getMasterInfo())
+ .append(tubeClientConfig.getMasterInfo().getMasterClusterStr())
.append(", consumerConfig's masterInfo is ")
- .append(consumerConfig.getMasterInfo()).toString());
+ .append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
}
return this.addClient(new SimplePullMessageConsumer(this, consumerConfig));
}
@@ -178,13 +179,19 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
throw new TubeClientException(new StringBuilder(512)
.append("consumerConfig's masterInfo not equal!")
.append(" SessionFactory's masterInfo is ")
- .append(tubeClientConfig.getMasterInfo())
+ .append(tubeClientConfig.getMasterInfo().getMasterClusterStr())
.append(", consumerConfig's masterInfo is ")
- .append(consumerConfig.getMasterInfo()).toString());
+ .append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
}
return this.addClient(new SimplePushMessageConsumer(this, consumerConfig));
}
+ @Override
+ public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+ throws TubeClientException {
+ // Placeholder: nothing is constructed or registered here yet, so callers
+ // currently receive null and must check for it.
+ return null;
+ }
+
public boolean isShutdown() {
return shutdown.get();
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
index a113c60..715f08e 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfigUtils;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -84,4 +85,9 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
return this.baseSessionFactory.createPullConsumer(consumerConfig);
}
+    /**
+     * Create a client-balance consumer through the wrapped base session factory,
+     * in the same way createPullConsumer delegates.
+     *
+     * @param consumerConfig  the consumer configuration
+     * @return whatever the base session factory returns
+     * @throws TubeClientException  if the base session factory rejects the request
+     */
+    @Override
+    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException {
+        // Delegate rather than return null here, so this wrapper follows the base
+        // factory's behaviour without needing a second change later.
+        return this.baseSessionFactory.createBalanceConsumer(consumerConfig);
+    }
}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
index 03fd450..f5d4411 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.tubemq.client.config.ConsumerConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.config.TubeClientConfigUtils;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -100,6 +101,12 @@ public class TubeSingleSessionFactory implements MessageSessionFactory {
return baseSessionFactory.createPullConsumer(consumerConfig);
}
+    /**
+     * Create a client-balance consumer through the wrapped base session factory,
+     * in the same way createPullConsumer delegates.
+     *
+     * @param consumerConfig  the consumer configuration
+     * @return whatever the base session factory returns
+     * @throws TubeClientException  if the base session factory rejects the request
+     */
+    @Override
+    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException {
+        // Delegate rather than return null here, so this wrapper follows the base
+        // factory's behaviour without needing a second change later.
+        return baseSessionFactory.createBalanceConsumer(consumerConfig);
+    }
+
public NettyClientFactory getRpcServiceFactory() {
return clientFactory;
}
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index ca5d31f..2f0595c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -80,6 +80,8 @@ public class TBaseConstants {
public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
+ public static final long INDEX_MSG_UNIT_SIZE = 28;
+
public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;