You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/27 02:30:49 UTC

[GitHub] Hellojungle closed pull request #559: [ISSUE557]Add function to receive message broadcast and ordered.

Hellojungle closed pull request #559: [ISSUE557]Add function to receive message broadcast and ordered. 
URL: https://github.com/apache/rocketmq/pull/559
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 393ef92c9..69e1699ee 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -268,7 +268,7 @@ public void pullMessage(final PullRequest pullRequest) {
                     pullRequest.setLockedFirst(true);
                     pullRequest.setNextOffset(offset);
                 }
-            } else {
+            } else if (MessageModel.CLUSTERING.equals(this.messageModel())) {
                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                 log.info("pull message later because not locked in broker, {}", pullRequest);
                 return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 9ad07c7e4..976ddbee4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -369,7 +369,7 @@ private boolean updateProcessQueueTableInRebalance(final String topic, final Set
         List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
         for (MessageQueue mq : mqSet) {
             if (!this.processQueueTable.containsKey(mq)) {
-                if (isOrder && !this.lock(mq)) {
+                if (MessageModel.CLUSTERING.equals(this.messageModel) && isOrder && !this.lock(mq)) {
                     log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                     continue;
                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 02aa84a3e..8d60321ed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -604,7 +604,7 @@ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
     }
 
     /**
-     * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is
+     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
      * ASYNC_FLUSH
      *
      * @return <tt>true</tt> or <tt>false</tt>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services