You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/11/16 08:31:50 UTC

[incubator-inlong] branch master updated: [INLONG-1803]Add the client interface that supports partition allocation (#1804)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 14fb846  [INLONG-1803]Add the client interface that supports partition allocation (#1804)
14fb846 is described below

commit 14fb8465533505440014abc3726f75add9938f5e
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Nov 16 16:31:44 2021 +0800

    [INLONG-1803]Add the client interface that supports partition allocation (#1804)
    
    * [INLONG-1803]Add the client interface that supports partition allocation
---
 .../inlong/tubemq/client/common/ConfirmResult.java |  70 ++++++++++
 .../inlong/tubemq/client/common/ConsumeResult.java |  75 +++++++++++
 .../inlong/tubemq/client/common/PeerInfo.java      |   9 ++
 .../tubemq/client/common/QueryMetaResult.java      |  60 +++++++++
 .../tubemq/client/common/TClientConstants.java     |   3 +
 .../tubemq/client/config/ConsumerConfig.java       |  17 ++-
 .../client/consumer/BaseMessageConsumer.java       |   4 +-
 .../client/consumer/ClientBalanceConsumer.java     | 149 +++++++++++++++++++++
 .../client/factory/MessageSessionFactory.java      |   3 +
 .../client/factory/TubeBaseSessionFactory.java     |  15 ++-
 .../client/factory/TubeMultiSessionFactory.java    |   6 +
 .../client/factory/TubeSingleSessionFactory.java   |   7 +
 .../inlong/tubemq/corebase/TBaseConstants.java     |   2 +
 13 files changed, 413 insertions(+), 7 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java
new file mode 100644
index 0000000..dfa57e8
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConfirmResult.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import org.apache.inlong.tubemq.corebase.cluster.Partition;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+public class ConfirmResult extends RetValue {  // RetValue extended with topic name and partition offset info
+    private String topicName = "";
+    private PeerInfo peerInfo = new PeerInfo();
+    private String confirmContext = "";  // NOTE(review): never assigned in this class; getter always yields ""
+
+    public ConfirmResult() {
+        super();
+    }
+
+    public void setSuccResult(String topicName, Partition partition,
+                              long currOffset, long maxOffset) {
+        super.setSuccResult();
+        this.topicName = topicName;
+        this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);  // partition and offsets live in peerInfo
+    }
+
+    public void setProcessResult(boolean isSuccess, int errCode, String errMsg,
+                                 String topicName, Partition partition,
+                                 long currOffset, long maxOffset) {
+        super.setFullInfo(isSuccess, errCode, errMsg);
+        this.topicName = topicName;
+        this.peerInfo.setMsgSourceInfo(partition, currOffset, maxOffset);  // recorded whether or not isSuccess is true
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public PeerInfo getPeerInfo() {
+        return peerInfo;  // returns the internal instance itself, not a copy
+    }
+
+    public String getPartitionKey() {
+        return peerInfo.getPartitionKey();  // delegated to peerInfo
+    }
+
+    public final String getConfirmContext() {
+        return confirmContext;
+    }
+
+    public long getCurrOffset() {
+        return peerInfo.getCurrOffset();  // delegated to peerInfo
+    }
+
+    public long getMaxOffset() {
+        return peerInfo.getMaxOffset();  // delegated to peerInfo
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java
new file mode 100644
index 0000000..b044316
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/ConsumeResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.inlong.tubemq.client.consumer.FetchContext;
+import org.apache.inlong.tubemq.corebase.Message;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+public class ConsumeResult extends RetValue {  // RetValue extended with fetched messages and their source info
+    private String topicName = "";
+    private PeerInfo peerInfo = new PeerInfo();
+    private String confirmContext = "";
+    private List<Message> messageList = new ArrayList<>();
+
+    public ConsumeResult() {
+        super();
+    }
+
+    public void setProcessResult(FetchContext taskContext) {  // copies status, source info and payload from taskContext
+        setFullInfo(taskContext.isSuccess(),
+                taskContext.getErrCode(), taskContext.getErrMsg());
+        this.topicName = taskContext.getPartition().getTopic();
+        peerInfo.setMsgSourceInfo(taskContext.getPartition(),
+                taskContext.getCurrOffset(), taskContext.getMaxOffset());
+        if (this.isSuccess()) {  // on failure messageList and confirmContext keep their previous values
+            this.messageList = taskContext.getMessageList();  // stores the returned list reference, no copy
+            this.confirmContext = taskContext.getConfirmContext();
+        }
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public PeerInfo getPeerInfo() {
+        return peerInfo;  // returns the internal instance itself, not a copy
+    }
+
+    public String getPartitionKey() {
+        return peerInfo.getPartitionKey();  // delegated to peerInfo
+    }
+
+    public final String getConfirmContext() {
+        return confirmContext;
+    }
+
+    public long getCurrOffset() {
+        return peerInfo.getCurrOffset();  // delegated to peerInfo
+    }
+
+    public final List<Message> getMessageList() {
+        return messageList;  // returns the stored list itself, not a copy
+    }
+
+    public long getMaxOffset() {
+        return peerInfo.getMaxOffset();  // delegated to peerInfo
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
index ac6fecc..3628beb 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/PeerInfo.java
@@ -26,6 +26,7 @@ public class PeerInfo {
     private String partitionKey = "";
     private long currOffset = TBaseConstants.META_VALUE_UNDEFINED;
     private long maxOffset = TBaseConstants.META_VALUE_UNDEFINED;
+    private long msgLagCount = TBaseConstants.META_VALUE_UNDEFINED;
 
     public PeerInfo() {
 
@@ -59,9 +60,17 @@ public class PeerInfo {
         }
         this.currOffset = newOffset;
         this.maxOffset = maxOffset;
+        if (this.currOffset >= 0 && this.maxOffset >= 0) {
+            this.msgLagCount =
+                    (this.maxOffset - this.currOffset) / TBaseConstants.INDEX_MSG_UNIT_SIZE;
+        }
     }
 
     public long getMaxOffset() {
         return maxOffset;
     }
+
+    public long getMsgLagCount() {
+        return msgLagCount;
+    }
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java
new file mode 100644
index 0000000..996fd48
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/QueryMetaResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.common;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.rv.RetValue;
+
+public class QueryMetaResult extends RetValue {  // RetValue extended with a partition-key to Boolean status map
+
+    Map<String, Boolean> partStatusMap = new HashMap<>();  // NOTE(review): package-private, no access modifier
+
+    public QueryMetaResult() {
+        super();
+    }
+
+    public QueryMetaResult(QueryMetaResult other) {
+        super(other);  // NOTE(review): other.partStatusMap is not copied; the new instance starts with an empty map
+    }
+
+    public QueryMetaResult(int errCode, String errInfo) {
+        super(errCode, errInfo);
+    }
+
+    public void setFailResult(int errCode, final String errMsg) {
+        super.setFailResult(errCode, errMsg);  // partStatusMap is left untouched
+    }
+
+    public void setFailResult(final String errMsg) {
+        super.setFailResult(errMsg);  // partStatusMap is left untouched
+    }
+
+    public void setSuccResult(Map<String, Boolean> partStatusMap) {
+        super.setSuccResult();
+        this.partStatusMap = partStatusMap;  // stores the caller's reference; a null argument makes the getter return null
+    }
+
+    public Map<String, Boolean> getPartStatusMap() {
+        return partStatusMap;  // returns the stored map itself, not a copy
+    }
+
+    public void clear() {
+        super.clear();  // NOTE(review): partStatusMap is not reset here — confirm whether that is intended
+    }
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
index bd6039e..06b8293 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/common/TClientConstants.java
@@ -37,4 +37,7 @@ public class TClientConstants {
     public static final int MAX_CONNECTION_FAILURE_LOG_TIMES = 10;
     public static final int MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES = 6;
 
+    public static final long CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS = 10000L;
+    public static final long CFG_MIN_META_QUERY_WAIT_PERIOD_MS = 5000L;
+
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index ea6bd5a..6dd19b8 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -28,7 +28,7 @@ import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
  */
 public class ConsumerConfig extends TubeClientConfig {
 
-    private String consumerGroup;
+    private final String consumerGroup;
 
     /* consumeModel
      *    Set the start position of the consumer group. The value can be [-1, 0, 1]. Default value is 0.
@@ -47,6 +47,8 @@ public class ConsumerConfig extends TubeClientConfig {
             TClientConstants.CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS;
     private long shutDownRebalanceWaitPeriodMs =
             TClientConstants.CFG_DEFAULT_SHUTDOWN_REBALANCE_WAIT_PERIOD_MS;
+    private long partMetaInfoCheckPeriodMs =
+            TClientConstants.CFG_DEFAULT_META_QUERY_WAIT_PERIOD_MS;
     private int pushFetchThreadCnt =
             TClientConstants.CFG_DEFAULT_CLIENT_PUSH_FETCH_THREAD_CNT;
     private boolean pushListenerWaitTimeoutRollBack = true;
@@ -159,6 +161,17 @@ public class ConsumerConfig extends TubeClientConfig {
         this.shutDownRebalanceWaitPeriodMs = shutDownRebalanceWaitPeriodMs;
     }
 
+    public long getPartMetaInfoCheckPeriodMs() {
+        return partMetaInfoCheckPeriodMs;
+    }
+
+    public void setPartMetaInfoCheckPeriodMs(long partMetaInfoCheckPeriodMs) {
+        // Clamp to CFG_MIN_META_QUERY_WAIT_PERIOD_MS. Previously the raw value was
+        // assigned unconditionally after the check, so the minimum was never enforced.
+        this.partMetaInfoCheckPeriodMs = Math.max(partMetaInfoCheckPeriodMs,
+                TClientConstants.CFG_MIN_META_QUERY_WAIT_PERIOD_MS);
+    }
+
     public int getPushFetchThreadCnt() {
         return pushFetchThreadCnt;
     }
@@ -270,6 +283,8 @@ public class ConsumerConfig extends TubeClientConfig {
                 .append(",\"pullConfirmWaitPeriodMs\":").append(this.pullRebConfirmWaitPeriodMs)
                 .append(",\"pullProtectConfirmTimeoutPeriodMs\":").append(this.pullProtectConfirmTimeoutMs)
                 .append(",\"pullConfirmInLocal\":").append(this.pullConfirmInLocal)
+                .append(",\"maxSubInfoReportIntvlTimes\":").append(this.maxSubInfoReportIntvlTimes)
+                .append(",\"partMetaInfoCheckPeriodMs\":").append(this.partMetaInfoCheckPeriodMs)
                 .append(",\"ClientConfig\":").append(toJsonString())
                 .append("}").toString();
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index c0cfaef..4386004 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1058,7 +1058,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 && consumeSubInfo.isRequireBound()
                 && consumeSubInfo.getIsNotAllocated()) {
             Long currOffset = consumeSubInfo.getAssignedPartOffset(partition.getPartitionKey());
-            if (currOffset != null && currOffset != -1) {
+            if (currOffset != null && currOffset >= 0) {
                 builder.setCurrOffset(currOffset);
             }
         }
@@ -1720,7 +1720,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                                                 logger.warn(strBuffer
                                                         .append("[heart2broker error] partition:")
                                                         .append(failPartition.toString())
-                                                        .append(" errorCode")
+                                                        .append(", errorCode=")
                                                         .append(errorCode).toString());
                                                 strBuffer.delete(0, strBuffer.length());
                                             }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java
new file mode 100644
index 0000000..4a781be
--- /dev/null
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/ClientBalanceConsumer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.client.consumer;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.inlong.tubemq.client.common.ConfirmResult;
+import org.apache.inlong.tubemq.client.common.ConsumeResult;
+import org.apache.inlong.tubemq.client.common.QueryMetaResult;
+import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.corebase.Shutdownable;
+import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+
+public interface ClientBalanceConsumer extends Shutdownable {
+
+    String getClientVersion();
+
+    String getConsumerId();
+
+    boolean isShutdown();
+
+    boolean isFilterConsume(String topic);
+
+    ConsumerConfig getConsumerConfig();
+
+    int getSourceCount();
+
+    int getNodeId();
+
+    /**
+     * Start the client-balance service
+     *
+     * @param topicAndFilterCondMap    subscribed topics and the filter condition items of each topic;
+     *                                 if no filtering is needed, set the topic's condition set to empty or null
+     * @param sourceCount  the total count of clients that the consumer group will start this time
+     *                       If this value is set, the system will check that this parameter value
+     *                       carried by each client must be consistent,
+     *                       and the corresponding nodeId value must be unique in the consumer group;
+     *                       if this value is not set, please set a negative number
+     * @param nodeId         the unique ID of the node in the consumer group
+     *                    Attention: the sourceCount and nodeId parameters are used when the client
+     *                              performs modular allocation of partitions to avoid
+     *                              allocation conflicts in advance and avoid
+     *                              repeated allocations to the same partition by the client
+     * @param result       call result, the parameter is not allowed to be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean start(Map<String, TreeSet<String>> topicAndFilterCondMap,
+                  int sourceCount, int nodeId, ProcessResult result) throws TubeClientException;
+
+    /**
+     * Query partition configuration information from Master
+     *
+     * @param result    call result, the parameter is not allowed to be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean getPartitionMetaInfo(QueryMetaResult result) throws TubeClientException;
+
+    boolean isPartitionsReady(long maxWaitTime);  // NOTE(review): unit of maxWaitTime is not stated; presumably ms
+
+    /**
+     * Get the currently registered partitionKey set
+     *
+     * @return  the partition key set registered
+     */
+    Set<String> getCurRegisteredPartSet();
+
+    /**
+     * Connect to the partition's broker for consumption
+     *
+     * @param partitionKey    partition key
+     * @param boostrapOffset     bootstrap offset for consumption, if value:
+     *                           < 0, broker will not change stored offset,
+     *                           >= 0, broker will replace the stored offset with the specified value
+     * @param result    call result, the parameter is not allowed to be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean connect2Partition(String partitionKey, long boostrapOffset,
+                              ProcessResult result) throws TubeClientException;
+
+    /**
+     * Disconnect from the registered partition for partition release
+     *
+     * @param partitionKey    partition key
+     * @param result    call result, the parameter is not allowed to be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean disconnectFromPartition(String partitionKey,
+                                    ProcessResult result) throws TubeClientException;
+
+    /**
+     * Get consume offset information of the currently registered partitions
+     *
+     * @return  consume offset information
+     */
+    Map<String, ConsumeOffsetInfo> getCurPartitionOffsetInfos();
+
+    /**
+     * Consume messages from server
+     *
+     * @param result    call result, the parameter is not allowed to be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean getMessage(ConsumeResult result) throws TubeClientException;
+
+    /**
+     * Confirm whether the messages have been consumed
+     *
+     * @param confirmContext    confirm context, from the corresponding field value
+     *                         in the result of getting the message
+     * @param isConsumed    whether the data has been successfully processed: if true, tell the
+     *                     server to continue with the next batch of data; if false, tell the
+     *                     server that the data was not processed and needs to be re-pulled
+     * @param result    call result holder; as with the other methods, presumably must not be null
+     * @throws TubeClientException    parameter abnormal
+     *
+     * @return  true if call success, false if failure
+     */
+    boolean confirmConsume(String confirmContext, boolean isConsumed,
+                           ConfirmResult result) throws TubeClientException;
+
+}
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
index 04359aa..2295464 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/MessageSessionFactory.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.tubemq.client.factory;
 
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -39,4 +40,6 @@ public interface MessageSessionFactory extends Shutdownable {
     PullMessageConsumer createPullConsumer(ConsumerConfig consumerConfig)
             throws TubeClientException;
 
+    ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException;
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
index 198cd81..b9b1c81 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeBaseSessionFactory.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
 import org.apache.inlong.tubemq.client.consumer.SimplePullMessageConsumer;
@@ -164,9 +165,9 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
             throw new TubeClientException(new StringBuilder(512)
                     .append("consumerConfig's masterInfo not equal!")
                     .append(" SessionFactory's masterInfo is ")
-                    .append(tubeClientConfig.getMasterInfo())
+                    .append(tubeClientConfig.getMasterInfo().getMasterClusterStr())
                     .append(", consumerConfig's masterInfo is ")
-                    .append(consumerConfig.getMasterInfo()).toString());
+                    .append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
         }
         return this.addClient(new SimplePullMessageConsumer(this, consumerConfig));
     }
@@ -178,13 +179,19 @@ public class TubeBaseSessionFactory implements InnerSessionFactory {
             throw new TubeClientException(new StringBuilder(512)
                     .append("consumerConfig's masterInfo not equal!")
                     .append(" SessionFactory's masterInfo is ")
-                    .append(tubeClientConfig.getMasterInfo())
+                    .append(tubeClientConfig.getMasterInfo().getMasterClusterStr())
                     .append(", consumerConfig's masterInfo is ")
-                    .append(consumerConfig.getMasterInfo()).toString());
+                    .append(consumerConfig.getMasterInfo().getMasterClusterStr()).toString());
         }
         return this.addClient(new SimplePushMessageConsumer(this, consumerConfig));
     }
 
+    @Override
+    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException {
+        return null;  // NOTE(review): stub; no consumer is created or registered yet, callers receive null
+    }
+
     public boolean isShutdown() {
         return shutdown.get();
     }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
index a113c60..715f08e 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeMultiSessionFactory.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.config.TubeClientConfigUtils;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -84,4 +85,9 @@ public class TubeMultiSessionFactory implements MessageSessionFactory {
         return this.baseSessionFactory.createPullConsumer(consumerConfig);
     }
 
+    @Override
+    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException {
+        return this.baseSessionFactory.createBalanceConsumer(consumerConfig);  // delegate, as createPullConsumer does
+    }
 }
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
index 03fd450..f5d4411 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/factory/TubeSingleSessionFactory.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.tubemq.client.config.ConsumerConfig;
 import org.apache.inlong.tubemq.client.config.TubeClientConfig;
 import org.apache.inlong.tubemq.client.config.TubeClientConfigUtils;
+import org.apache.inlong.tubemq.client.consumer.ClientBalanceConsumer;
 import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.inlong.tubemq.client.consumer.PushMessageConsumer;
 import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -100,6 +101,12 @@ public class TubeSingleSessionFactory implements MessageSessionFactory {
         return baseSessionFactory.createPullConsumer(consumerConfig);
     }
 
+    @Override
+    public ClientBalanceConsumer createBalanceConsumer(ConsumerConfig consumerConfig)
+            throws TubeClientException {
+        return baseSessionFactory.createBalanceConsumer(consumerConfig);  // delegate, as createPullConsumer does
+    }
+
     public NettyClientFactory getRpcServiceFactory() {
         return clientFactory;
     }
diff --git a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
index ca5d31f..2f0595c 100644
--- a/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
+++ b/inlong-tubemq/tubemq-core/src/main/java/org/apache/inlong/tubemq/corebase/TBaseConstants.java
@@ -80,6 +80,8 @@ public class TBaseConstants {
     public static final int META_MAX_MESSAGE_DATA_SIZE_UPPER_LIMIT =
             META_MAX_ALLOWED_MESSAGE_SIZE_MB * META_MB_UNIT_SIZE;
 
+    public static final long INDEX_MSG_UNIT_SIZE = 28;
+
     public static final long CFG_DEF_META_FORCE_UPDATE_PERIOD = 3 * 60 * 1000;
     public static final long CFG_MIN_META_FORCE_UPDATE_PERIOD = 1 * 60 * 1000;