You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:01 UTC

[03/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown, closes apache/incubator-rocketmq#68

[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown, closes apache/incubator-rocketmq#68


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/3c6260a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/3c6260a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/3c6260a6

Branch: refs/heads/develop
Commit: 3c6260a6eba09bb0322b4f883f28be8ac5527a54
Parents: 15c2b55
Author: Jaskey <li...@gmail.com>
Authored: Wed Apr 19 11:58:48 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../consumer/DefaultMQPullConsumerImpl.java     | 21 +++++++++++++-------
 .../consumer/DefaultMQPushConsumerImpl.java     | 15 +++++++-------
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3c6260a6/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index b26d062..7d43b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final RPCHook rpcHook;
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private OffsetStore offsetStore;
@@ -161,7 +161,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
+    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
+        long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
 
@@ -365,7 +366,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
     }
 
-    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
+    public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
+        long timeout)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
     }
@@ -449,7 +451,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return defaultMQPullConsumer;
     }
 
-    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+    public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
+        PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
@@ -510,7 +513,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -528,7 +531,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 this.serviceState = ServiceState.START_FAILED;
@@ -593,6 +596,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             default:
                 break;
         }
+
     }
 
     private void checkConfig() throws MQClientException {
@@ -662,7 +666,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         this.offsetStore.updateOffset(mq, offset, false);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
@@ -692,6 +697,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return serviceState;
     }
 
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
     public void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3c6260a6/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732..67f3ebe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private final long consumerStartTimestamp = System.currentTimeMillis();
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
     private final RPCHook rpcHook;
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
+    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private PullAPIWrapper pullAPIWrapper;
     private volatile boolean pause = false;
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void shutdown() {
+    public synchronized void shutdown() {
         switch (this.serviceState) {
             case CREATE_JUST:
                 break;
@@ -535,7 +535,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public void start() throws MQClientException {
+    public synchronized void start() throws MQClientException {
         switch (this.serviceState) {
             case CREATE_JUST:
                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
@@ -615,9 +615,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
 
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
         this.mQClientFactory.rebalanceImmediately();
     }
 
@@ -855,7 +853,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         this.consumeMessageService.updateCorePoolSize(corePoolSize);
     }
 
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    public MessageExt viewMessage(String msgId)
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
     }
 
@@ -1014,7 +1013,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         return serviceState;
     }
 
-    public void setServiceState(ServiceState serviceState) {
+    //Don't use this deprecated setter, which will be removed soon.
+    @Deprecated
+    public synchronized void setServiceState(ServiceState serviceState) {
         this.serviceState = serviceState;
     }