You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by la...@apache.org on 2020/09/15 03:14:02 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)
This is an automated email from the ASF dual-hosted git repository.
lamberliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new 2348b77 [TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)
2348b77 is described below
commit 2348b77fe70fa63ba0de5c85cbfb085a8820bb19
Author: gosonzhang <46...@qq.com>
AuthorDate: Tue Sep 15 11:13:51 2020 +0800
[TUBEMQ-345]Optimize the call logic of getMessage() in Pull mode (#261)
Co-authored-by: gosonzhang <go...@tencent.com>
---
.../tubemq/client/common/TClientConstants.java | 4 ++-
.../tubemq/client/config/ConsumerConfig.java | 23 +++++++++++++
.../tubemq/client/consumer/RmtDataCache.java | 39 ++++++++++++----------
.../client/consumer/SimplePullMessageConsumer.java | 21 +++++++++++-
.../tubemq/example/MessagePullConsumerExample.java | 23 +++----------
.../example/MessagePullSetConsumerExample.java | 22 +++---------
6 files changed, 75 insertions(+), 57 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java b/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
index 15c6d27..36808c0 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/common/TClientConstants.java
@@ -23,7 +23,9 @@ public class TClientConstants {
public static final int CFG_DEFAULT_HEARTBEAT_RETRY_TIMES = 5;
public static final long CFG_DEFAULT_HEARTBEAT_PERIOD_MS = 13000;
public static final long CFG_DEFAULT_REGFAIL_WAIT_PERIOD_MS = 1000;
- public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 200L;
+ public static final long CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS = 400L;
+ public static final long CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS = 90000L;
+ public static final long CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS = 300L;
public static final long CFG_DEFAULT_PUSH_LISTENER_WAIT_PERIOD_MS = 3000L;
public static final long CFG_DEFAULT_PULL_REB_CONFIRM_WAIT_PERIOD_MS = 3000L;
public static final long CFG_DEFAULT_PULL_PROTECT_CONFIRM_WAIT_PERIOD_MS = 60000L;
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index 30801d4..d8b63fb 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -41,6 +41,10 @@ public class ConsumerConfig extends TubeClientConfig {
TClientConstants.MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES;
private long msgNotFoundWaitPeriodMs =
TClientConstants.CFG_DEFAULT_MSG_NOTFOUND_WAIT_PERIOD_MS;
+ private long pullConsumeReadyWaitPeriodMs =
+ TClientConstants.CFG_DEFAULT_CONSUME_READ_WAIT_PERIOD_MS;
+ private long pullConsumeReadyChkSliceMs =
+ TClientConstants.CFG_DEFAULT_CONSUME_READ_CHECK_SLICE_MS;
private long shutDownRebalanceWaitPeriodMs =
TClientConstants.CFG_DEFAULT_SHUTDOWN_REBALANCE_WAIT_PERIOD_MS;
private int pushFetchThreadCnt =
@@ -116,6 +120,25 @@ public class ConsumerConfig extends TubeClientConfig {
this.msgNotFoundWaitPeriodMs = msgNotFoundWaitPeriodMs;
}
+ public long getPullConsumeReadyWaitPeriodMs() {
+ return pullConsumeReadyWaitPeriodMs;
+ }
+
+ public void setPullConsumeReadyWaitPeriodMs(long pullConsumeReadyWaitPeriodMs) {
+ this.pullConsumeReadyWaitPeriodMs = pullConsumeReadyWaitPeriodMs;
+ }
+
+ public long getPullConsumeReadyChkSliceMs() {
+ return pullConsumeReadyChkSliceMs;
+ }
+
+ public void setPullConsumeReadyChkSliceMs(long pullConsumeReadyChkSliceMs) {
+ if (pullConsumeReadyChkSliceMs >= 0
+ && pullConsumeReadyChkSliceMs <= 1000) {
+ this.pullConsumeReadyChkSliceMs = pullConsumeReadyChkSliceMs;
+ }
+ }
+
public long getShutDownRebalanceWaitPeriodMs() {
return shutDownRebalanceWaitPeriodMs;
}
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 2bc793b..a041042 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -151,22 +151,11 @@ public class RmtDataCache implements Closeable {
}
/**
- * Pull the selected partitions.
- *
- * @return pull result
+ * Get current partition's consume status.
+ * @return current status
*/
- public PartitionSelectResult pullSelect() {
- int count = 6;
- do {
- if (this.isClosed.get()) {
- break;
- }
- if (!partitionMap.isEmpty()) {
- break;
- }
- ThreadUtils.sleep(350);
- } while (--count > 0);
- if (this.isClosed.get()) {
+ public PartitionSelectResult getCurrPartsStatus() {
+ if (isClosed.get()) {
return new PartitionSelectResult(false,
TErrCodeConstants.BAD_REQUEST,
"Client instance has been shutdown!");
@@ -177,7 +166,7 @@ public class RmtDataCache implements Closeable {
"No partition info in local, please wait and try later");
}
if (indexPartition.isEmpty()) {
- if (hasPartitionWait()) {
+ if (!timeouts.isEmpty()) {
return new PartitionSelectResult(false,
TErrCodeConstants.ALL_PARTITION_WAITING,
"All partition in waiting, retry later!");
@@ -187,10 +176,24 @@ public class RmtDataCache implements Closeable {
"No idle partition to consume, please wait and try later");
} else {
return new PartitionSelectResult(false,
- TErrCodeConstants.ALL_PARTITION_FROZEN,
- "All partition are frozen to consume, please unfreeze partition(s) or wait");
+ TErrCodeConstants.ALL_PARTITION_FROZEN,
+ "All partition are frozen to consume, please unfreeze partition(s) or wait");
}
}
+ return new PartitionSelectResult(true,
+ TErrCodeConstants.SUCCESS, "OK");
+ }
+
+ /**
+ * Pull the selected partitions.
+ *
+ * @return pull result
+ */
+ public PartitionSelectResult pullSelect() {
+ PartitionSelectResult result = getCurrPartsStatus();
+ if (!result.isSuccess()) {
+ return result;
+ }
waitCont.incrementAndGet();
try {
rebProcessWait();
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
index e30b80c..b0fc8b9 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/SimplePullMessageConsumer.java
@@ -30,6 +30,7 @@ import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.TStringUtils;
+import org.apache.tubemq.corebase.utils.ThreadUtils;
/**
* An implementation of PullMessageConsumer
@@ -130,9 +131,27 @@ public class SimplePullMessageConsumer implements PullMessageConsumer {
if (!baseConsumer.isSubscribed()) {
throw new TubeClientException("Please complete topic's Subscribe call first!");
}
+ PartitionSelectResult selectResult = null;
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ if (baseConsumer.isShutdown()) {
+ return new ConsumerResult(TErrCodeConstants.BAD_REQUEST,
+ "Client instance has been shutdown!");
+ }
+ selectResult = baseConsumer.rmtDataCache.getCurrPartsStatus();
+ if (selectResult.isSuccess()) {
+ break;
+ }
+ if ((baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs() >= 0)
+ && (System.currentTimeMillis() - startTime >=
+ baseConsumer.getConsumerConfig().getPullConsumeReadyWaitPeriodMs())) {
+ return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
+ }
+ ThreadUtils.sleep(baseConsumer.getConsumerConfig().getPullConsumeReadyChkSliceMs());
+ }
StringBuilder sBuilder = new StringBuilder(512);
// Check the data cache first
- PartitionSelectResult selectResult = baseConsumer.rmtDataCache.pullSelect();
+ selectResult = baseConsumer.rmtDataCache.pullSelect();
if (!selectResult.isSuccess()) {
return new ConsumerResult(selectResult.getErrCode(), selectResult.getErrMsg());
}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index 37978a2..16c45b2 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -29,7 +29,6 @@ import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,13 +76,6 @@ public final class MessagePullConsumerExample {
fetchRunners[i].setName("_fetch_runner_" + i);
}
- // wait for client to join the exact consumer queue that consumer group allocated
- while (!messageConsumer.isCurConsumeReady(1000)) {
- ThreadUtils.sleep(1000);
- }
-
- logger.info("Wait and get partitions use time " + (System.currentTimeMillis() - startTime));
-
for (Thread thread : fetchRunners) {
thread.start();
}
@@ -100,10 +92,6 @@ public final class MessagePullConsumerExample {
messagePullConsumer.completeSubscribe();
}
- public boolean isCurConsumeReady(long waitTime) {
- return messagePullConsumer.isPartitionsReady(waitTime);
- }
-
public ConsumerResult getMessage() throws TubeClientException {
return messagePullConsumer.getMessage();
}
@@ -139,19 +127,16 @@ public final class MessagePullConsumerExample {
}
messageConsumer.confirmConsume(result.getConfirmContext(), true);
} else {
- if (result.getErrCode() == 400
+ if (!(result.getErrCode() == 400
+ || result.getErrCode() == 404
|| result.getErrCode() == 405
|| result.getErrCode() == 406
|| result.getErrCode() == 407
- || result.getErrCode() == 408) {
- ThreadUtils.sleep(400);
- } else {
- if (result.getErrCode() != 404) {
- logger.info(
+ || result.getErrCode() == 408)) {
+ logger.info(
"Receive messages errorCode is {}, Error message is {}",
result.getErrCode(),
result.getErrMsg());
- }
}
}
if (consumeCount > 0) {
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
index f663d6e..d3afeaa 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullSetConsumerExample.java
@@ -34,7 +34,6 @@ import org.apache.tubemq.client.exception.TubeClientException;
import org.apache.tubemq.client.factory.MessageSessionFactory;
import org.apache.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.tubemq.corebase.Message;
-import org.apache.tubemq.corebase.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,16 +81,6 @@ public final class MessagePullSetConsumerExample {
MessagePullSetConsumerExample messageConsumer =
new MessagePullSetConsumerExample(masterHostAndPort, group);
messageConsumer.subscribe(topicList, partOffsetMap);
-
- // wait until the consumer is allocated parts
- Map<String, ConsumeOffsetInfo> curPartsMap = null;
- while (curPartsMap == null || curPartsMap.isEmpty()) {
- ThreadUtils.sleep(1000);
- curPartsMap = messageConsumer.getCurrPartitionOffsetMap();
- }
-
- logger.info("Get allocated partitions are " + curPartsMap.toString());
-
// main logic of consuming
do {
ConsumerResult result = messageConsumer.getMessage();
@@ -136,19 +125,16 @@ public final class MessagePullSetConsumerExample {
confirmResult.getErrMsg());
}
} else {
- if (result.getErrCode() == 400
+ if (!(result.getErrCode() == 400
+ || result.getErrCode() == 404
|| result.getErrCode() == 405
|| result.getErrCode() == 406
|| result.getErrCode() == 407
- || result.getErrCode() == 408) {
- ThreadUtils.sleep(400);
- } else {
- if (result.getErrCode() != 404) {
- logger.info(
+ || result.getErrCode() == 408)) {
+ logger.info(
"Receive messages errorCode is {}, Error message is {}",
result.getErrCode(),
result.getErrMsg());
- }
}
}
if (consumeCount >= 0) {