You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/05/31 08:40:59 UTC

[incubator-tubemq] branch master updated: [TUBEMQ-155] Use enum class for consume position[addendum] (#106)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb3490  [TUBEMQ-155] Use enum class for consume position[addendum] (#106)
ccb3490 is described below

commit ccb3490f489e43ae71db9bfbbdc7c71f9bf80bfe
Author: yanhai <yu...@163.com>
AuthorDate: Sun May 31 16:40:52 2020 +0800

    [TUBEMQ-155] Use enum class for consume position[addendum] (#106)
    
    Co-authored-by: yuchengxiang <yu...@meituan.com>
---
 .../tubemq/client/config/ConsumerConfig.java       | 30 +++++++++++++++++-----
 .../client/consumer/BaseMessageConsumer.java       |  4 +--
 .../{ConsumeModel.java => ConsumePosition.java}    | 15 ++++++++---
 .../tubemq/example/MessageConsumerExample.java     |  4 +--
 .../tubemq/example/MessagePullConsumerExample.java |  4 +--
 5 files changed, 41 insertions(+), 16 deletions(-)

diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
index 457d126..6cd4526 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
@@ -18,7 +18,7 @@
 package org.apache.tubemq.client.config;
 
 import org.apache.tubemq.client.common.TClientConstants;
-import org.apache.tubemq.client.consumer.ConsumeModel;
+import org.apache.tubemq.client.consumer.ConsumePosition;
 import org.apache.tubemq.corebase.TBaseConstants;
 import org.apache.tubemq.corebase.cluster.MasterInfo;
 import org.apache.tubemq.corebase.utils.TStringUtils;
@@ -36,7 +36,7 @@ public class ConsumerConfig extends TubeClientConfig {
      *  0: Start from the latest position for the first time. Otherwise start from last consume position.
      *  1: Start from the latest consume position.
     */
-    private ConsumeModel consumeModel = ConsumeModel.CONSUMER_FROM_LATEST_OFFSET;
+    private ConsumePosition consumePosition = ConsumePosition.CONSUMER_FROM_LATEST_OFFSET;
     private int maxSubInfoReportIntvlTimes =
             TClientConstants.MAX_SUBSCRIBE_REPORT_INTERVAL_TIMES;
     private long msgNotFoundWaitPeriodMs =
@@ -84,12 +84,28 @@ public class ConsumerConfig extends TubeClientConfig {
         return consumerGroup;
     }
 
-    public ConsumeModel getConsumeModel() {
-        return consumeModel;
+    public ConsumePosition getConsumePosition() {
+        return consumePosition;
     }
 
-    public void setConsumeModel(ConsumeModel consumeModel) {
-        this.consumeModel = consumeModel;
+    public void setConsumePosition(ConsumePosition consumePosition) {
+        this.consumePosition = consumePosition;
+    }
+
+    /**
+     * @deprecated use {@link #getConsumePosition()} instead; this returns only the position's int code.
+     */
+    @Deprecated
+    public int getConsumeModel() {
+        return consumePosition.getCode();
+    }
+
+    /**
+     * @deprecated use {@link #setConsumePosition(ConsumePosition)} instead; an unknown code throws IllegalArgumentException.
+     */
+    @Deprecated
+    public void setConsumeModel(int consumeModel) {
+        setConsumePosition(ConsumePosition.valueOf(consumeModel));
     }
 
     public long getMsgNotFoundWaitPeriodMs() {
@@ -208,7 +224,7 @@ public class ConsumerConfig extends TubeClientConfig {
         return new StringBuilder(512).append("\"ConsumerConfig\":{")
                 .append("\"consumerGroup\":\"").append(this.consumerGroup)
                 .append("\",\"maxSubInfoReportIntvlTimes\":").append(this.maxSubInfoReportIntvlTimes)
-                .append(",\"consumeModel\":").append(this.consumeModel)
+                .append(",\"consumePosition\":").append(this.consumePosition)
                 .append(",\"msgNotFoundWaitPeriodMs\":").append(this.msgNotFoundWaitPeriodMs)
                 .append(",\"shutDownRebalanceWaitPeriodMs\":").append(this.shutDownRebalanceWaitPeriodMs)
                 .append(",\"pushFetchThreadCnt\":").append(this.pushFetchThreadCnt)
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
index b37f9e0..5abc4ba 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/BaseMessageConsumer.java
@@ -1179,7 +1179,7 @@ public class BaseMessageConsumer implements MessageConsumer {
 
     private int getGroupInitReadStatus(boolean isFistReg) {
         int readStatus = TBaseConstants.CONSUME_MODEL_READ_NORMAL;
-        switch (consumerConfig.getConsumeModel()) {
+        switch (consumerConfig.getConsumePosition()) {
             case CONSUMER_FROM_LATEST_OFFSET: {
                 if (isFistReg) {
                     readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX;
@@ -1187,7 +1187,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                 }
                 break;
             }
-            case CONSUMER_FROM_MAX_OFFSET: {
+            case CONSUMER_FROM_MAX_OFFSET_ALWAYS: {
                 if (isFistReg) {
                     readStatus = TBaseConstants.CONSUME_MODEL_READ_FROM_MAX_ALWAYS;
                     logger.info("[Consume From Max Offset Always]" + consumerId);
diff --git a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
similarity index 72%
rename from tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java
rename to tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
index 1c77ad9..31e4f07 100644
--- a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumeModel.java
+++ b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/ConsumePosition.java
@@ -16,8 +16,8 @@
  */
 package org.apache.tubemq.client.consumer;
 
-public enum ConsumeModel {
-    CONSUMER_FROM_MAX_OFFSET(1, "Always start from the max consume position."),
+public enum ConsumePosition {
+    CONSUMER_FROM_MAX_OFFSET_ALWAYS(1, "Always start from the max consume position."),
     CONSUMER_FROM_LATEST_OFFSET(0, "Start from the latest position for the first time. " +
             "Otherwise start from last consume position."),
     CONSUMER_FROM_FIRST_OFFSET(-1, "Start from 0 for the first time. Otherwise start from last consume position.");
@@ -25,7 +25,7 @@ public enum ConsumeModel {
     private int code;
     private String description;
 
-    ConsumeModel(int code, String description) {
+    ConsumePosition(int code, String description) {
         this.code = code;
         this.description = description;
     }
@@ -37,4 +37,13 @@ public enum ConsumeModel {
     public String getDescription() {
         return description;
     }
+
+    public static ConsumePosition valueOf(int code) {
+        for (ConsumePosition consumePosition : ConsumePosition.values()) {
+            if (consumePosition.getCode() == code) {
+                return consumePosition;
+            }
+        }
+        throw new IllegalArgumentException(String.format("unknown ConsumePosition code %s", code));
+    }
 }
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
index ecdee4f..c031a16 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessageConsumerExample.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.tubemq.client.common.PeerInfo;
 import org.apache.tubemq.client.config.ConsumerConfig;
-import org.apache.tubemq.client.consumer.ConsumeModel;
+import org.apache.tubemq.client.consumer.ConsumePosition;
 import org.apache.tubemq.client.consumer.MessageV2Listener;
 import org.apache.tubemq.client.consumer.PushMessageConsumer;
 import org.apache.tubemq.client.exception.TubeClientException;
@@ -68,7 +68,7 @@ public final class MessageConsumerExample {
         int fetchCount
     ) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
-        consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         if (fetchCount > 0) {
             consumerConfig.setPushFetchThreadCnt(fetchCount);
         }
diff --git a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
index b760b68..fa9c0a5 100644
--- a/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
+++ b/tubemq-example/src/main/java/org/apache/tubemq/example/MessagePullConsumerExample.java
@@ -21,8 +21,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import org.apache.tubemq.client.config.ConsumerConfig;
-import org.apache.tubemq.client.consumer.ConsumeModel;
 import org.apache.tubemq.client.consumer.ConsumeOffsetInfo;
+import org.apache.tubemq.client.consumer.ConsumePosition;
 import org.apache.tubemq.client.consumer.ConsumerResult;
 import org.apache.tubemq.client.consumer.PullMessageConsumer;
 import org.apache.tubemq.client.exception.TubeClientException;
@@ -55,7 +55,7 @@ public final class MessagePullConsumerExample {
         String group
     ) throws Exception {
         ConsumerConfig consumerConfig = new ConsumerConfig(localHost, masterHostAndPort, group);
-        consumerConfig.setConsumeModel(ConsumeModel.CONSUMER_FROM_LATEST_OFFSET);
+        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
         this.messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
         this.messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig);
     }