You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:19 UTC
[23/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
new file mode 100644
index 0000000..b98b2a2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullRequest {
+    // consumerGroup and messageQueue form the identity used by equals() and hashCode().
+    private String consumerGroup;
+    private MessageQueue messageQueue;
+    // The remaining fields are mutable state that identity comparisons ignore.
+    private ProcessQueue processQueue;
+    private long nextOffset;
+    private boolean lockedFirst = false;
+
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public ProcessQueue getProcessQueue() {
+        return this.processQueue;
+    }
+
+    public void setProcessQueue(ProcessQueue processQueue) {
+        this.processQueue = processQueue;
+    }
+
+    public long getNextOffset() {
+        return this.nextOffset;
+    }
+
+    public void setNextOffset(long nextOffset) {
+        this.nextOffset = nextOffset;
+    }
+
+    public boolean isLockedFirst() {
+        return this.lockedFirst;
+    }
+
+    public void setLockedFirst(boolean lockedFirst) {
+        this.lockedFirst = lockedFirst;
+    }
+
+    /**
+     * Hashes consumerGroup and messageQueue only (null contributes 0), matching equals().
+     */
+    @Override
+    public int hashCode() {
+        final int groupHash = (consumerGroup != null) ? consumerGroup.hashCode() : 0;
+        final int queueHash = (messageQueue != null) ? messageQueue.hashCode() : 0;
+        // Closed form of the usual chain: 31 * (31 * 1 + groupHash) + queueHash.
+        return 31 * (31 + groupHash) + queueHash;
+    }
+
+    /**
+     * Equal when the classes are identical and consumerGroup and messageQueue both match (null only matches null).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final PullRequest that = (PullRequest) obj;
+        final boolean sameGroup = (consumerGroup == null)
+            ? that.consumerGroup == null
+            : consumerGroup.equals(that.consumerGroup);
+        if (!sameGroup) {
+            return false;
+        }
+        return (messageQueue == null)
+            ? that.messageQueue == null
+            : messageQueue.equals(that.messageQueue);
+    }
+
+    /** Renders consumerGroup, messageQueue and nextOffset; processQueue and lockedFirst are omitted. */
+    @Override
+    public String toString() {
+        return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
+            + ", nextOffset=" + nextOffset + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
new file mode 100644
index 0000000..b924472
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullResultExt extends PullResult {
+    private final long suggestWhichBrokerId;
+    // Binary payload handed in by the caller; replaceable through setMessageBinary.
+    private byte[] messageBinary;
+
+    /** Passes the standard pull fields to PullResult and stores the two extension fields. */
+    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
+        List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
+        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
+        this.messageBinary = messageBinary;
+        this.suggestWhichBrokerId = suggestWhichBrokerId;
+    }
+
+    public long getSuggestWhichBrokerId() {
+        return this.suggestWhichBrokerId;
+    }
+
+    /** Returns the stored array reference itself, not a copy. */
+    public byte[] getMessageBinary() {
+        return this.messageBinary;
+    }
+
+    /** Replaces the stored array reference; the argument is not copied. */
+    public void setMessageBinary(byte[] messageBinary) {
+        this.messageBinary = messageBinary;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
new file mode 100644
index 0000000..05b7cfc
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.slf4j.Logger;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Base class for rebalance algorithm
+ *
+ * @author shijia.wxr
+ */
+public abstract class RebalanceImpl {
+ protected static final Logger log = ClientLogger.getLog();
+ protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
+ protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
+ new ConcurrentHashMap<String, Set<MessageQueue>>();
+ protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
+ new ConcurrentHashMap<String, SubscriptionData>();
+ protected String consumerGroup;
+ protected MessageModel messageModel;
+ protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
+ protected MQClientInstance mQClientFactory;
+
+
+ public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+     MQClientInstance mQClientFactory) { // stores the collaborators as given; nothing is validated, so any of them may be null
+     this.mQClientFactory = mQClientFactory;
+     this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+     this.messageModel = messageModel;
+     this.consumerGroup = consumerGroup;
+ }
+
+ /** Asks the master broker that hosts mq to release the lock held for this group and client. */
+ public void unlock(final MessageQueue mq, final boolean oneway) {
+     final FindBrokerResult master = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+     if (null == master) {
+         return; // no broker address found: nothing is sent
+     }
+
+     final UnlockBatchRequestBody body = new UnlockBatchRequestBody();
+     body.setConsumerGroup(this.consumerGroup);
+     body.setClientId(this.mQClientFactory.getClientId());
+     body.getMqSet().add(mq);
+
+     try {
+         this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(master.getBrokerAddr(), body, 1000, oneway);
+         log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", this.consumerGroup, this.mQClientFactory.getClientId(), mq);
+     } catch (Exception e) {
+         log.error("unlockBatchMQ exception, " + mq, e);
+     }
+ }
+
+ public void unlockAll(final boolean oneway) {
+ HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
+
+ for (final Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
+ final String brokerName = entry.getKey();
+ final Set<MessageQueue> mqs = entry.getValue();
+
+ if (mqs.isEmpty())
+ continue;
+
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ if (findBrokerResult != null) {
+ UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
+ requestBody.setConsumerGroup(this.consumerGroup);
+ requestBody.setClientId(this.mQClientFactory.getClientId());
+ requestBody.setMqSet(mqs);
+
+ try {
+ this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
+
+ for (MessageQueue mq : mqs) {
+ ProcessQueue processQueue = this.processQueueTable.get(mq);
+ if (processQueue != null) {
+ processQueue.setLocked(false);
+ log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq);
+ }
+ }
+ } catch (Exception e) {
+ log.error("unlockBatchMQ exception, " + mqs, e);
+ }
+ }
+ }
+ }
+
+ private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
+ HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
+ for (MessageQueue mq : this.processQueueTable.keySet()) {
+ Set<MessageQueue> mqs = result.get(mq.getBrokerName());
+ if (null == mqs) {
+ mqs = new HashSet<MessageQueue>();
+ result.put(mq.getBrokerName(), mqs);
+ }
+
+ mqs.add(mq);
+ }
+
+ return result;
+ }
+
+ /** Requests the broker lock for mq; returns true only if the broker's reply lists mq as locked. */
+ public boolean lock(final MessageQueue mq) {
+     final FindBrokerResult master = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
+     if (null == master) {
+         return false;
+     }
+
+     final LockBatchRequestBody body = new LockBatchRequestBody();
+     body.setConsumerGroup(this.consumerGroup);
+     body.setClientId(this.mQClientFactory.getClientId());
+     body.getMqSet().add(mq);
+
+     try {
+         final Set<MessageQueue> granted =
+             this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(master.getBrokerAddr(), body, 1000);
+         // Mark every granted queue that is tracked locally, not only mq.
+         for (MessageQueue grantedQueue : granted) {
+             final ProcessQueue local = this.processQueueTable.get(grantedQueue);
+             if (local != null) {
+                 local.setLocked(true);
+                 local.setLastLockTimestamp(System.currentTimeMillis());
+             }
+         }
+
+         final boolean lockOK = granted.contains(mq);
+         log.info("the message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
+         return lockOK;
+     } catch (Exception e) {
+         log.error("lockBatchMQ exception, " + mq, e);
+         return false;
+     }
+ }
+
+ public void lockAll() {
+ HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
+
+ Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, Set<MessageQueue>> entry = it.next();
+ final String brokerName = entry.getKey();
+ final Set<MessageQueue> mqs = entry.getValue();
+
+ if (mqs.isEmpty())
+ continue;
+
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
+ if (findBrokerResult != null) {
+ LockBatchRequestBody requestBody = new LockBatchRequestBody();
+ requestBody.setConsumerGroup(this.consumerGroup);
+ requestBody.setClientId(this.mQClientFactory.getClientId());
+ requestBody.setMqSet(mqs);
+
+ try {
+ Set<MessageQueue> lockOKMQSet =
+ this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+
+ for (MessageQueue mq : lockOKMQSet) {
+ ProcessQueue processQueue = this.processQueueTable.get(mq);
+ if (processQueue != null) {
+ if (!processQueue.isLocked()) {
+ log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
+ }
+
+ processQueue.setLocked(true);
+ processQueue.setLastLockTimestamp(System.currentTimeMillis());
+ }
+ }
+ for (MessageQueue mq : mqs) {
+ if (!lockOKMQSet.contains(mq)) {
+ ProcessQueue processQueue = this.processQueueTable.get(mq);
+ if (processQueue != null) {
+ processQueue.setLocked(false);
+ log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("lockBatchMQ exception, " + mqs, e);
+ }
+ }
+ }
+ }
+
+ /** Rebalances every subscribed topic, then drops process queues whose topic is no longer subscribed. */
+ public void doRebalance(final boolean isOrder) {
+     final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
+     if (subscriptions != null) {
+         for (final String topic : subscriptions.keySet()) {
+             try {
+                 this.rebalanceByTopic(topic, isOrder);
+             } catch (Throwable e) {
+                 // A failing topic does not stop the remaining ones; failures on retry topics are not logged.
+                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+                     log.warn("rebalanceByTopic Exception", e);
+                 }
+             }
+         }
+     }
+     this.truncateMessageQueueNotMyTopic();
+ }
+
+ public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() { // hands out the live subscription table, not a copy
+     return this.subscriptionInner;
+ }
+
+ private void rebalanceByTopic(final String topic, final boolean isOrder) { // Recomputes this client's queue assignment for one topic, branching on messageModel.
+ switch (messageModel) {
+ case BROADCASTING: { // every queue known for the topic is handed to this client
+ Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
+ if (mqSet != null) {
+ boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
+ if (changed) {
+ this.messageQueueChanged(topic, mqSet, mqSet); // "all queues" and "assigned queues" are the same set here
+ log.info("messageQueueChanged {} {} {} {}", //
+ consumerGroup, //
+ topic, //
+ mqSet, //
+ mqSet);
+ }
+ } else {
+ log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
+ }
+ break;
+ }
+ case CLUSTERING: { // the topic's queues are divided among the consumer ids of the group
+ Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
+ List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
+ if (null == mqSet) {
+ if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // a missing retry topic is not reported
+ log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
+ }
+ }
+
+ if (null == cidAll) {
+ log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
+ }
+
+ if (mqSet != null && cidAll != null) { // allocation needs both lists; otherwise the current assignment is left as is
+ List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+ mqAll.addAll(mqSet);
+
+ Collections.sort(mqAll); // both lists are sorted before allocation; presumably so every client feeds the strategy the same order (TODO confirm)
+ Collections.sort(cidAll);
+
+ AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
+
+ List<MessageQueue> allocateResult = null;
+ try {
+ allocateResult = strategy.allocate(//
+ this.consumerGroup, //
+ this.mQClientFactory.getClientId(), //
+ mqAll, //
+ cidAll);
+ } catch (Throwable e) {
+ log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
+ e);
+ return; // allocation failed: the current assignment is left untouched
+ }
+
+ Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+ if (allocateResult != null) { // a null result is treated as an empty assignment
+ allocateResultSet.addAll(allocateResult);
+ }
+
+ boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
+ if (changed) {
+ log.info(
+ "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+ strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
+ allocateResultSet.size(), allocateResultSet);
+ this.messageQueueChanged(topic, mqSet, allocateResultSet); // the hook receives all queues plus this client's share
+ }
+ }
+ break;
+ }
+ default: // any other message model: nothing to do
+ break;
+ }
+ }
+
+ private void truncateMessageQueueNotMyTopic() {
+ Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
+
+ for (MessageQueue mq : this.processQueueTable.keySet()) {
+ if (!subTable.containsKey(mq.getTopic())) {
+
+ ProcessQueue pq = this.processQueueTable.remove(mq);
+ if (pq != null) {
+ pq.setDropped(true);
+ log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
+ }
+ }
+ }
+ }
+
+ private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { // Aligns processQueueTable with mqSet for topic; returns true if a queue was removed or added.
+ boolean changed = false;
+ // Phase 1: walk the tracked queues of this topic and drop the ones that are no longer wanted.
+ Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueue> next = it.next();
+ MessageQueue mq = next.getKey();
+ ProcessQueue pq = next.getValue();
+
+ if (mq.getTopic().equals(topic)) { // queues of other topics are left alone
+ if (!mqSet.contains(mq)) { // no longer assigned to this client
+ pq.setDropped(true); // flagged before the hook runs, and not reset if the hook declines the removal
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ it.remove();
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+ }
+ } else if (pq.isPullExpired()) { // still assigned, but the process queue reports an expired pull
+ switch (this.consumeType()) {
+ case CONSUME_ACTIVELY:
+ break; // nothing is done for CONSUME_ACTIVELY
+ case CONSUME_PASSIVELY:
+ pq.setDropped(true); // dropped and removed here; phase 2 re-adds the queue because it is still in mqSet
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ it.remove();
+ changed = true;
+ log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ // Phase 2: create a process queue and a pull request for every assigned queue that is not tracked yet.
+ List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+ for (MessageQueue mq : mqSet) {
+ if (!this.processQueueTable.containsKey(mq)) {
+ if (isOrder && !this.lock(mq)) { // orderly consumption: skip the queue unless its lock was obtained
+ log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+ continue;
+ }
+
+ this.removeDirtyOffset(mq);
+ ProcessQueue pq = new ProcessQueue();
+ long nextOffset = this.computePullFromWhere(mq);
+ if (nextOffset >= 0) { // a negative offset means no usable start position: the queue is not added this round
+ ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); // an entry inserted concurrently wins; no pull request is created then
+ if (pre != null) {
+ log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
+ } else {
+ log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
+ PullRequest pullRequest = new PullRequest();
+ pullRequest.setConsumerGroup(consumerGroup);
+ pullRequest.setNextOffset(nextOffset);
+ pullRequest.setMessageQueue(mq);
+ pullRequest.setProcessQueue(pq);
+ pullRequestList.add(pullRequest);
+ changed = true;
+ }
+ } else {
+ log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ this.dispatchPullRequest(pullRequestList); // handed over even when the list is empty
+
+ return changed;
+ }
+
+ public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided); // hook: invoked by rebalanceByTopic after the assignment for topic changed
+ // hook: called once a queue has been marked dropped; during rebalance, returning true lets the caller remove it from processQueueTable
+ public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
+ // hook: consumption style; only CONSUME_PASSIVELY triggers the expired-pull cleanup during rebalance
+ public abstract ConsumeType consumeType();
+ // hook: called for a newly assigned queue before its start offset is computed
+ public abstract void removeDirtyOffset(final MessageQueue mq);
+ // hook: offset at which pulling of mq should start; a negative value makes rebalance skip the queue
+ public abstract long computePullFromWhere(final MessageQueue mq);
+ // hook: receives the pull requests created for the queues newly added in this rebalance round
+ public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
+
+ public void removeProcessQueue(final MessageQueue mq) { // removes mq from processQueueTable; if it was tracked, marks it dropped and runs the removal hook
+     final ProcessQueue removed = this.processQueueTable.remove(mq);
+     if (removed != null) {
+         final boolean wasDropped = removed.isDropped(); // captured before the flag is forced to true
+         removed.setDropped(true);
+         this.removeUnnecessaryMessageQueue(mq, removed);
+         log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, wasDropped);
+     }
+ }
+
+ /** Returns the live process-queue table, not a copy. */
+ public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
+     return this.processQueueTable;
+ }
+
+ /** Returns the live topic-to-queues table, not a copy. */
+ public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
+     return this.topicSubscribeInfoTable;
+ }
+
+ /** Consumer group placed in lock/unlock requests and log lines. */
+ public String getConsumerGroup() {
+     return this.consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+     this.consumerGroup = consumerGroup;
+ }
+
+ /** Message model that rebalanceByTopic switches on. */
+ public MessageModel getMessageModel() {
+     return this.messageModel;
+ }
+
+ /** Replaces the message model; no validation is performed. */
+ public void setMessageModel(MessageModel messageModel) {
+     this.messageModel = messageModel;
+ }
+
+ /** Strategy consulted when queues are divided in the clustering branch. */
+ public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
+     return this.allocateMessageQueueStrategy;
+ }
+
+ /** Replaces the allocation strategy; no validation is performed. */
+ public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
+     this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ }
+
+ /** Client instance used for broker lookups and remote lock calls. */
+ public MQClientInstance getmQClientFactory() {
+     return this.mQClientFactory;
+ }
+
+ /** Replaces the client instance; no validation is performed. */
+ public void setmQClientFactory(MQClientInstance mQClientFactory) {
+     this.mQClientFactory = mQClientFactory;
+ }
+
+
+ /**
+  * Flags every tracked process queue as dropped, then empties processQueueTable.
+  */
+ public void destroy() {
+     for (ProcessQueue tracked : this.processQueueTable.values()) {
+         tracked.setDropped(true);
+     }
+     this.processQueueTable.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
new file mode 100644
index 0000000..376c21c
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalancePullImpl extends RebalanceImpl {
+    private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
+
+    public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+        this(null, null, null, null, defaultMQPullConsumerImpl); // group, model, strategy and factory start as null
+    }
+
+    public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
+        this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
+    }
+
+    /** Relays the change to the consumer's MessageQueueListener, if any; listener failures are logged, not rethrown. */
+    @Override
+    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
+        final MessageQueueListener listener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
+        if (null == listener) {
+            return;
+        }
+        try {
+            listener.messageQueueChanged(topic, mqAll, mqDivided);
+        } catch (Throwable e) {
+            log.error("messageQueueChanged exception", e);
+        }
+    }
+
+    @Override
+    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
+        this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
+        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
+        return true; // offset persisted and forgotten; removal is always allowed
+    }
+
+    @Override
+    public ConsumeType consumeType() {
+        return ConsumeType.CONSUME_ACTIVELY;
+    }
+
+    @Override
+    public void removeDirtyOffset(final MessageQueue mq) {
+        this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
+    }
+
+    @Override
+    public long computePullFromWhere(MessageQueue mq) {
+        return 0; // constant for this implementation, whatever the queue
+    }
+
+    @Override
+    public void dispatchPullRequest(List<PullRequest> pullRequestList) { // nothing is dispatched by this implementation
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
new file mode 100644
index 0000000..4efac01
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalancePushImpl extends RebalanceImpl {
+ private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
+ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
+
+
+ public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) { // convenience form: group, model, strategy and client factory start as null
+ this(null, null, null, null, defaultMQPushConsumerImpl);
+ }
+
+ // Full form: the first four arguments go to RebalanceImpl; the consumer implementation is kept for the hook methods.
+ public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+ MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
+ this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
+ }
+
+ @Override
+ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { // empty body: this implementation does nothing when the assignment changes
+ }
+
+ /** Persists and forgets mq's offset; for orderly clustering consumption also releases the broker lock. Returns false if the consume lock was not obtained or an exception occurred. */
+ @Override
+ public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
+     this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+     this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+     if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
+         && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+         try {
+             if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) { // wait at most one second for the consume lock
+                 try {
+                     return this.unlockDelay(mq, pq);
+                 } finally {
+                     pq.getLockConsume().unlock();
+                 }
+             } else {
+                 log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", mq, pq.getTryUnlockTimes());
+                 pq.incTryUnlockTimes();
+             }
+         } catch (Exception e) {
+             if (e instanceof InterruptedException) {
+                 Thread.currentThread().interrupt(); // a blocking wait that throws clears the flag; restore it for the caller
+             }
+             log.error("removeUnnecessaryMessageQueue Exception", e);
+         }
+         return false;
+     }
+     return true;
+ }
+
+ private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
+
+ if (pq.hasTempMessage()) {
+ log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
+ this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
+ @Override
+ public void run() {
+ log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
+ RebalancePushImpl.this.unlock(mq, true);
+ }
+ }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
+ } else {
+ this.unlock(mq, true);
+ }
+ return true;
+ }
+
+ @Override
+ public ConsumeType consumeType() {
+ return ConsumeType.CONSUME_PASSIVELY; // constant for this implementation; RebalanceImpl's expired-pull cleanup acts on this value
+ }
+
+ @Override
+ public void removeDirtyOffset(final MessageQueue mq) {
+ this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); // forgets the offset held for mq in the consumer's offset store
+ }
+
+ @Override // Chooses the offset at which pulling of mq starts; -1 signals that none could be determined.
+ public long computePullFromWhere(MessageQueue mq) {
+ long result = -1;
+ final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
+ final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
+ switch (consumeFromWhere) {
+ case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: // these four settings share one handling
+ case CONSUME_FROM_MIN_OFFSET:
+ case CONSUME_FROM_MAX_OFFSET:
+ case CONSUME_FROM_LAST_OFFSET: {
+ long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+ if (lastOffset >= 0) { // a stored offset exists: resume from it
+ result = lastOffset;
+ }
+ // First start, no offset
+ else if (-1 == lastOffset) {
+ if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // retry topics start from offset 0
+ result = 0L;
+ } else {
+ try {
+ result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); // other topics start at the value maxOffset reports
+ } catch (MQClientException e) {
+ result = -1; // lookup failed: report "unknown"; the exception itself is not logged
+ }
+ }
+ } else { // any other negative value read from the store
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_FIRST_OFFSET: {
+ long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) { // nothing stored yet: start from offset 0
+ result = 0L;
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_TIMESTAMP: {
+ long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
+ if (lastOffset >= 0) {
+ result = lastOffset;
+ } else if (-1 == lastOffset) {
+ if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // retry topics ignore the timestamp and use maxOffset
+ try {
+ result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ } else {
+ try {
+ long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), // NOTE(review): a null from parseDate would raise an NPE here, which the catch below does not cover; confirm parseDate's contract
+ UtilAll.YYYY_MMDD_HHMMSS).getTime();
+ result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); // offset looked up from the configured consume timestamp
+ } catch (MQClientException e) {
+ result = -1;
+ }
+ }
+ } else {
+ result = -1;
+ }
+ break;
+ }
+ // Any other setting leaves result at its initial -1.
+ default:
+ break;
+ }
+
+ return result;
+ }
+
+ @Override // Hands every new pull request to the push consumer through executePullRequestImmediately and logs each one.
+ public void dispatchPullRequest(List<PullRequest> pullRequestList) {
+ for (PullRequest pullRequest : pullRequestList) {
+ this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
+ log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); // logged after the request has been handed over
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
new file mode 100644
index 0000000..e6059fe
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ServiceThread;
+import org.slf4j.Logger;
+
+
+/**
+ * Rebalance Service
+ *
+ * @author shijia.wxr
+ */
+public class RebalanceService extends ServiceThread {
+    // Interval passed to waitForRunning between rebalance rounds (default 20000; presumably milliseconds). Read once, never reassigned.
+    private static final long WAIT_INTERVAL =
+        Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
+    // Shared logger: static final, like the logger declared in RebalanceImpl.
+    private static final Logger log = ClientLogger.getLog();
+    private final MQClientInstance mqClientFactory;
+
+    public RebalanceService(MQClientInstance mqClientFactory) {
+        this.mqClientFactory = mqClientFactory;
+    }
+
+    /** Loops until stopped: calls waitForRunning(WAIT_INTERVAL), then asks the client instance to rebalance. */
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            this.waitForRunning(WAIT_INTERVAL);
+            this.mqClientFactory.doRebalance();
+        }
+
+        log.info("{} service end", this.getServiceName());
+    }
+
+    @Override
+    public String getServiceName() {
+        return RebalanceService.class.getSimpleName();
+    }
+}