You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:01 UTC
[03/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-107] Fix
possible concurrency problem on ServiceState when consumer start/shutdown,
closes apache/incubator-rocketmq#68
[ROCKETMQ-107] Fix possible concurrency problem on ServiceState when consumer start/shutdown, closes apache/incubator-rocketmq#68
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/3c6260a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/3c6260a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/3c6260a6
Branch: refs/heads/develop
Commit: 3c6260a6eba09bb0322b4f883f28be8ac5527a54
Parents: 15c2b55
Author: Jaskey <li...@gmail.com>
Authored: Wed Apr 19 11:58:48 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
.../consumer/DefaultMQPullConsumerImpl.java | 21 +++++++++++++-------
.../consumer/DefaultMQPushConsumerImpl.java | 15 +++++++-------
2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3c6260a6/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index b26d062..7d43b37 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -70,7 +70,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final RPCHook rpcHook;
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
- private ServiceState serviceState = ServiceState.CREATE_JUST;
+ private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
@@ -161,7 +161,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
}
- private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
+ private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
+ long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
@@ -365,7 +366,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
- public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
+ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
+ long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
}
@@ -449,7 +451,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return defaultMQPullConsumer;
}
- public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
+ public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
+ PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
@@ -510,7 +513,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
}
- public void shutdown() {
+ public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
@@ -528,7 +531,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
}
- public void start() throws MQClientException {
+ public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
@@ -593,6 +596,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
default:
break;
}
+
}
private void checkConfig() throws MQClientException {
@@ -662,7 +666,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.offsetStore.updateOffset(mq, offset, false);
}
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public MessageExt viewMessage(String msgId)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}
@@ -692,6 +697,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return serviceState;
}
+ //Don't use this deprecated setter, which will be removed soon.
+ @Deprecated
public void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3c6260a6/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4f33732..67f3ebe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -97,7 +97,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private final long consumerStartTimestamp = System.currentTimeMillis();
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
private final RPCHook rpcHook;
- private ServiceState serviceState = ServiceState.CREATE_JUST;
+ private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private PullAPIWrapper pullAPIWrapper;
private volatile boolean pause = false;
@@ -515,7 +515,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
- public void shutdown() {
+ public synchronized void shutdown() {
switch (this.serviceState) {
case CREATE_JUST:
break;
@@ -535,7 +535,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
- public void start() throws MQClientException {
+ public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
@@ -615,9 +615,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
-
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
this.mQClientFactory.rebalanceImmediately();
}
@@ -855,7 +853,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.consumeMessageService.updateCorePoolSize(corePoolSize);
}
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ public MessageExt viewMessage(String msgId)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.mQClientFactory.getMQAdminImpl().viewMessage(msgId);
}
@@ -1014,7 +1013,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return serviceState;
}
- public void setServiceState(ServiceState serviceState) {
+ //Don't use this deprecated setter, which will be removed soon.
+ @Deprecated
+ public synchronized void setServiceState(ServiceState serviceState) {
this.serviceState = serviceState;
}