You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/09/15 03:14:02 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)

This is an automated email from the ASF dual-hosted git repository.

lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 2348b77  [TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)
2348b77 is described below

commit 2348b77fe70fa63ba0de5c85cbfb085a8820bb19
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Sep 15 11:13:51 2020 +0800

    [TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)
    
    Co-authored-by: gosonzhang <go...@tencent.com>
---
 .../tubemq/client/common/TClientConstants.java     |  4 ++-
 .../tubemq/client/config/ConsumerConfig.java       | 23 +++++++++++++
 .../tubemq/client/consumer/RmtDataCache.java       | 39 ++++++++++++----------
 .../client/consumer/SimplePullMessageConsumer.java | 21 +++++++++++-
 .../tubemq/example/MessagePullConsumerExample.java | 23 +++----------
 .../example/MessagePullSetConsumerExample.java     | 22 +++---------
 6 files changed, 75 insertions(+), 57 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java b/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
index 15c6d27..36808c0 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
@@ -23,7 +23,9 @@ public class TClientConstants {
     public static final int CFG_DEFAULT_HEARTBEAT_RETRY_TIMES = 5;
     public static final long CFG_DEFAULT_HEARTBEAT_PERIOD_MS = 13000;
     public static final long CFG_DEFAULT_REGFAIL_WAIT_PERIOD_MS = 1000;
-    public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 200L;
+    public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 400L;
+    public static final long CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS = 90000L;
+    public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 300L;
     public static final long CFG_DEFAULT_PUSH_LISTENER_WAIT_PERIOD_MS = 3000L;
     public static final long CFG_DEFAULT_PULL_REB_CONFIRM_WAIT_PERIOD_MS = 3000L;
     public static final long CFG_DEFAULT_PULL_PROTECT_CONFIRM_WAIT_PERIOD_MS = 60000L;
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index 30801d4..d8b63fb 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -41,6 +41,10 @@ public class ConsumerConfig extends TubeClientConfig {
             TClientConstants.MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES;
     private long msgNotFoundWaitPeriodMs =
             TClientConstants.CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS;
+    private long pullConsumeReadyWaitPeriodMs =
+            TClientConstants.CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS;
+    private long pullConsumeReadyChkSliceMs =
+            TClientConstants.CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS;
     private long shutDownRebalanceWaitPeriodMs =
             TClientConstants.CFG_DEFAULT_SHUTDOWN_REBALANCE_WAIT_PERIOD_MS;
     private int pushFetchThreadCnt =
@@ -116,6 +120,25 @@ public class ConsumerConfig extends TubeClientConfig {
         this.msgNotFoundWaitPeriodMs = msgNotFoundWaitPeriodMs;
     }
 
+    public long getPullConsumeReadyWaitPeriodMs() {
+        return pullConsumeReadyWaitPeriodMs;
+    }
+
+    public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) {
+        this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs;
+    }
+
+    public long getPullConsumeReadyChkSliceMs() {
+        return pullConsumeReadyChkSliceMs;
+    }
+
+    public void setPullConsumeReadyChkSliceMs(long pullConsumeReadyChkSliceMs) {
+        if (pullConsumeReadyChkSliceMs >= 0
+                && pullConsumeReadyChkSliceMs <= 1000) {
+            this.pullConsumeReadyChkSliceMs = pullConsumeReadyChkSliceMs;
+        }
+    }
+
     public long getShutDownRebalanceWaitPeriodMs() {
         return shutDownRebalanceWaitPeriodMs;
     }
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 2bc793b..a041042 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -151,22 +151,11 @@ public class RmtDataCache implements Closeable {
     }
 
     /**
-     * Pull the selected partitions.
-     *
-     * @return pull result
+     * Get current partition's consume status.
+     * @return current status
      */
-    public PartitionSelectResult pullSelect() {
-        int count = 6;
-        do {
-            if (this.isClosed.get()) {
-                break;
-            }
-            if (!partitionMap.isEmpty()) {
-                break;
-            }
-            ThreadUtils.sleep(350);
-        } while (--count > 0);
-        if (this.isClosed.get()) {
+    public PartitionSelectResult getCurrPartsStatus() {
+        if (isClosed.get()) {
             return new PartitionSelectResult(false,
                     TErrCodeConstants.BAD_REQUEST,
                     "Client instance has been shutdown!");
@@ -177,7 +166,7 @@ public class RmtDataCache implements Closeable {
                     "No partition info in local, please wait and try later");
         }
         if (indexPartition.isEmpty()) {
-            if (hasPartitionWait()) {
+            if (!timeouts.isEmpty()) {
                 return new PartitionSelectResult(false,
                         TErrCodeConstants.ALL_PARTITION_WAITING,
                         "All partition in waiting, retry later!");
@@ -187,10 +176,24 @@ public class RmtDataCache implements Closeable {
                         "No idle partition to consume, please wait and try later");
             } else {
                 return new PartitionSelectResult(false,
-                    TErrCodeConstants.ALL_PARTITION_FROZEN,
-                    "All partition are frozen to consume, please unfreeze partition(s) or wait");
+                        TErrCodeConstants.ALL_PARTITION_FROZEN,
+                        "All partition are frozen to consume, please unfreeze partition(s) or wait");
             }
         }
+        return new PartitionSelectResult(true,
+                TErrCodeConstants.SUCCESS, "OK");
+    }
+
+    /**
+     * Pull the selected partitions.
+     *
+     * @return pull result
+     */
+    public PartitionSelectResult pullSelect() {
+        PartitionSelectResult result = getCurrPartsStatus();
+        if (!result.isSuccess()) {
+            return result;
+        }
         waitCont.incrementAndGet();
         try {
             rebProcessWait();
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
index e30b80c..b0fc8b9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -30,6 +30,7 @@ import org.apache.tubemq.corebase.cluster.Partition;
 import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
 import org.apache.tubemq.corebase.utils.AddressUtils;
 import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
 
 /**
  * An implementation of PullMessageConsumer
@@ -130,9 +131,27 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
         if (!baseConsumer.isSubscribed()) {
             throw new TubeClientException("Please complete topic's Subscribe call first!");
         }
+        PartitionSelectResult selectResult = null;
+        long startTime = System.currentTimeMillis();
+        while (true) {
+            if (baseConsumer.isShutdown()) {
+                return new ConsumerResult(TErrCodeConstants.BAD_REQUEST,
+                        "Client instance has been shutdown!");
+            }
+            selectResult = baseConsumer.rmtDataCache.getCurrPartsStatus();
+            if (selectResult.isSuccess()) {
+                break;
+            }
+            if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0)
+                && (System.currentTimeMillis() - startTime >=
+                    baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs())) {
+                return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
+            }
+            ThreadUtils.sleep(baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs());
+        }
         StringBuilder sBuilder = new StringBuilder(512);
         // Check the data cache first
-        PartitionSelectResult selectResult = baseConsumer.rmtDataCache.pullSelect();
+        selectResult = baseConsumer.rmtDataCache.pullSelect();
         if (!selectResult.isSuccess()) {
             return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
         }
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index 37978a2..16c45b2 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -29,7 +29,6 @@ import org.apache.tubemq.client.exception.TubeClientException;
 import org.apache.tubemq.client.factory.MessageSessionFactory;
 import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,13 +76,6 @@ public final class MessagePullConsumerExample {
             fetchRunners[i].setName("_fetch_runner_" + i);
         }
 
-        // wait for client to join the exact consumer queue that consumer group allocated
-        while (!messageConsumer.isCurConsumeReady(1000)) {
-            ThreadUtils.sleep(1000);
-        }
-
-        logger.info("Wait and get partitions use time " + (System.currentTimeMillis() - startTime));
-
         for (Thread thread : fetchRunners) {
             thread.start();
         }
@@ -100,10 +92,6 @@ public final class MessagePullConsumerExample {
         messagePullConsumer.completeSubscribe();
     }
 
-    public boolean isCurConsumeReady(long waitTime) {
-        return messagePullConsumer.isPartitionsReady(waitTime);
-    }
-
     public ConsumerResult getMessage() throws TubeClientException {
         return messagePullConsumer.getMessage();
     }
@@ -139,19 +127,16 @@ public final class MessagePullConsumerExample {
                         }
                         messageConsumer.confirmConsume(result.getConfirmContext(), true);
                     } else {
-                        if (result.getErrCode() == 400
+                        if (!(result.getErrCode() == 400
+                                || result.getErrCode() == 404
                                 || result.getErrCode() == 405
                                 || result.getErrCode() == 406
                                 || result.getErrCode() == 407
-                                || result.getErrCode() == 408) {
-                            ThreadUtils.sleep(400);
-                        } else {
-                            if (result.getErrCode() != 404) {
-                                logger.info(
+                                || result.getErrCode() == 408)) {
+                            logger.info(
                                     "Receive messages errorCode is {}, Error message is {}",
                                     result.getErrCode(),
                                     result.getErrMsg());
-                            }
                         }
                     }
                     if (consumeCount > 0) {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
index f663d6e..d3afeaa 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
@@ -34,7 +34,6 @@ import org.apache.tubemq.client.exception.TubeClientException;
 import org.apache.tubemq.client.factory.MessageSessionFactory;
 import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
 import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,16 +81,6 @@ public final class MessagePullSetConsumerExample {
                     MessagePullSetConsumerExample messageConsumer =
                         new MessagePullSetConsumerExample(masterHostAndPort, group);
                     messageConsumer.subscribe(topicList, partOffsetMap);
-
-                    // wait until the consumer is allocated parts
-                    Map<String, ConsumeOffsetInfo> curPartsMap = null;
-                    while (curPartsMap == null || curPartsMap.isEmpty()) {
-                        ThreadUtils.sleep(1000);
-                        curPartsMap = messageConsumer.getCurrPartitionOffsetMap();
-                    }
-
-                    logger.info("Get allocated partitions are " + curPartsMap.toString());
-
                     // main logic of consuming
                     do {
                         ConsumerResult result = messageConsumer.getMessage();
@@ -136,19 +125,16 @@ public final class MessagePullSetConsumerExample {
                                     confirmResult.getErrMsg());
                             }
                         } else {
-                            if (result.getErrCode() == 400
+                            if (!(result.getErrCode() == 400
+                                    || result.getErrCode() == 404
                                     || result.getErrCode() == 405
                                     || result.getErrCode() == 406
                                     || result.getErrCode() == 407
-                                    || result.getErrCode() == 408) {
-                                ThreadUtils.sleep(400);
-                            } else {
-                                if (result.getErrCode() != 404) {
-                                    logger.info(
+                                    || result.getErrCode() == 408)) {
+                                logger.info(
                                         "Receive messages errorCode is {}, Error message is {}",
                                         result.getErrCode(),
                                         result.getErrMsg());
-                                }
                             }
                         }
                         if (consumeCount >= 0) {