You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:29 UTC
[33/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
deleted file mode 100644
index efc5ab0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullRequest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullRequest {
- private String consumerGroup;
- private MessageQueue messageQueue;
- private ProcessQueue processQueue;
- private long nextOffset;
- private boolean lockedFirst = false;
-
- public boolean isLockedFirst() {
- return lockedFirst;
- }
-
- public void setLockedFirst(boolean lockedFirst) {
- this.lockedFirst = lockedFirst;
- }
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
-
- public void setMessageQueue(MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
-
- public long getNextOffset() {
- return nextOffset;
- }
-
-
- public void setNextOffset(long nextOffset) {
- this.nextOffset = nextOffset;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
- result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- PullRequest other = (PullRequest) obj;
- if (consumerGroup == null) {
- if (other.consumerGroup != null)
- return false;
- } else if (!consumerGroup.equals(other.consumerGroup))
- return false;
- if (messageQueue == null) {
- if (other.messageQueue != null)
- return false;
- } else if (!messageQueue.equals(other.messageQueue))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
- + ", nextOffset=" + nextOffset + "]";
- }
-
- public ProcessQueue getProcessQueue() {
- return processQueue;
- }
-
-
- public void setProcessQueue(ProcessQueue processQueue) {
- this.processQueue = processQueue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
deleted file mode 100644
index e140b6a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/PullResultExt.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullResultExt extends PullResult {
- private final long suggestWhichBrokerId;
- private byte[] messageBinary;
-
-
- public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
- List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
- super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
- this.suggestWhichBrokerId = suggestWhichBrokerId;
- this.messageBinary = messageBinary;
- }
-
-
- public byte[] getMessageBinary() {
- return messageBinary;
- }
-
-
- public void setMessageBinary(byte[] messageBinary) {
- this.messageBinary = messageBinary;
- }
-
-
- public long getSuggestWhichBrokerId() {
- return suggestWhichBrokerId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
deleted file mode 100644
index 641bb75..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
-import com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.slf4j.Logger;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Base class for rebalance algorithm
- *
- * @author shijia.wxr
- */
-public abstract class RebalanceImpl {
- protected static final Logger log = ClientLogger.getLog();
- protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
- protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
- new ConcurrentHashMap<String, Set<MessageQueue>>();
- protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
- new ConcurrentHashMap<String, SubscriptionData>();
- protected String consumerGroup;
- protected MessageModel messageModel;
- protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
- protected MQClientInstance mQClientFactory;
-
-
- public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory) {
- this.consumerGroup = consumerGroup;
- this.messageModel = messageModel;
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- this.mQClientFactory = mQClientFactory;
- }
-
- public void unlock(final MessageQueue mq, final boolean oneway) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.getMqSet().add(mq);
-
- try {
- this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
- log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mq);
- } catch (Exception e) {
- log.error("unlockBatchMQ exception, " + mq, e);
- }
- }
- }
-
- public void unlockAll(final boolean oneway) {
- HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
-
- for (final Map.Entry<String, Set<MessageQueue>> entry : brokerMqs.entrySet()) {
- final String brokerName = entry.getKey();
- final Set<MessageQueue> mqs = entry.getValue();
-
- if (mqs.isEmpty())
- continue;
-
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.setMqSet(mqs);
-
- try {
- this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
-
- for (MessageQueue mq : mqs) {
- ProcessQueue processQueue = this.processQueueTable.get(mq);
- if (processQueue != null) {
- processQueue.setLocked(false);
- log.info("the message queue unlock OK, Group: {} {}", this.consumerGroup, mq);
- }
- }
- } catch (Exception e) {
- log.error("unlockBatchMQ exception, " + mqs, e);
- }
- }
- }
- }
-
- private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
- HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();
- for (MessageQueue mq : this.processQueueTable.keySet()) {
- Set<MessageQueue> mqs = result.get(mq.getBrokerName());
- if (null == mqs) {
- mqs = new HashSet<MessageQueue>();
- result.put(mq.getBrokerName(), mqs);
- }
-
- mqs.add(mq);
- }
-
- return result;
- }
-
- public boolean lock(final MessageQueue mq) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.getMqSet().add(mq);
-
- try {
- Set<MessageQueue> lockedMq =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
- for (MessageQueue mmqq : lockedMq) {
- ProcessQueue processQueue = this.processQueueTable.get(mmqq);
- if (processQueue != null) {
- processQueue.setLocked(true);
- processQueue.setLastLockTimestamp(System.currentTimeMillis());
- }
- }
-
- boolean lockOK = lockedMq.contains(mq);
- log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
- return lockOK;
- } catch (Exception e) {
- log.error("lockBatchMQ exception, " + mq, e);
- }
- }
-
- return false;
- }
-
- public void lockAll() {
- HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
-
- Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, Set<MessageQueue>> entry = it.next();
- final String brokerName = entry.getKey();
- final Set<MessageQueue> mqs = entry.getValue();
-
- if (mqs.isEmpty())
- continue;
-
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.setMqSet(mqs);
-
- try {
- Set<MessageQueue> lockOKMQSet =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
-
- for (MessageQueue mq : lockOKMQSet) {
- ProcessQueue processQueue = this.processQueueTable.get(mq);
- if (processQueue != null) {
- if (!processQueue.isLocked()) {
- log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
- }
-
- processQueue.setLocked(true);
- processQueue.setLastLockTimestamp(System.currentTimeMillis());
- }
- }
- for (MessageQueue mq : mqs) {
- if (!lockOKMQSet.contains(mq)) {
- ProcessQueue processQueue = this.processQueueTable.get(mq);
- if (processQueue != null) {
- processQueue.setLocked(false);
- log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
- }
- }
- }
- } catch (Exception e) {
- log.error("lockBatchMQ exception, " + mqs, e);
- }
- }
- }
- }
-
- public void doRebalance(final boolean isOrder) {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
- final String topic = entry.getKey();
- try {
- this.rebalanceByTopic(topic, isOrder);
- } catch (Throwable e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("rebalanceByTopic Exception", e);
- }
- }
- }
- }
-
- this.truncateMessageQueueNotMyTopic();
- }
-
- public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
- return subscriptionInner;
- }
-
- private void rebalanceByTopic(final String topic, final boolean isOrder) {
- switch (messageModel) {
- case BROADCASTING: {
- Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
- if (mqSet != null) {
- boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
- if (changed) {
- this.messageQueueChanged(topic, mqSet, mqSet);
- log.info("messageQueueChanged {} {} {} {}", //
- consumerGroup, //
- topic, //
- mqSet, //
- mqSet);
- }
- } else {
- log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
- }
- break;
- }
- case CLUSTERING: {
- Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
- List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
- if (null == mqSet) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
- }
- }
-
- if (null == cidAll) {
- log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
- }
-
- if (mqSet != null && cidAll != null) {
- List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
- mqAll.addAll(mqSet);
-
- Collections.sort(mqAll);
- Collections.sort(cidAll);
-
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
-
- List<MessageQueue> allocateResult = null;
- try {
- allocateResult = strategy.allocate(//
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mqAll, //
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
- }
-
- Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
- if (allocateResult != null) {
- allocateResultSet.addAll(allocateResult);
- }
-
- boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
- if (changed) {
- log.info(
- "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
- strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
- allocateResultSet.size(), allocateResultSet);
- this.messageQueueChanged(topic, mqSet, allocateResultSet);
- }
- }
- break;
- }
- default:
- break;
- }
- }
-
- private void truncateMessageQueueNotMyTopic() {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
-
- for (MessageQueue mq : this.processQueueTable.keySet()) {
- if (!subTable.containsKey(mq.getTopic())) {
-
- ProcessQueue pq = this.processQueueTable.remove(mq);
- if (pq != null) {
- pq.setDropped(true);
- log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
- }
- }
- }
- }
-
- private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
- boolean changed = false;
-
- Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<MessageQueue, ProcessQueue> next = it.next();
- MessageQueue mq = next.getKey();
- ProcessQueue pq = next.getValue();
-
- if (mq.getTopic().equals(topic)) {
- if (!mqSet.contains(mq)) {
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
- }
- } else if (pq.isPullExpired()) {
- switch (this.consumeType()) {
- case CONSUME_ACTIVELY:
- break;
- case CONSUME_PASSIVELY:
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
- }
- break;
- default:
- break;
- }
- }
- }
- }
-
- List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
- for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) {
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
-
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
-
- this.dispatchPullRequest(pullRequestList);
-
- return changed;
- }
-
- public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided);
-
- public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
-
- public abstract ConsumeType consumeType();
-
- public abstract void removeDirtyOffset(final MessageQueue mq);
-
- public abstract long computePullFromWhere(final MessageQueue mq);
-
- public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
-
- public void removeProcessQueue(final MessageQueue mq) {
- ProcessQueue prev = this.processQueueTable.remove(mq);
- if (prev != null) {
- boolean droped = prev.isDropped();
- prev.setDropped(true);
- this.removeUnnecessaryMessageQueue(mq, prev);
- log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", consumerGroup, mq, droped);
- }
- }
-
- public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
- return processQueueTable;
- }
-
-
- public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
- return topicSubscribeInfoTable;
- }
-
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
-
-
- public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
- return allocateMessageQueueStrategy;
- }
-
-
- public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- }
-
-
- public MQClientInstance getmQClientFactory() {
- return mQClientFactory;
- }
-
-
- public void setmQClientFactory(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
-
-
- public void destroy() {
- Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<MessageQueue, ProcessQueue> next = it.next();
- next.getValue().setDropped(true);
- }
-
- this.processQueueTable.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
deleted file mode 100644
index 8d2b465..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * @author shijia.wxr
- */
-public class RebalancePullImpl extends RebalanceImpl {
- private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
-
-
- public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
- this(null, null, null, null, defaultMQPullConsumerImpl);
- }
-
-
- public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
- super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
- this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
- }
-
- @Override
- public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
- if (messageQueueListener != null) {
- try {
- messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
- } catch (Throwable e) {
- log.error("messageQueueChanged exception", e);
- }
- }
- }
-
- @Override
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
- this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
- return true;
- }
-
- @Override
- public ConsumeType consumeType() {
- return ConsumeType.CONSUME_ACTIVELY;
- }
-
- @Override
- public void removeDirtyOffset(final MessageQueue mq) {
- this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
- }
-
- @Override
- public long computePullFromWhere(MessageQueue mq) {
- return 0;
- }
-
- @Override
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
deleted file mode 100644
index 2377d29..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class RebalancePushImpl extends RebalanceImpl {
- private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
-
-
- public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
- this(null, null, null, null, defaultMQPushConsumerImpl);
- }
-
-
- public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
- super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
- this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
- }
-
- @Override
- public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- }
-
- @Override
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
- this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
- if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
- && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
- try {
- if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
- try {
- return this.unlockDelay(mq, pq);
- } finally {
- pq.getLockConsume().unlock();
- }
- } else {
- log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
- mq, //
- pq.getTryUnlockTimes());
-
- pq.incTryUnlockTimes();
- }
- } catch (Exception e) {
- log.error("removeUnnecessaryMessageQueue Exception", e);
- }
-
- return false;
- }
- return true;
- }
-
- private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
-
- if (pq.hasTempMessage()) {
- log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
- this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
- @Override
- public void run() {
- log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
- RebalancePushImpl.this.unlock(mq, true);
- }
- }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
- } else {
- this.unlock(mq, true);
- }
- return true;
- }
-
- @Override
- public ConsumeType consumeType() {
- return ConsumeType.CONSUME_PASSIVELY;
- }
-
- @Override
- public void removeDirtyOffset(final MessageQueue mq) {
- this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
- }
-
- @Override
- public long computePullFromWhere(MessageQueue mq) {
- long result = -1;
- final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
- final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
- switch (consumeFromWhere) {
- case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
- case CONSUME_FROM_MIN_OFFSET:
- case CONSUME_FROM_MAX_OFFSET:
- case CONSUME_FROM_LAST_OFFSET: {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- }
- // First start,no offset
- else if (-1 == lastOffset) {
- if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- result = 0L;
- } else {
- try {
- result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
- } catch (MQClientException e) {
- result = -1;
- }
- }
- } else {
- result = -1;
- }
- break;
- }
- case CONSUME_FROM_FIRST_OFFSET: {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- } else if (-1 == lastOffset) {
- result = 0L;
- } else {
- result = -1;
- }
- break;
- }
- case CONSUME_FROM_TIMESTAMP: {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- } else if (-1 == lastOffset) {
- if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- try {
- result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
- } catch (MQClientException e) {
- result = -1;
- }
- } else {
- try {
- long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
- UtilAll.YYYY_MMDD_HHMMSS).getTime();
- result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
- } catch (MQClientException e) {
- result = -1;
- }
- }
- } else {
- result = -1;
- }
- break;
- }
-
- default:
- break;
- }
-
- return result;
- }
-
- @Override
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
- for (PullRequest pullRequest : pullRequestList) {
- this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
- log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
deleted file mode 100644
index 47a9da5..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/consumer/RebalanceService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl.consumer;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.ServiceThread;
-import org.slf4j.Logger;
-
-
-/**
- * Rebalance Service
- *
- * @author shijia.wxr
- */
-public class RebalanceService extends ServiceThread {
- private static long waitInterval =
- Long.parseLong(System.getProperty(
- "rocketmq.client.rebalance.waitInterval", "20000"));
- private final Logger log = ClientLogger.getLog();
- private final MQClientInstance mqClientFactory;
-
- public RebalanceService(MQClientInstance mqClientFactory) {
- this.mqClientFactory = mqClientFactory;
- }
-
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
-
- while (!this.isStopped()) {
- this.waitForRunning(waitInterval);
- this.mqClientFactory.doRebalance();
- }
-
- log.info(this.getServiceName() + " service end");
- }
-
-
- @Override
- public String getServiceName() {
- return RebalanceService.class.getSimpleName();
- }
-}