You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/08 12:50:55 UTC

[rocketmq] branch develop updated: [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent performance on jdk1.8 a workaround (#5008)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new ed9c775ee [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent  performance on jdk1.8 a workaround (#5008)
ed9c775ee is described below

commit ed9c775ee371eb6cbca9915f98f44f63cccc7f8b
Author: mxsm <lj...@gmail.com>
AuthorDate: Thu Sep 8 20:50:32 2022 +0800

    [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent  performance on jdk1.8 a workaround (#5008)
---
 .../common/utils/ConcurrentHashMapUtils.java       | 21 ++++---
 .../common/utils/ConcurrentHashMapUtilsTest.java   | 39 +++++++++++++
 .../namesrv/routeinfo/RouteInfoManager.java        | 65 +++++++++++-----------
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  |  4 +-
 .../proxy/processor/ReceiptHandleProcessor.java    |  3 +-
 .../proxy/service/channel/ChannelManager.java      |  4 +-
 .../store/ha/autoswitch/AutoSwitchHAService.java   |  3 +-
 .../rocketmq/store/queue/QueueOffsetAssigner.java  |  8 +--
 8 files changed, 95 insertions(+), 52 deletions(-)

diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
similarity index 77%
rename from srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
rename to common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
index cc98eb5bc..b994276c9 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
@@ -14,23 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.srvutil;
+package org.apache.rocketmq.common.utils;
 
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Function;
 
-public class ConcurrentHashMapUtil {
+public abstract class ConcurrentHashMapUtils {
+
     private static final boolean IS_JDK8;
 
     static {
-        // Java 8 or lower: 1.6.0_23, 1.7.0, 1.7.0_80, 1.8.0_211
-        // Java 9 or higher: 9.0.1, 11.0.4, 12, 12.0.1
+        // Java 8
+        // Java 9+: 9,11,17
         IS_JDK8 = System.getProperty("java.version").startsWith("1.8.");
     }
 
-    private ConcurrentHashMapUtil() {
-    }
-
     /**
      * A temporary workaround for Java 8 specific performance issue JDK-8161372 .<br> Use implementation of
      * ConcurrentMap.computeIfAbsent instead.
@@ -39,10 +37,11 @@ public class ConcurrentHashMapUtil {
      */
     public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) {
         if (IS_JDK8) {
-            V v, newValue;
-            return ((v = map.get(key)) == null &&
-                (newValue = func.apply(key)) != null &&
-                (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+            V v = map.get(key);
+            if (null == v) {
+                v = map.computeIfAbsent(key, func);
+            }
+            return v;
         } else {
             return map.computeIfAbsent(key, func);
         }
diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
new file mode 100644
index 000000000..89a4b0cda
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ConcurrentHashMapUtilsTest {
+
+    @Test
+    public void computeIfAbsent() {
+
+        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
+        map.put("123", "1111");
+        String value = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "234");
+        assertEquals("1111", value);
+        String value1 = ConcurrentHashMapUtils.computeIfAbsent(map, "1232", k -> "2342");
+        assertEquals("2342", value1);
+        String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342");
+        assertEquals("1111", value2);
+    }
+}
\ No newline at end of file
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ace963ca9..91ec4ca1f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -201,16 +202,16 @@ public class RouteInfoManager {
     }
 
     public RegisterBrokerResult registerBroker(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final String zoneName,
-            final Long timeoutMillis,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final Channel channel) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final String haServerAddr,
+        final String zoneName,
+        final Long timeoutMillis,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final List<String> filterServerList,
+        final Channel channel) {
         return registerBroker(clusterName, brokerAddr, brokerName, brokerId, haServerAddr, zoneName, timeoutMillis, false, topicConfigWrapper, filterServerList, channel);
     }
 
@@ -231,7 +232,7 @@ public class RouteInfoManager {
             this.lock.writeLock().lockInterruptibly();
 
             //init or update the cluster info
-            Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
+            Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
             brokerNames.add(brokerName);
 
             boolean registerFirst = false;
@@ -273,8 +274,8 @@ public class RouteInfoManager {
                     long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
                     if (oldStateVersion > newStateVersion) {
                         log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
-                                        "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
-                                clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
+                                "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
+                            clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
                         //Remove the rejected brokerAddr from brokerLiveTable.
                         brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
                         return result;
@@ -284,7 +285,7 @@ public class RouteInfoManager {
 
             if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
                 log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
-                        topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
+                    topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
                 return null;
             }
 
@@ -293,17 +294,17 @@ public class RouteInfoManager {
 
             boolean isMaster = MixAll.MASTER_ID == brokerId;
             boolean isPrimeSlave = !isOldVersionBroker && !isMaster
-                    && brokerId == Collections.min(brokerAddrsMap.keySet());
+                && brokerId == Collections.min(brokerAddrsMap.keySet());
 
             if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
 
                 ConcurrentMap<String, TopicConfig> tcTable =
-                        topicConfigWrapper.getTopicConfigTable();
+                    topicConfigWrapper.getTopicConfigTable();
                 if (tcTable != null) {
                     for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                         if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
-                                topicConfigWrapper.getDataVersion(), brokerName,
-                                entry.getValue().getTopicName())) {
+                            topicConfigWrapper.getDataVersion(), brokerName,
+                            entry.getValue().getTopicName())) {
                             final TopicConfig topicConfig = entry.getValue();
                             if (isPrimeSlave) {
                                 // Wipe write perm for prime slave
@@ -331,12 +332,12 @@ public class RouteInfoManager {
 
             BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
             BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
-                    new BrokerLiveInfo(
-                            System.currentTimeMillis(),
-                            timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
-                            topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
-                            channel,
-                            haServerAddr));
+                new BrokerLiveInfo(
+                    System.currentTimeMillis(),
+                    timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+                    topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
+                    channel,
+                    haServerAddr));
             if (null == prevBrokerLiveInfo) {
                 log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
             }
@@ -363,7 +364,7 @@ public class RouteInfoManager {
 
             if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
                 notifyMinBrokerIdChanged(brokerAddrsMap, null,
-                        this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
+                    this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
             }
         } catch (Exception e) {
             log.error("registerBroker Exception", e);
@@ -679,9 +680,9 @@ public class RouteInfoManager {
                         continue;
                     }
                     BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
-                            brokerData.getBrokerName(),
-                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
-                            brokerData.isEnableActingMaster(), brokerData.getZoneName());
+                        brokerData.getBrokerName(),
+                        (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
+                        brokerData.isEnableActingMaster(), brokerData.getZoneName());
 
                     brokerDataList.add(brokerDataClone);
                     foundBrokerData = true;
@@ -1023,7 +1024,7 @@ public class RouteInfoManager {
                 String topic = topicEntry.getKey();
                 Map<String, QueueData> queueDatas = topicEntry.getValue();
                 if (queueDatas != null && queueDatas.size() > 0
-                        && TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+                    && TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
                     topicList.getTopicList().add(topic);
                 }
             }
@@ -1044,7 +1045,7 @@ public class RouteInfoManager {
                 String topic = topicEntry.getKey();
                 Map<String, QueueData> queueDatas = topicEntry.getValue();
                 if (queueDatas != null && queueDatas.size() > 0
-                        && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+                    && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
                     topicList.getTopicList().add(topic);
                 }
             }
@@ -1065,8 +1066,8 @@ public class RouteInfoManager {
                 String topic = topicEntry.getKey();
                 Map<String, QueueData> queueDatas = topicEntry.getValue();
                 if (queueDatas != null && queueDatas.size() > 0
-                        && !TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
-                        && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+                    && !TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
+                    && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
                     topicList.getTopicList().add(topic);
                 }
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index d2f447273..07d32445f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 
 public class ReceiptHandleGroup {
@@ -74,7 +75,8 @@ public class ReceiptHandleGroup {
 
     public void put(String msgID, String handle, MessageReceiptHandle value) {
         long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
-        Map<String, HandleData> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
+        Map<String, HandleData> handleMap = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Map<String, HandleData>>) this.receiptHandleMap,
+            msgID, msgIDKey -> new ConcurrentHashMap<>());
         handleMap.compute(handle, (handleKey, handleData) -> {
             if (handleData == null || handleData.needRemove) {
                 return new HandleData(value);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9bb8b7f9f..15c4385fd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.subscription.RetryPolicy;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -239,7 +240,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
         if (key == null) {
             return;
         }
-        receiptHandleGroupMap.computeIfAbsent(key,
+        ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key,
             k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, messageReceiptHandle);
     }
 
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
index d730d9118..283cd823d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +38,7 @@ public class ChannelManager {
             log.warn("ClientId is unexpected null or empty");
             return createChannelInner(context);
         }
-
-        SimpleChannel channel = clientIdChannelMap.computeIfAbsent(clientId, k -> createChannelInner(context));
+        SimpleChannel channel = ConcurrentHashMapUtils.computeIfAbsent(this.clientIdChannelMap,clientId, k -> createChannelInner(context));
         channel.updateLastAccessTime();
         return channel;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 74de4d691..49794c28a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.EpochEntry;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -237,7 +238,7 @@ public class AutoSwitchHAService extends DefaultHAService {
     }
 
     public void updateConnectionLastCaughtUpTime(final String slaveAddress, final long lastCaughtUpTimeMs) {
-        long prevTime = this.connectionCaughtUpTimeTable.computeIfAbsent(slaveAddress, k -> 0L);
+        Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveAddress, k -> 0L);
         this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, lastCaughtUpTimeMs));
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index 55614ccba..5e87bbc03 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -20,12 +20,12 @@ package org.apache.rocketmq.store.queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
  * QueueOffsetAssigner is a component for assigning offsets for queues.
- *
  */
 public class QueueOffsetAssigner {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -35,7 +35,7 @@ public class QueueOffsetAssigner {
     private ConcurrentMap<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
 
     public long assignQueueOffset(String topicQueueKey, short messageNum) {
-        long queueOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+        Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L);
         this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
         return queueOffset;
     }
@@ -45,13 +45,13 @@ public class QueueOffsetAssigner {
     }
 
     public long assignBatchQueueOffset(String topicQueueKey, short messageNum) {
-        Long topicOffset = this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+        Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L);
         this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
         return topicOffset;
     }
 
     public long assignLmqOffset(String topicQueueKey, short messageNum) {
-        long topicOffset = this.lmqTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+        Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L);
         this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
         return topicOffset;
     }