You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:16 UTC
[20/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
new file mode 100644
index 0000000..070635a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.message.*;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQProducer extends ClientConfig implements MQProducer {
+ protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
+ private String producerGroup;
+ /**
+ * Just for testing or demo program
+ */
+ private String createTopicKey = MixAll.DEFAULT_TOPIC;
+ private volatile int defaultTopicQueueNums = 4;
+ private int sendMsgTimeout = 3000;
+ private int compressMsgBodyOverHowmuch = 1024 * 4;
+ private int retryTimesWhenSendFailed = 2;
+ private int retryTimesWhenSendAsyncFailed = 2;
+
+ private boolean retryAnotherBrokerWhenNotStoreOK = false;
+ private int maxMessageSize = 1024 * 1024 * 4; // 4M
+ public DefaultMQProducer() {
+ this(MixAll.DEFAULT_PRODUCER_GROUP, null);
+ }
+
+
+ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
+ this.producerGroup = producerGroup;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ }
+
+
+ public DefaultMQProducer(final String producerGroup) {
+ this(producerGroup, null);
+ }
+
+
+ public DefaultMQProducer(RPCHook rpcHook) {
+ this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
+ }
+
+
+ @Override
+ public void start() throws MQClientException {
+ this.defaultMQProducerImpl.start();
+ }
+
+ @Override
+ public void shutdown() {
+ this.defaultMQProducerImpl.shutdown();
+ }
+
+
+ @Override
+ public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+ return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
+ }
+
+
+ @Override
+ public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg);
+ }
+
+
+ @Override
+ public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg, timeout);
+ }
+
+
+ @Override
+ public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, sendCallback);
+ }
+
+
+ @Override
+ public void send(Message msg, SendCallback sendCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
+ }
+
+
+ @Override
+ public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.sendOneway(msg);
+ }
+
+
+ @Override
+ public SendResult send(Message msg, MessageQueue mq)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg, mq);
+ }
+
+
+ @Override
+ public SendResult send(Message msg, MessageQueue mq, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg, mq, timeout);
+ }
+
+
+ @Override
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+ }
+
+
+ @Override
+ public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
+ }
+
+
+ @Override
+ public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.sendOneway(msg, mq);
+ }
+
+
+ @Override
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg, selector, arg);
+ }
+
+
+ @Override
+ public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
+ }
+
+
+ @Override
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+ }
+
+
+ @Override
+ public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
+ }
+
+
+ @Override
+ public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+ throws MQClientException, RemotingException, InterruptedException {
+ this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
+ }
+
+
+ @Override
+ public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
+ throws MQClientException {
+ throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
+ }
+
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+ createTopic(key, newTopic, queueNum, 0);
+ }
+
+
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+ this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+ return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
+ }
+
+
+ @Override
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+ return this.defaultMQProducerImpl.maxOffset(mq);
+ }
+
+
+ @Override
+ public long minOffset(MessageQueue mq) throws MQClientException {
+ return this.defaultMQProducerImpl.minOffset(mq);
+ }
+
+
+ @Override
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+ return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
+ }
+
+
+ @Override
+ public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
+ }
+
+
+ @Override
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+ throws MQClientException, InterruptedException {
+ return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
+ }
+
+
+ @Override
+ public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ try {
+ MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
+ return this.viewMessage(msgId);
+ } catch (Exception e) {
+ }
+ return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+
+ public String getCreateTopicKey() {
+ return createTopicKey;
+ }
+
+
+ public void setCreateTopicKey(String createTopicKey) {
+ this.createTopicKey = createTopicKey;
+ }
+
+
+ public int getSendMsgTimeout() {
+ return sendMsgTimeout;
+ }
+
+
+ public void setSendMsgTimeout(int sendMsgTimeout) {
+ this.sendMsgTimeout = sendMsgTimeout;
+ }
+
+
+ public int getCompressMsgBodyOverHowmuch() {
+ return compressMsgBodyOverHowmuch;
+ }
+
+
+ public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+ this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+ }
+
+
+ public DefaultMQProducerImpl getDefaultMQProducerImpl() {
+ return defaultMQProducerImpl;
+ }
+
+
+ public boolean isRetryAnotherBrokerWhenNotStoreOK() {
+ return retryAnotherBrokerWhenNotStoreOK;
+ }
+
+
+ public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
+ this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+ }
+
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+
+ public int getDefaultTopicQueueNums() {
+ return defaultTopicQueueNums;
+ }
+
+
+ public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+ this.defaultTopicQueueNums = defaultTopicQueueNums;
+ }
+
+
+ public int getRetryTimesWhenSendFailed() {
+ return retryTimesWhenSendFailed;
+ }
+
+
+ public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
+ this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+ }
+
+
+ public boolean isSendMessageWithVIPChannel() {
+ return isVipChannelEnabled();
+ }
+
+
+ public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
+ this.setVipChannelEnabled(sendMessageWithVIPChannel);
+ }
+
+
+ public long[] getNotAvailableDuration() {
+ return this.defaultMQProducerImpl.getNotAvailableDuration();
+ }
+
+ public void setNotAvailableDuration(final long[] notAvailableDuration) {
+ this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
+ }
+
+ public long[] getLatencyMax() {
+ return this.defaultMQProducerImpl.getLatencyMax();
+ }
+
+ public void setLatencyMax(final long[] latencyMax) {
+ this.defaultMQProducerImpl.setLatencyMax(latencyMax);
+ }
+
+ public boolean isSendLatencyFaultEnable() {
+ return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
+ }
+
+ public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+ this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+ }
+
+ public int getRetryTimesWhenSendAsyncFailed() {
+ return retryTimesWhenSendAsyncFailed;
+ }
+
+ public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
+ this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
new file mode 100644
index 0000000..5e8178a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface LocalTransactionExecuter {
+ public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
new file mode 100644
index 0000000..ce5b0d9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public enum LocalTransactionState {
+ COMMIT_MESSAGE,
+ ROLLBACK_MESSAGE,
+ UNKNOW,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
new file mode 100644
index 0000000..0ea4a33
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.MQAdmin;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQProducer extends MQAdmin {
+ void start() throws MQClientException;
+
+ void shutdown();
+
+
+ List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
+
+
+ SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+
+
+ SendResult send(final Message msg, final long timeout) throws MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+
+
+ void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
+ RemotingException, InterruptedException;
+
+
+ void send(final Message msg, final SendCallback sendCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException;
+
+
+ void sendOneway(final Message msg) throws MQClientException, RemotingException,
+ InterruptedException;
+
+
+ SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+
+
+ SendResult send(final Message msg, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+
+ void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
+ throws MQClientException, RemotingException, InterruptedException;
+
+
+ void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException;
+
+
+ void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
+ RemotingException, InterruptedException;
+
+
+ SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+
+ SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+
+
+ void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final SendCallback sendCallback) throws MQClientException, RemotingException,
+ InterruptedException;
+
+
+ void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+ InterruptedException;
+
+
+ void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
+ throws MQClientException, RemotingException, InterruptedException;
+
+
+ TransactionSendResult sendMessageInTransaction(final Message msg,
+ final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
new file mode 100644
index 0000000..c7a9124
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface MessageQueueSelector {
+ MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
new file mode 100644
index 0000000..7b0e00e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public interface SendCallback {
+ public void onSuccess(final SendResult sendResult);
+
+
+ public void onException(final Throwable e);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
new file mode 100644
index 0000000..02ed6b5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SendResult {
+ private SendStatus sendStatus;
+ private String msgId;
+ private MessageQueue messageQueue;
+ private long queueOffset;
+ private String transactionId;
+ private String offsetMsgId;
+ private String regionId;
+ private boolean traceOn = true;
+
+ public SendResult() {
+ }
+
+ public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
+ this.sendStatus = sendStatus;
+ this.msgId = msgId;
+ this.offsetMsgId = offsetMsgId;
+ this.messageQueue = messageQueue;
+ this.queueOffset = queueOffset;
+ }
+
+ public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
+ this.sendStatus = sendStatus;
+ this.msgId = msgId;
+ this.messageQueue = messageQueue;
+ this.queueOffset = queueOffset;
+ this.transactionId = transactionId;
+ this.offsetMsgId = offsetMsgId;
+ this.regionId = regionId;
+ }
+
+ public boolean isTraceOn() {
+ return traceOn;
+ }
+
+ public void setTraceOn(final boolean traceOn) {
+ this.traceOn = traceOn;
+ }
+
+ public String getRegionId() {
+ return regionId;
+ }
+
+ public void setRegionId(final String regionId) {
+ this.regionId = regionId;
+ }
+
+ public static String encoderSendResultToJson(final Object obj) {
+ return JSON.toJSONString(obj);
+ }
+
+ public static SendResult decoderSendResultFromJson(String json) {
+ return JSON.parseObject(json, SendResult.class);
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+
+ public SendStatus getSendStatus() {
+ return sendStatus;
+ }
+
+
+ public void setSendStatus(SendStatus sendStatus) {
+ this.sendStatus = sendStatus;
+ }
+
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+
+ public void setMessageQueue(MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+
+ public long getQueueOffset() {
+ return queueOffset;
+ }
+
+
+ public void setQueueOffset(long queueOffset) {
+ this.queueOffset = queueOffset;
+ }
+
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public String getOffsetMsgId() {
+ return offsetMsgId;
+ }
+
+ public void setOffsetMsgId(String offsetMsgId) {
+ this.offsetMsgId = offsetMsgId;
+ }
+
+ @Override
+ public String toString() {
+ return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue
+ + ", queueOffset=" + queueOffset + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
new file mode 100644
index 0000000..038bc99
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public enum SendStatus {
+ SEND_OK,
+ FLUSH_DISK_TIMEOUT,
+ FLUSH_SLAVE_TIMEOUT,
+ SLAVE_NOT_AVAILABLE,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
new file mode 100644
index 0000000..9a11d50
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface TransactionCheckListener {
+ LocalTransactionState checkLocalTransactionState(final MessageExt msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
new file mode 100644
index 0000000..eaca6ec
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.RPCHook;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TransactionMQProducer extends DefaultMQProducer {
+ private TransactionCheckListener transactionCheckListener;
+ private int checkThreadPoolMinSize = 1;
+ private int checkThreadPoolMaxSize = 1;
+ private int checkRequestHoldMax = 2000;
+
+
+ public TransactionMQProducer() {
+ }
+
+
+ public TransactionMQProducer(final String producerGroup) {
+ super(producerGroup);
+ }
+
+ public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
+ super(producerGroup, rpcHook);
+ }
+
+ @Override
+ public void start() throws MQClientException {
+ this.defaultMQProducerImpl.initTransactionEnv();
+ super.start();
+ }
+
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ this.defaultMQProducerImpl.destroyTransactionEnv();
+ }
+
+
+ @Override
+ public TransactionSendResult sendMessageInTransaction(final Message msg,
+ final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
+ if (null == this.transactionCheckListener) {
+ throw new MQClientException("localTransactionBranchCheckListener is null", null);
+ }
+
+ return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+ }
+
+
+ public TransactionCheckListener getTransactionCheckListener() {
+ return transactionCheckListener;
+ }
+
+
+ public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
+ this.transactionCheckListener = transactionCheckListener;
+ }
+
+
+ public int getCheckThreadPoolMinSize() {
+ return checkThreadPoolMinSize;
+ }
+
+
+ public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
+ this.checkThreadPoolMinSize = checkThreadPoolMinSize;
+ }
+
+
+ public int getCheckThreadPoolMaxSize() {
+ return checkThreadPoolMaxSize;
+ }
+
+
+ public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
+ this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+ }
+
+
+ public int getCheckRequestHoldMax() {
+ return checkRequestHoldMax;
+ }
+
+
+ public void setCheckRequestHoldMax(int checkRequestHoldMax) {
+ this.checkRequestHoldMax = checkRequestHoldMax;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
new file mode 100644
index 0000000..478c39d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public class TransactionSendResult extends SendResult {
+ private LocalTransactionState localTransactionState;
+
+
+ public TransactionSendResult() {
+ }
+
+
+ public LocalTransactionState getLocalTransactionState() {
+ return localTransactionState;
+ }
+
+
+ public void setLocalTransactionState(LocalTransactionState localTransactionState) {
+ this.localTransactionState = localTransactionState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
new file mode 100644
index 0000000..0f6ce48
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByHash implements MessageQueueSelector {
+
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+ int value = arg.hashCode();
+ if (value < 0) {
+ value = Math.abs(value);
+ }
+
+ value = value % mqs.size();
+ return mqs.get(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..1902de5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
+ private Set<String> consumeridcs;
+
+
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+ return null;
+ }
+
+
+ public Set<String> getConsumeridcs() {
+ return consumeridcs;
+ }
+
+
+ public void setConsumeridcs(Set<String> consumeridcs) {
+ this.consumeridcs = consumeridcs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
new file mode 100644
index 0000000..b39b777
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByRandoom implements MessageQueueSelector {
+ private Random random = new Random(System.currentTimeMillis());
+
+
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+ int value = random.nextInt();
+ if (value < 0) {
+ value = Math.abs(value);
+ }
+
+ value = value % mqs.size();
+ return mqs.get(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
new file mode 100644
index 0000000..3234ada
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.stat;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.common.stats.StatsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+
+public class ConsumerStatsManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
+
+ private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
+ private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
+ private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
+ private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
+ private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
+
+ private final StatsItemSet topicAndGroupConsumeOKTPS;
+ private final StatsItemSet topicAndGroupConsumeRT;
+ private final StatsItemSet topicAndGroupConsumeFailedTPS;
+ private final StatsItemSet topicAndGroupPullTPS;
+ private final StatsItemSet topicAndGroupPullRT;
+
+
+ public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
+ this.topicAndGroupConsumeOKTPS =
+ new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
+
+ this.topicAndGroupConsumeRT =
+ new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
+
+ this.topicAndGroupConsumeFailedTPS =
+ new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
+
+ this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
+
+ this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
+ }
+
+
+ public void start() {
+ }
+
+
+ public void shutdown() {
+ }
+
+
+ public void incPullRT(final String group, final String topic, final long rt) {
+ this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
+ }
+
+
+ public void incPullTPS(final String group, final String topic, final long msgs) {
+ this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
+ }
+
+
+ public void incConsumeRT(final String group, final String topic, final long rt) {
+ this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
+ }
+
+
+ public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
+ this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1);
+ }
+
+
+ public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
+ this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1);
+ }
+
+ public ConsumeStatus consumeStatus(final String group, final String topic) {
+ ConsumeStatus cs = new ConsumeStatus();
+ {
+ StatsSnapshot ss = this.getPullRT(group, topic);
+ if (ss != null) {
+ cs.setPullRT(ss.getAvgpt());
+ }
+ }
+
+ {
+ StatsSnapshot ss = this.getPullTPS(group, topic);
+ if (ss != null) {
+ cs.setPullTPS(ss.getTps());
+ }
+ }
+
+ {
+ StatsSnapshot ss = this.getConsumeRT(group, topic);
+ if (ss != null) {
+ cs.setConsumeRT(ss.getAvgpt());
+ }
+ }
+
+ {
+ StatsSnapshot ss = this.getConsumeOKTPS(group, topic);
+ if (ss != null) {
+ cs.setConsumeOKTPS(ss.getTps());
+ }
+ }
+
+ {
+ StatsSnapshot ss = this.getConsumeFailedTPS(group, topic);
+ if (ss != null) {
+ cs.setConsumeFailedTPS(ss.getTps());
+ }
+ }
+
+ {
+ StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group);
+ if (ss != null) {
+ cs.setConsumeFailedMsgs(ss.getSum());
+ }
+ }
+
+ return cs;
+ }
+
+ private StatsSnapshot getPullRT(final String group, final String topic) {
+ return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group);
+ }
+
+ private StatsSnapshot getPullTPS(final String group, final String topic) {
+ return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group);
+ }
+
+ private StatsSnapshot getConsumeRT(final String group, final String topic) {
+ StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group);
+ if (0 == statsData.getSum()) {
+ statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group);
+ }
+
+ return statsData;
+ }
+
+ private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
+ return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group);
+ }
+
+ private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
+ return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
deleted file mode 100644
index 2a10ec4..0000000
--- a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class ValidatorsTest {
-
- @Test
- public void topicValidatorTest() throws MQClientException {
- Validators.checkTopic("Hello");
- Validators.checkTopic("%RETRY%Hello");
- Validators.checkTopic("_%RETRY%Hello");
- Validators.checkTopic("-%RETRY%Hello");
- Validators.checkTopic("223-%RETRY%Hello");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
deleted file mode 100644
index 5ef75ed..0000000
--- a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * @author yubao.fyb@taoboa.com
- * @version $id$
- */
-package com.alibaba.rocketmq.client.consumer.loadbalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
- */
-public class AllocateMessageQueueAveragelyTest {
- private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
- private String currentCID;
- private String topic;
- private List<MessageQueue> messageQueueList;
- private List<String> consumerIdList;
-
- @Before
- public void init() {
- allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
- topic = "topic_test";
- }
-
- @Test
- public void testConsumer1() {
- currentCID = "0";
- createConsumerIdList(1);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer1");
- Assert.assertEquals(result.size(), 5);
- Assert.assertEquals(result.containsAll(getMessageQueueList()), true);
- }
-
- public void createConsumerIdList(int size) {
- consumerIdList = new ArrayList<String>(size);
- for (int i = 0; i < size; i++) {
- consumerIdList.add(String.valueOf(i));
- }
- }
-
- public void createMessageQueueList(int size) {
- messageQueueList = new ArrayList<MessageQueue>(size);
- for (int i = 0; i < size; i++) {
- MessageQueue mq = new MessageQueue(topic, "brokerName", i);
- messageQueueList.add(mq);
- }
- }
-
- public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
- if (messageQueueList == null || messageQueueList.size() < 1)
- return;
- System.out.println(name + ".......................................start");
- for (MessageQueue messageQueue : messageQueueList) {
- System.out.println(messageQueue);
- }
- System.out.println(name + ".......................................end");
- }
-
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
- }
-
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
- }
-
- @Test
- public void testConsumer2() {
- currentCID = "1";
- createConsumerIdList(2);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer2");
- Assert.assertEquals(result.size(), 3);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
-
- }
-
- @Test
- public void testConsumer3CurrentCID0() {
- currentCID = "0";
- createConsumerIdList(3);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer3CurrentCID0");
- Assert.assertEquals(result.size(), 1);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true);
- }
-
- @Test
- public void testConsumer3CurrentCID1() {
- currentCID = "1";
- createConsumerIdList(3);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer3CurrentCID1");
- Assert.assertEquals(result.size(), 1);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
- }
-
- @Test
- public void testConsumer3CurrentCID2() {
- currentCID = "2";
- createConsumerIdList(3);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer3CurrentCID2");
- Assert.assertEquals(result.size(), 3);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
- }
-
- @Test
- public void testConsumer4() {
- currentCID = "1";
- createConsumerIdList(4);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer4");
- Assert.assertEquals(result.size(), 1);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
- }
-
- @Test
- public void testConsumer5() {
- currentCID = "1";
- createConsumerIdList(5);
- createMessageQueueList(5);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer5");
- Assert.assertEquals(result.size(), 1);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
- }
-
- @Test
- public void testConsumer6() {
- currentCID = "1";
- createConsumerIdList(2);
- createMessageQueueList(6);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testConsumer");
- Assert.assertEquals(result.size(), 3);
- Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true);
- }
-
- @Test
- public void testCurrentCIDNotExists() {
- currentCID = String.valueOf(Integer.MAX_VALUE);
- createConsumerIdList(2);
- createMessageQueueList(6);
- List<MessageQueue> result =
- allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
- printMessageQueue(result, "testCurrentCIDNotExists");
- Assert.assertEquals(result.size(), 0);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testCurrentCIDIllegalArgument() {
- createConsumerIdList(2);
- createMessageQueueList(6);
- allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
- }
-
- public List<String> getConsumerIdList() {
- return consumerIdList;
- }
-
- public void setConsumerIdList(List<String> consumerIdList) {
- this.consumerIdList = consumerIdList;
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testMessageQueueIllegalArgument() {
- currentCID = "0";
- createConsumerIdList(2);
- allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testConsumerIdIllegalArgument() {
- currentCID = "0";
- createMessageQueueList(6);
- allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
- }
-
- @Test
- public void testAllocate() {
- AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
- String topic = "topic_test";
- String currentCID = "CID";
- int queueSize = 19;
- int consumerSize = 10;
- List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
- for (int i = 0; i < queueSize; i++) {
- MessageQueue mq = new MessageQueue(topic, "brokerName", i);
- mqAll.add(mq);
- }
-
- List<String> cidAll = new ArrayList<String>();
- for (int j = 0; j < consumerSize; j++) {
- cidAll.add("CID" + j);
- }
- System.out.println(mqAll.toString());
- System.out.println(cidAll.toString());
- for (int i = 0; i < consumerSize; i++) {
- List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
- System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
- }
- }
-
-
- @Test
- public void testAllocateByCircle() {
- AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
- String topic = "topic_test";
- String currentCID = "CID";
- int consumerSize = 3;
- int queueSize = 13;
- List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
- for (int i = 0; i < queueSize; i++) {
- MessageQueue mq = new MessageQueue(topic, "brokerName", i);
- mqAll.add(mq);
- }
-
- List<String> cidAll = new ArrayList<String>();
- for (int j = 0; j < consumerSize; j++) {
- cidAll.add("CID" + j);
- }
- System.out.println(mqAll.toString());
- System.out.println(cidAll.toString());
- for (int i = 0; i < consumerSize; i++) {
- List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
- System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
new file mode 100644
index 0000000..a3daba5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.junit.Test;
+
+
+public class ValidatorsTest {
+
+ @Test
+ public void topicValidatorTest() throws MQClientException {
+ Validators.checkTopic("Hello");
+ Validators.checkTopic("%RETRY%Hello");
+ Validators.checkTopic("_%RETRY%Hello");
+ Validators.checkTopic("-%RETRY%Hello");
+ Validators.checkTopic("223-%RETRY%Hello");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
new file mode 100644
index 0000000..7b568c5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * @author yubao.fyb@taoboa.com
+ * @version $id$
+ */
+package org.apache.rocketmq.client.consumer.loadbalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
+ */
+public class AllocateMessageQueueAveragelyTest {
+ private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
+ private String currentCID;
+ private String topic;
+ private List<MessageQueue> messageQueueList;
+ private List<String> consumerIdList;
+
+ @Before
+ public void init() {
+ allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+ topic = "topic_test";
+ }
+
+ @Test
+ public void testConsumer1() {
+ currentCID = "0";
+ createConsumerIdList(1);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer1");
+ Assert.assertEquals(result.size(), 5);
+ Assert.assertEquals(result.containsAll(getMessageQueueList()), true);
+ }
+
+ public void createConsumerIdList(int size) {
+ consumerIdList = new ArrayList<String>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add(String.valueOf(i));
+ }
+ }
+
+ public void createMessageQueueList(int size) {
+ messageQueueList = new ArrayList<MessageQueue>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ }
+
+ public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+ if (messageQueueList == null || messageQueueList.size() < 1)
+ return;
+ System.out.println(name + ".......................................start");
+ for (MessageQueue messageQueue : messageQueueList) {
+ System.out.println(messageQueue);
+ }
+ System.out.println(name + ".......................................end");
+ }
+
+ public List<MessageQueue> getMessageQueueList() {
+ return messageQueueList;
+ }
+
+ public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+ this.messageQueueList = messageQueueList;
+ }
+
+ @Test
+ public void testConsumer2() {
+ currentCID = "1";
+ createConsumerIdList(2);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer2");
+ Assert.assertEquals(result.size(), 3);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
+
+ }
+
+ @Test
+ public void testConsumer3CurrentCID0() {
+ currentCID = "0";
+ createConsumerIdList(3);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer3CurrentCID0");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true);
+ }
+
+ @Test
+ public void testConsumer3CurrentCID1() {
+ currentCID = "1";
+ createConsumerIdList(3);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer3CurrentCID1");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
+ }
+
+ @Test
+ public void testConsumer3CurrentCID2() {
+ currentCID = "2";
+ createConsumerIdList(3);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer3CurrentCID2");
+ Assert.assertEquals(result.size(), 3);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
+ }
+
+ @Test
+ public void testConsumer4() {
+ currentCID = "1";
+ createConsumerIdList(4);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer4");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
+ }
+
+ @Test
+ public void testConsumer5() {
+ currentCID = "1";
+ createConsumerIdList(5);
+ createMessageQueueList(5);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer5");
+ Assert.assertEquals(result.size(), 1);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
+ }
+
+ @Test
+ public void testConsumer6() {
+ currentCID = "1";
+ createConsumerIdList(2);
+ createMessageQueueList(6);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testConsumer");
+ Assert.assertEquals(result.size(), 3);
+ Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true);
+ }
+
+ @Test
+ public void testCurrentCIDNotExists() {
+ currentCID = String.valueOf(Integer.MAX_VALUE);
+ createConsumerIdList(2);
+ createMessageQueueList(6);
+ List<MessageQueue> result =
+ allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testCurrentCIDNotExists");
+ Assert.assertEquals(result.size(), 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCurrentCIDIllegalArgument() {
+ createConsumerIdList(2);
+ createMessageQueueList(6);
+ allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
+ }
+
+ public List<String> getConsumerIdList() {
+ return consumerIdList;
+ }
+
+ public void setConsumerIdList(List<String> consumerIdList) {
+ this.consumerIdList = consumerIdList;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMessageQueueIllegalArgument() {
+ currentCID = "0";
+ createConsumerIdList(2);
+ allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumerIdIllegalArgument() {
+ currentCID = "0";
+ createMessageQueueList(6);
+ allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
+ }
+
+ @Test
+ public void testAllocate() {
+ AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+ String topic = "topic_test";
+ String currentCID = "CID";
+ int queueSize = 19;
+ int consumerSize = 10;
+ List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+ for (int i = 0; i < queueSize; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ mqAll.add(mq);
+ }
+
+ List<String> cidAll = new ArrayList<String>();
+ for (int j = 0; j < consumerSize; j++) {
+ cidAll.add("CID" + j);
+ }
+ System.out.println(mqAll.toString());
+ System.out.println(cidAll.toString());
+ for (int i = 0; i < consumerSize; i++) {
+ List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
+ System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+ }
+ }
+
+
+ @Test
+ public void testAllocateByCircle() {
+ AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
+ String topic = "topic_test";
+ String currentCID = "CID";
+ int consumerSize = 3;
+ int queueSize = 13;
+ List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+ for (int i = 0; i < queueSize; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ mqAll.add(mq);
+ }
+
+ List<String> cidAll = new ArrayList<String>();
+ for (int j = 0; j < consumerSize; j++) {
+ cidAll.add("CID" + j);
+ }
+ System.out.println(mqAll.toString());
+ System.out.println(cidAll.toString());
+ for (int i = 0; i < consumerSize; i++) {
+ List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
+ System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 72cc2b0..ec95a76 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -18,7 +18,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>