You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/31 08:40:59 UTC
[incubator-tubemq] branch master updated: [TUBEMQ-155] Use enum
class for consume position[addendum] (#106)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/master by this push:
new ccb3490 [TUBEMQ-155] Use enum class for consume position[addendum] (#106)
ccb3490 is described below
commit ccb3490f489e43ae71db9bfbbdc7c71f9bf80bfe
Author: yanhai <yu...@163.com>
AuthorDate: Sun May 31 16:40:52 2020 +0800
[TUBEMQ-155] Use enum class for consume position[addendum] (#106)
Co-authored-by: yuchengxiang <yu...@meituan.com>
---
.../tubemq/client/config/ConsumerConfig.java | 30 +++++++++++++++++-----
.../client/consumer/BaseMessageConsumer.java | 4 +--
.../{ConsumeModel.java => ConsumePosition.java} | 15 ++++++++---
.../tubemq/example/MessageConsumerExample.java | 4 +--
.../tubemq/example/MessagePullConsumerExample.java | 4 +--
5 files changed, 41 insertions(+), 16 deletions(-)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index 457d126..6cd4526 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -18,7 +18,7 @@
package org.apache.tubemq.client.config;
import org.apache.tubemq.client.common.TClientConstants;
-import org.apache.tubemq.client.consumer.ConsumeModel;
+import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.corebase.TBaseConstants;
import org.apache.tubemq.corebase.cluster.MasterInfo;
import org.apache.tubemq.corebase.utils.TStringUtils;
@@ -36,7 +36,7 @@ public class ConsumerConfig extends TubeClientConfig {
* 0: Start from the latest position for the first time. Otherwise start from last consume position.
* 1: Start from the latest consume position.
*/
- private ConsumeModel consumeModel = ConsumeModel.CONSUMER_FROM_LATEST_OFFSET;
+ private ConsumePosition consumePosition = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
private int maxSubInfoReportIntvlTimes =
TClientConstants.MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES;
private long msgNotFoundWaitPeriodMs =
@@ -84,12 +84,28 @@ public class ConsumerConfig extends TubeClientConfig {
return consumerGroup;
}
- public ConsumeModel getConsumeModel() {
- return consumeModel;
+ public ConsumePosition getConsumePosition() {
+ return consumePosition;
}
- public void setConsumeModel(ConsumeModel consumeModel) {
- this.consumeModel = consumeModel;
+ public void setConsumePosition(ConsumePosition consumePosition) {
+ this.consumePosition = consumePosition;
+ }
+
+ /**
+ * recommend to use getConsumePosition
+ */
+ @Deprecated
+ public int getConsumeModel() {
+ return consumePosition.getCode();
+ }
+
+ /**
+ * recommend to use setConsumePosition
+ */
+ @Deprecated
+ public void setConsumeModel(int consumeModel) {
+ setConsumePosition(ConsumePosition.valueOf(consumeModel));
}
public long getMsgNotFoundWaitPeriodMs() {
@@ -208,7 +224,7 @@ public class ConsumerConfig extends TubeClientConfig {
return new StringBuilder(512).append("\"ConsumerConfig\":{")
.append("\"consumerGroup\":\"").append(this.consumerGroup)
.append("\",\"maxSubInfoReportIntvlTimes\":").append(this.maxSubInfoReportIntvlTimes)
- .append(",\"consumeModel\":").append(this.consumeModel)
+ .append(",\"consumePosition\":").append(this.consumePosition)
.append(",\"msgNotFoundWaitPeriodMs\":").append(this.msgNotFoundWaitPeriodMs)
.append(",\"shutDownRebalanceWaitPeriodMs\":").append(this.shutDownRebalanceWaitPeriodMs)
.append(",\"pushFetchThreadCnt\":").append(this.pushFetchThreadCnt)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b37f9e0..5abc4ba 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1179,7 +1179,7 @@ public class BaseMessageConsumer implements MessageConsumer {
private int getGroupInitReadStatus(boolean isFistReg) {
int readStatus = TBaseConstants.CONSUME_MODEL_READ_NORMAL;
- switch (consumerConfig.getConsumeModel()) {
+ switch (consumerConfig.getConsumePosition()) {
case CONSUMER_FROM_LATEST_OFFSET: {
if (isFistReg) {
readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX;
@@ -1187,7 +1187,7 @@ public class BaseMessageConsumer implements MessageConsumer {
}
break;
}
- case CONSUMER_FROM_MAX_OFFSET: {
+ case CONSUMER_FROM_MAX_OFFSET_ALWAYS: {
if (isFistReg) {
readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS;
logger.info("[Consume From Max Offset Always]" + consumerId);
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
similarity index 72%
rename from tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java
rename to tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
index 1c77ad9..31e4f07 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
@@ -16,8 +16,8 @@
*/
package org.apache.tubemq.client.consumer;
-public enum ConsumeModel {
- CONSUMER_FROM_MAX_OFFSET(1, "Always start from the max consume position."),
+public enum ConsumePosition {
+ CONSUMER_FROM_MAX_OFFSET_ALWAYS(1, "Always start from the max consume position."),
CONSUMER_FROM_LATEST_OFFSET(0, "Start from the latest position for the first time. " +
"Otherwise start from last consume position."),
CONSUMER_FROM_FIRST_OFFSET(-1, "Start from 0 for the first time. Otherwise start from last consume position.");
@@ -25,7 +25,7 @@ public enum ConsumeModel {
private int code;
private String description;
- ConsumeModel(int code, String description) {
+ ConsumePosition(int code, String description) {
this.code = code;
this.description = description;
}
@@ -37,4 +37,13 @@ public enum ConsumeModel {
public String getDescription() {
return description;
}
+
+ public static ConsumePosition valueOf(int code) {
+ for (ConsumePosition consumePosition : ConsumePosition.values()) {
+ if (consumePosition.getCode() == code) {
+ return consumePosition;
+ }
+ }
+ throw new IllegalArgumentException(String.format("unknown ConsumePosition code %s", code));
+ }
}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index ecdee4f..c031a16 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.tubemq.client.common.PeerInfo;
import org.apache.tubemq.client.config.ConsumerConfig;
-import org.apache.tubemq.client.consumer.ConsumeModel;
+import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.MessageV2Listener;
import org.apache.tubemq.client.consumer.PushMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
@@ -68,7 +68,7 @@ public final class MessageConsumerExample {
int fetchCount
) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
- consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
if (fetchCount > 0) {
consumerConfig.setPushFetchThreadCnt(fetchCount);
}
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index b760b68..fa9c0a5 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -21,8 +21,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.tubemq.client.config.ConsumerConfig;
-import org.apache.tubemq.client.consumer.ConsumeModel;
import org.apache.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.tubemq.client.consumer.ConsumePosition;
import org.apache.tubemq.client.consumer.ConsumerResult;
import org.apache.tubemq.client.consumer.PullMessageConsumer;
import org.apache.tubemq.client.exception.TubeClientException;
@@ -55,7 +55,7 @@ public final class MessagePullConsumerExample {
String group
) throws Exception {
ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
- consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
+ consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
}