You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:43 UTC
[26/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
new file mode 100644
index 0000000..6f861d3
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * Default {@link MQProducer} implementation. It holds the producer-side settings as plain
+ * properties and forwards every operation to an internal {@link DefaultMQProducerImpl}.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQProducer extends ClientConfig implements MQProducer {
+    /** Implementation object that every public operation of this class delegates to. */
+    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
+    /** Producer group name; {@link MixAll#DEFAULT_PRODUCER_GROUP} when no group is passed to a constructor. */
+    private String producerGroup;
+    /**
+     * Just for testing or demo program
+     */
+    private String createTopicKey = MixAll.DEFAULT_TOPIC;
+    private volatile int defaultTopicQueueNums = 4;
+    // NOTE(review): presumably milliseconds - confirm against DefaultMQProducerImpl.
+    private int sendMsgTimeout = 3000;
+    // NOTE(review): presumably a body-size threshold in bytes (4K) - confirm against DefaultMQProducerImpl.
+    private int compressMsgBodyOverHowmuch = 1024 * 4;
+    private int retryTimesWhenSendFailed = 2;
+    private int retryTimesWhenSendAsyncFailed = 2;
+
+    private boolean retryAnotherBrokerWhenNotStoreOK = false;
+    private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
+    /**
+     * Creates a producer in the default producer group without an RPC hook.
+     */
+    public DefaultMQProducer() {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
+    }
+
+    /**
+     * Creates a producer for the given group and builds the backing implementation.
+     *
+     * @param producerGroup producer group name
+     * @param rpcHook hook handed to the backing implementation; the other constructors pass {@code null}
+     */
+    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
+        this.producerGroup = producerGroup;
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+    }
+
+    /**
+     * Creates a producer for the given group without an RPC hook.
+     *
+     * @param producerGroup producer group name
+     */
+    public DefaultMQProducer(final String producerGroup) {
+        this(producerGroup, null);
+    }
+
+    /**
+     * Creates a producer in the default producer group with the given RPC hook.
+     *
+     * @param rpcHook hook handed to the backing implementation
+     */
+    public DefaultMQProducer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
+    }
+
+    // ---------------------------------------------------------------------
+    // MQProducer operations: each one forwards to defaultMQProducerImpl.
+    // ---------------------------------------------------------------------
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQProducerImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.defaultMQProducerImpl.shutdown();
+    }
+
+    @Override
+    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+        return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
+    }
+
+    @Override
+    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg);
+    }
+
+    @Override
+    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, timeout);
+    }
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq, timeout);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, mq);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
+    }
+
+    /**
+     * Not supported by this class: always throws.
+     *
+     * @throws RuntimeException always; the message points callers to {@code TransactionMQProducer}
+     */
+    @Override
+    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
+        throws MQClientException {
+        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
+    }
+
+    /**
+     * Same as {@link #createTopic(String, String, int, int)} with a {@code topicSysFlag} of {@code 0}.
+     */
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.maxOffset(mq);
+    }
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+        throws MQClientException, InterruptedException {
+        return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    /**
+     * Looks a message up by id. The id is first tried as an offset message id through
+     * {@link #viewMessage(String)}; if decoding or that lookup fails, the id is used as a
+     * unique key for a query within {@code topic}.
+     *
+     * @param topic topic used by the unique-key fallback query
+     * @param msgId message id to look up
+     * @return whatever the successful lookup returns
+     * @throws InterruptedException if the calling thread is interrupted during either lookup
+     */
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            // Decoding only checks that msgId parses as an offset message id; the decoded value is not needed.
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (InterruptedException e) {
+            // An interrupt is not a failed lookup: hand it to the caller instead of starting a second lookup.
+            throw e;
+        } catch (Exception ignored) {
+            // Best effort: the id could not be resolved as an offset message id, so fall back below.
+        }
+        return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
+    }
+
+    // ---------------------------------------------------------------------
+    // Plain property accessors.
+    // ---------------------------------------------------------------------
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getCreateTopicKey() {
+        return createTopicKey;
+    }
+
+    public void setCreateTopicKey(String createTopicKey) {
+        this.createTopicKey = createTopicKey;
+    }
+
+    public int getSendMsgTimeout() {
+        return sendMsgTimeout;
+    }
+
+    public void setSendMsgTimeout(int sendMsgTimeout) {
+        this.sendMsgTimeout = sendMsgTimeout;
+    }
+
+    public int getCompressMsgBodyOverHowmuch() {
+        return compressMsgBodyOverHowmuch;
+    }
+
+    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+    }
+
+    public DefaultMQProducerImpl getDefaultMQProducerImpl() {
+        return defaultMQProducerImpl;
+    }
+
+    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
+        return retryAnotherBrokerWhenNotStoreOK;
+    }
+
+    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
+        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+    }
+
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
+    public int getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+    public int getRetryTimesWhenSendFailed() {
+        return retryTimesWhenSendFailed;
+    }
+
+    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
+        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+    }
+
+    /**
+     * Alias for the inherited {@code isVipChannelEnabled()}.
+     */
+    public boolean isSendMessageWithVIPChannel() {
+        return isVipChannelEnabled();
+    }
+
+    /**
+     * Alias for the inherited {@code setVipChannelEnabled(boolean)}.
+     */
+    public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
+        this.setVipChannelEnabled(sendMessageWithVIPChannel);
+    }
+
+    // ---------------------------------------------------------------------
+    // Settings stored on the backing implementation rather than on this object.
+    // ---------------------------------------------------------------------
+
+    public long[] getNotAvailableDuration() {
+        return this.defaultMQProducerImpl.getNotAvailableDuration();
+    }
+
+    public void setNotAvailableDuration(final long[] notAvailableDuration) {
+        this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
+    }
+
+    public long[] getLatencyMax() {
+        return this.defaultMQProducerImpl.getLatencyMax();
+    }
+
+    public void setLatencyMax(final long[] latencyMax) {
+        this.defaultMQProducerImpl.setLatencyMax(latencyMax);
+    }
+
+    public boolean isSendLatencyFaultEnable() {
+        return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
+    }
+
+    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+        this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+    }
+
+    public int getRetryTimesWhenSendAsyncFailed() {
+        return retryTimesWhenSendAsyncFailed;
+    }
+
+    public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
+        this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
new file mode 100644
index 0000000..af3723a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.Message;
+
+
+/**
+ * Callback that runs the local branch of a transactional send and reports its outcome.
+ * An instance is passed to {@code MQProducer#sendMessageInTransaction} as {@code tranExecuter}.
+ *
+ * @author shijia.wxr
+ */
+public interface LocalTransactionExecuter {
+    /**
+     * Runs the local transaction branch for one message.
+     *
+     * @param msg the message being sent
+     * @param arg caller-supplied argument; its meaning is defined by the implementation
+     * @return the resulting {@link LocalTransactionState}
+     */
+    LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
new file mode 100644
index 0000000..ee2a93a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public enum LocalTransactionState {
+ COMMIT_MESSAGE,
+ ROLLBACK_MESSAGE,
+ UNKNOW,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
new file mode 100644
index 0000000..e21bc00
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.MQAdmin;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * Producer-side API: lifecycle control plus the family of {@code send} operations.
+ * <p>
+ * The {@code send} overloads come in three shapes: those returning a {@link SendResult},
+ * those reporting through a {@link SendCallback}, and {@code sendOneway}, which hands no
+ * result back to the caller. Each shape exists for three ways of choosing the target queue:
+ * unspecified, an explicit {@link MessageQueue}, or a {@link MessageQueueSelector} plus argument.
+ *
+ * @author shijia.wxr
+ */
+public interface MQProducer extends MQAdmin {
+
+    // ----- lifecycle -----
+
+    void start() throws MQClientException;
+
+    void shutdown();
+
+    /**
+     * @param topic topic to look up
+     * @return the message queues the given topic can be published to
+     */
+    List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException;
+
+    // ----- send, target queue not specified by the caller -----
+
+    SendResult send(Message msg)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    // NOTE(review): the unit of every `timeout` parameter is presumably milliseconds - confirm.
+    SendResult send(Message msg, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    // ----- send to an explicit message queue -----
+
+    SendResult send(Message msg, MessageQueue mq)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(Message msg, MessageQueue mq, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg, MessageQueue mq)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    // ----- send with a queue selector and its argument -----
+
+    SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+        throws MQClientException, RemotingException, InterruptedException;
+
+    // ----- transactional send -----
+
+    /**
+     * @param msg the message to send
+     * @param tranExecuter callback that runs the local transaction branch
+     * @param arg caller-supplied argument
+     */
+    TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg)
+        throws MQClientException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
new file mode 100644
index 0000000..924c145
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy for choosing the target queue of a message. Instances are passed to the
+ * selector-based {@code send} and {@code sendOneway} overloads of {@code MQProducer}.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageQueueSelector {
+    /**
+     * Chooses a queue for one message.
+     *
+     * @param mqs candidate queues
+     * @param msg the message being sent
+     * @param arg caller-supplied argument; its meaning is defined by the implementation
+     * @return the chosen queue
+     */
+    MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
new file mode 100644
index 0000000..35d1a72
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/**
+ * Receiver for the outcome of the callback-based {@code send} overloads of {@code MQProducer}.
+ *
+ * @author shijia.wxr
+ */
+public interface SendCallback {
+    /**
+     * Reports a result.
+     *
+     * @param sendResult the result of the send
+     */
+    void onSuccess(SendResult sendResult);
+
+    /**
+     * Reports a failure.
+     *
+     * @param e the cause of the failure
+     */
+    void onException(Throwable e);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
new file mode 100644
index 0000000..183accf
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Mutable value object describing the result of a send: status, message ids, target queue
+ * and queue offset, plus optional transaction id and region id.
+ *
+ * @author shijia.wxr
+ */
+public class SendResult {
+    private SendStatus sendStatus;
+    private String msgId;
+    private MessageQueue messageQueue;
+    private long queueOffset;
+    private String transactionId;
+    private String offsetMsgId;
+    private String regionId;
+    /** Starts out {@code true}; neither constructor changes it. */
+    private boolean traceOn = true;
+
+    /**
+     * Creates an empty result; every reference field is {@code null} and the offset is {@code 0}.
+     */
+    public SendResult() {
+    }
+
+    /**
+     * Creates a result without transaction id or region id (both stay {@code null}).
+     */
+    public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
+        this(sendStatus, msgId, messageQueue, queueOffset, null, offsetMsgId, null);
+    }
+
+    /**
+     * Creates a result with every field except {@code traceOn} given explicitly.
+     */
+    public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
+        this.sendStatus = sendStatus;
+        this.msgId = msgId;
+        this.messageQueue = messageQueue;
+        this.queueOffset = queueOffset;
+        this.transactionId = transactionId;
+        this.offsetMsgId = offsetMsgId;
+        this.regionId = regionId;
+    }
+
+    public boolean isTraceOn() {
+        return traceOn;
+    }
+
+    public void setTraceOn(final boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    public String getRegionId() {
+        return regionId;
+    }
+
+    public void setRegionId(final String regionId) {
+        this.regionId = regionId;
+    }
+
+    /**
+     * Serializes the given object to a JSON string.
+     *
+     * @param obj object to serialize; not restricted to {@code SendResult}
+     * @return the JSON text
+     */
+    public static String encoderSendResultToJson(final Object obj) {
+        final String json = JSON.toJSONString(obj);
+        return json;
+    }
+
+    /**
+     * Parses a JSON string into a {@code SendResult}.
+     *
+     * @param json JSON text
+     * @return the parsed result
+     */
+    public static SendResult decoderSendResultFromJson(String json) {
+        final SendResult parsed = JSON.parseObject(json, SendResult.class);
+        return parsed;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public SendStatus getSendStatus() {
+        return sendStatus;
+    }
+
+    public void setSendStatus(SendStatus sendStatus) {
+        this.sendStatus = sendStatus;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public long getQueueOffset() {
+        return queueOffset;
+    }
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public String getOffsetMsgId() {
+        return offsetMsgId;
+    }
+
+    public void setOffsetMsgId(String offsetMsgId) {
+        this.offsetMsgId = offsetMsgId;
+    }
+
+    /**
+     * Renders status, both message ids, queue and offset; transaction id, region id and the
+     * trace flag are not part of the text.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("SendResult [sendStatus=");
+        text.append(sendStatus);
+        text.append(", msgId=").append(msgId);
+        text.append(", offsetMsgId=").append(offsetMsgId);
+        text.append(", messageQueue=").append(messageQueue);
+        text.append(", queueOffset=").append(queueOffset);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
new file mode 100644
index 0000000..3bc572f
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public enum SendStatus {
+ SEND_OK,
+ FLUSH_DISK_TIMEOUT,
+ FLUSH_SLAVE_TIMEOUT,
+ SLAVE_NOT_AVAILABLE,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
new file mode 100644
index 0000000..8440537
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * Callback that reports the state of the local transaction belonging to a message.
+ *
+ * @author shijia.wxr
+ */
+public interface TransactionCheckListener {
+    /**
+     * Reports the local transaction state for one message.
+     *
+     * @param msg the message whose transaction is being checked
+     * @return the current {@link LocalTransactionState}
+     */
+    LocalTransactionState checkLocalTransactionState(MessageExt msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
new file mode 100644
index 0000000..08dd4ab
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.RPCHook;
+
+
+/**
+ * A {@link DefaultMQProducer} that sets up and tears down the transaction
+ * environment of its producer implementation and refuses to send transactional
+ * messages until a {@link TransactionCheckListener} has been configured.
+ *
+ * @author shijia.wxr
+ */
+public class TransactionMQProducer extends DefaultMQProducer {
+    // NOTE(review): the three tuning values below are only stored and exposed here;
+    // presumably they size the transaction-check executor — confirm against the impl.
+    private int checkThreadPoolMinSize = 1;
+    private int checkThreadPoolMaxSize = 1;
+    private int checkRequestHoldMax = 2000;
+
+    /** Must be set before {@link #sendMessageInTransaction} is called. */
+    private TransactionCheckListener transactionCheckListener;
+
+
+    /** Creates a producer using the superclass defaults. */
+    public TransactionMQProducer() {
+    }
+
+
+    /**
+     * @param producerGroup producer group passed to the superclass
+     */
+    public TransactionMQProducer(final String producerGroup) {
+        super(producerGroup);
+    }
+
+
+    /**
+     * @param producerGroup producer group passed to the superclass
+     * @param rpcHook       RPC hook passed to the superclass
+     */
+    public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
+        super(producerGroup, rpcHook);
+    }
+
+
+    /**
+     * Initializes the transaction environment, then starts the underlying producer.
+     *
+     * @throws MQClientException if the underlying start fails
+     */
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQProducerImpl.initTransactionEnv();
+        super.start();
+    }
+
+
+    /**
+     * Shuts the underlying producer down first, then destroys the transaction environment.
+     */
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        this.defaultMQProducerImpl.destroyTransactionEnv();
+    }
+
+
+    /**
+     * Delegates a transactional send to the producer implementation.
+     *
+     * @param msg          the message to send
+     * @param tranExecuter executer handed through to the implementation
+     * @param arg          caller-defined argument handed through to the implementation
+     * @return the result produced by the implementation
+     * @throws MQClientException if no {@link TransactionCheckListener} has been set,
+     *                           or if the implementation throws
+     */
+    @Override
+    public TransactionSendResult sendMessageInTransaction(final Message msg,
+                                                          final LocalTransactionExecuter tranExecuter,
+                                                          final Object arg) throws MQClientException {
+        if (this.transactionCheckListener != null) {
+            return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+        }
+
+        throw new MQClientException("localTransactionBranchCheckListener is null", null);
+    }
+
+
+    public TransactionCheckListener getTransactionCheckListener() {
+        return this.transactionCheckListener;
+    }
+
+
+    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
+        this.transactionCheckListener = transactionCheckListener;
+    }
+
+
+    public int getCheckThreadPoolMinSize() {
+        return this.checkThreadPoolMinSize;
+    }
+
+
+    public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
+        this.checkThreadPoolMinSize = checkThreadPoolMinSize;
+    }
+
+
+    public int getCheckThreadPoolMaxSize() {
+        return this.checkThreadPoolMaxSize;
+    }
+
+
+    public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
+        this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+    }
+
+
+    public int getCheckRequestHoldMax() {
+        return this.checkRequestHoldMax;
+    }
+
+
+    public void setCheckRequestHoldMax(int checkRequestHoldMax) {
+        this.checkRequestHoldMax = checkRequestHoldMax;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
new file mode 100644
index 0000000..e7dcd0e
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/**
+ * A {@link SendResult} that additionally carries a {@link LocalTransactionState}.
+ *
+ * @author shijia.wxr
+ */
+public class TransactionSendResult extends SendResult {
+ // Not initialized here, so it is null until setLocalTransactionState is called.
+ private LocalTransactionState localTransactionState;
+
+
+ /** Creates a result with no local transaction state set. */
+ public TransactionSendResult() {
+ }
+
+
+ /**
+ * @return the stored local transaction state, or {@code null} if none was set
+ */
+ public LocalTransactionState getLocalTransactionState() {
+ return localTransactionState;
+ }
+
+
+ /**
+ * @param localTransactionState the local transaction state to store
+ */
+ public void setLocalTransactionState(LocalTransactionState localTransactionState) {
+ this.localTransactionState = localTransactionState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
new file mode 100644
index 0000000..648356b
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Selects a message queue from the hash code of the caller-supplied argument, so
+ * arguments with equal hash codes map to the same index for a given list size.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByHash implements MessageQueueSelector {
+
+    /**
+     * Returns the queue at index {@code |arg.hashCode() % mqs.size()|}.
+     *
+     * @param mqs candidate queues; an empty list causes an {@link ArithmeticException}
+     * @param msg the message being sent; not used by this selector
+     * @param arg object whose {@code hashCode()} drives the choice; must not be {@code null}
+     * @return the queue at the computed index
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Take the remainder BEFORE the absolute value. Math.abs(Integer.MIN_VALUE) is
+        // still negative, so abs-then-mod could yield a negative index and make
+        // mqs.get() throw. The remainder's magnitude is always below mqs.size(), so
+        // abs is safe here, and every other hash code maps to the same index as before.
+        int value = arg.hashCode() % mqs.size();
+        if (value < 0) {
+            value = Math.abs(value);
+        }
+
+        return mqs.get(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..a213391
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Message queue selector that holds a set of strings named {@code consumeridcs}.
+ * NOTE(review): {@link #select} is an unimplemented stub — it always returns
+ * {@code null} and never reads {@code consumeridcs}.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
+ // NOTE(review): presumably the machine-room (IDC) names of the consumers — confirm;
+ // nothing in this class reads it besides the getter.
+ private Set<String> consumeridcs;
+
+
+ /**
+ * Always returns {@code null}; none of the arguments is used.
+ *
+ * @param mqs ignored
+ * @param msg ignored
+ * @param arg ignored
+ * @return always {@code null}
+ */
+ @Override
+ public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+ return null;
+ }
+
+
+ /**
+ * @return the stored set, or {@code null} if none was set
+ */
+ public Set<String> getConsumeridcs() {
+ return consumeridcs;
+ }
+
+
+ /**
+ * @param consumeridcs the set to store; kept by reference, not copied
+ */
+ public void setConsumeridcs(Set<String> consumeridcs) {
+ this.consumeridcs = consumeridcs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
new file mode 100644
index 0000000..3f381e4
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * Selects a message queue at a pseudo-random index.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByRandoom implements MessageQueueSelector {
+    /** Generator seeded with the wall-clock time at construction. */
+    private final Random random = new Random(System.currentTimeMillis());
+
+
+    /**
+     * Returns a queue chosen uniformly from {@code mqs}.
+     *
+     * @param mqs candidate queues; must be non-empty, otherwise
+     *            {@link Random#nextInt(int)} throws {@link IllegalArgumentException}
+     * @param msg the message being sent; not used by this selector
+     * @param arg caller-defined argument; not used by this selector
+     * @return the queue at the randomly chosen index
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Ask for a bounded value directly. The previous nextInt() + Math.abs() + %
+        // form produced a negative index whenever nextInt() returned
+        // Integer.MIN_VALUE, because Math.abs(Integer.MIN_VALUE) is still negative.
+        int value = random.nextInt(mqs.size());
+        return mqs.get(value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
new file mode 100644
index 0000000..e07e233
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.stat;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import com.alibaba.rocketmq.common.stats.StatsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Records consumer-side statistics per topic and consumer group and summarizes
+ * them as a {@link ConsumeStatus}.
+ *
+ * <p>Each statistic lives in its own {@link StatsItemSet}, keyed by
+ * {@code topic + "@" + group}.
+ */
+public class ConsumerStatsManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
+
+    private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
+    private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
+    private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
+
+    private final StatsItemSet topicAndGroupConsumeOKTPS;
+    private final StatsItemSet topicAndGroupConsumeRT;
+    private final StatsItemSet topicAndGroupConsumeFailedTPS;
+    private final StatsItemSet topicAndGroupPullTPS;
+    private final StatsItemSet topicAndGroupPullRT;
+
+
+    /**
+     * Creates the five statistic sets, all sharing the given executor and the client logger.
+     *
+     * @param scheduledExecutorService executor handed to every {@link StatsItemSet}
+     */
+    public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
+        this.topicAndGroupConsumeOKTPS =
+            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeRT =
+            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeFailedTPS =
+            new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
+    }
+
+
+    /** Currently a no-op. */
+    public void start() {
+    }
+
+
+    /** Currently a no-op. */
+    public void shutdown() {
+    }
+
+
+    // In the inc* methods the long value is narrowed to int before being recorded, so
+    // values beyond the int range are truncated. The trailing 1 is presumably the
+    // sample count — confirm against StatsItemSet.addValue.
+
+    /** Records one pull round-trip time {@code rt} for the topic/group pair. */
+    public void incPullRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupPullRT.addValue(statsKey(group, topic), (int) rt, 1);
+    }
+
+
+    /** Records {@code msgs} pulled messages for the topic/group pair. */
+    public void incPullTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupPullTPS.addValue(statsKey(group, topic), (int) msgs, 1);
+    }
+
+
+    /** Records one consume round-trip time {@code rt} for the topic/group pair. */
+    public void incConsumeRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupConsumeRT.addValue(statsKey(group, topic), (int) rt, 1);
+    }
+
+
+    /** Records {@code msgs} successfully consumed messages for the topic/group pair. */
+    public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeOKTPS.addValue(statsKey(group, topic), (int) msgs, 1);
+    }
+
+
+    /** Records {@code msgs} failed consumptions for the topic/group pair. */
+    public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeFailedTPS.addValue(statsKey(group, topic), (int) msgs, 1);
+    }
+
+
+    /**
+     * Builds a {@link ConsumeStatus} for the topic/group pair. A field is set only
+     * when the corresponding snapshot is non-null.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @return a new status object; never {@code null}
+     */
+    public ConsumeStatus consumeStatus(final String group, final String topic) {
+        ConsumeStatus cs = new ConsumeStatus();
+
+        StatsSnapshot pullRT = this.getPullRT(group, topic);
+        if (pullRT != null) {
+            cs.setPullRT(pullRT.getAvgpt());
+        }
+
+        StatsSnapshot pullTPS = this.getPullTPS(group, topic);
+        if (pullTPS != null) {
+            cs.setPullTPS(pullTPS.getTps());
+        }
+
+        StatsSnapshot consumeRT = this.getConsumeRT(group, topic);
+        if (consumeRT != null) {
+            cs.setConsumeRT(consumeRT.getAvgpt());
+        }
+
+        StatsSnapshot consumeOKTPS = this.getConsumeOKTPS(group, topic);
+        if (consumeOKTPS != null) {
+            cs.setConsumeOKTPS(consumeOKTPS.getTps());
+        }
+
+        StatsSnapshot consumeFailedTPS = this.getConsumeFailedTPS(group, topic);
+        if (consumeFailedTPS != null) {
+            cs.setConsumeFailedTPS(consumeFailedTPS.getTps());
+        }
+
+        // The failed-message count comes from the hourly snapshot, not the per-minute one.
+        StatsSnapshot failedInHour =
+            this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(statsKey(group, topic));
+        if (failedInHour != null) {
+            cs.setConsumeFailedMsgs(failedInHour.getSum());
+        }
+
+        return cs;
+    }
+
+
+    /** Builds the key under which every statistic of a topic/group pair is stored. */
+    private static String statsKey(final String group, final String topic) {
+        return topic + "@" + group;
+    }
+
+
+    private StatsSnapshot getPullRT(final String group, final String topic) {
+        return this.topicAndGroupPullRT.getStatsDataInMinute(statsKey(group, topic));
+    }
+
+
+    private StatsSnapshot getPullTPS(final String group, final String topic) {
+        return this.topicAndGroupPullTPS.getStatsDataInMinute(statsKey(group, topic));
+    }
+
+
+    /**
+     * Returns the per-minute consume-RT snapshot, falling back to the hourly one
+     * when the minute snapshot is missing or its sum is zero.
+     */
+    private StatsSnapshot getConsumeRT(final String group, final String topic) {
+        final String key = statsKey(group, topic);
+        StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(key);
+        // Null guard added for consistency with consumeStatus(), which null-checks
+        // every snapshot; the previous code dereferenced the minute snapshot unchecked.
+        if (null == statsData || 0 == statsData.getSum()) {
+            statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(key);
+        }
+
+        return statsData;
+    }
+
+
+    private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(statsKey(group, topic));
+    }
+
+
+    private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(statsKey(group, topic));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
new file mode 100644
index 0000000..bf4b885
--- /dev/null
+++ b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="STDOUT-APPENDER" class="org.apache.log4j.ConsoleAppender">
+ <param name="encoding" value="UTF-8"/>
+ <param name="target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- level, logger name (last 2 components), message -->
+ <param name="ConversionPattern" value="%-5p %c{2} , %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="RocketmqClientAppender" class="org.apache.log4j.RollingFileAppender">
+ <param name="file" value="${client.logRoot}/rocketmq_client.log"/>
+ <param name="append" value="true"/>
+ <param name="encoding" value="UTF-8"/>
+ <param name="maxFileSize" value="1073741824"/>
+ <param name="maxBackupIndex" value="${client.logFileMaxIndex}"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- timestamp, level, logger name (last component) with line number, message -->
+ <param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="RocketmqClient" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+ <logger name="RocketmqCommon" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+ <logger name="RocketmqRemoting" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+</log4j:configuration>
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/resources/logback_rocketmq_client.xml b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
new file mode 100644
index 0000000..a845ee4
--- /dev/null
+++ b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="RocketmqClientAppender"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${client.logRoot}/rocketmq_client.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <!-- FixedWindowRollingPolicy requires the %i window-index token in the pattern -->
+ <fileNamePattern>${client.logRoot}/otherdays/rocketmq_client.%i.log
+ </fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>${client.logFileMaxIndex}</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <!-- timestamp (GMT+8), level, thread, message -->
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+
+ <logger name="RocketmqCommon" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+
+ <logger name="RocketmqRemoting" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+
+ <logger name="RocketmqClient" additivity="false">
+ <level value="${client.logLevel}"/>
+ <appender-ref ref="RocketmqClientAppender"/>
+ </logger>
+
+
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
new file mode 100644
index 0000000..6dadafb
--- /dev/null
+++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@code Validators}.
+ */
+public class ValidatorsTest {
+
+    /**
+     * Checks that {@code Validators.checkTopic} accepts each of the listed topic
+     * names without throwing.
+     *
+     * <p>Any exception propagates to JUnit, so a rejected topic is reported with its
+     * real cause and stack trace rather than an opaque {@code assertTrue(false)}.
+     *
+     * @throws Exception if any of the topic names is rejected
+     */
+    @Test
+    public void topicValidatorTest() throws Exception {
+        Validators.checkTopic("Hello");
+        Validators.checkTopic("%RETRY%Hello");
+        Validators.checkTopic("_%RETRY%Hello");
+        Validators.checkTopic("-%RETRY%Hello");
+        Validators.checkTopic("223-%RETRY%Hello");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
new file mode 100644
index 0000000..5ef75ed
--- /dev/null
+++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * @author yubao.fyb@taoboa.com
+ * @version $id$
+ */
+package com.alibaba.rocketmq.client.consumer.loadbalance;
+
+import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Tests for {@link AllocateMessageQueueAveragely} and a smoke run of
+ * {@link AllocateMessageQueueAveragelyByCircle}.
+ *
+ * <p>Assertions use JUnit's {@code (expected, actual)} argument order so that
+ * failure messages report the two values the right way round.
+ *
+ * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
+ */
+public class AllocateMessageQueueAveragelyTest {
+    private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
+    private String currentCID;
+    private String topic;
+    private List<MessageQueue> messageQueueList;
+    private List<String> consumerIdList;
+
+    @Before
+    public void init() {
+        allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+        topic = "topic_test";
+    }
+
+    @Test
+    public void testConsumer1() {
+        currentCID = "0";
+        createConsumerIdList(1);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer1");
+        Assert.assertEquals(5, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList()));
+    }
+
+    /** Fills {@code consumerIdList} with the ids "0" .. "size-1". */
+    public void createConsumerIdList(int size) {
+        consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(String.valueOf(i));
+        }
+    }
+
+    /** Fills {@code messageQueueList} with {@code size} queues of the test topic on "brokerName". */
+    public void createMessageQueueList(int size) {
+        messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            messageQueueList.add(mq);
+        }
+    }
+
+    /** Prints the given queues between start/end markers; prints nothing for a null or empty list. */
+    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+        if (messageQueueList == null || messageQueueList.size() < 1) {
+            return;
+        }
+        System.out.println(name + ".......................................start");
+        for (MessageQueue messageQueue : messageQueueList) {
+            System.out.println(messageQueue);
+        }
+        System.out.println(name + ".......................................end");
+    }
+
+    public List<MessageQueue> getMessageQueueList() {
+        return messageQueueList;
+    }
+
+    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+        this.messageQueueList = messageQueueList;
+    }
+
+    @Test
+    public void testConsumer2() {
+        currentCID = "1";
+        createConsumerIdList(2);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer2");
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(2, 5)));
+    }
+
+    @Test
+    public void testConsumer3CurrentCID0() {
+        currentCID = "0";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID0");
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(0, 1)));
+    }
+
+    @Test
+    public void testConsumer3CurrentCID1() {
+        currentCID = "1";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID1");
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    @Test
+    public void testConsumer3CurrentCID2() {
+        currentCID = "2";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID2");
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(2, 5)));
+    }
+
+    @Test
+    public void testConsumer4() {
+        currentCID = "1";
+        createConsumerIdList(4);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer4");
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    @Test
+    public void testConsumer5() {
+        currentCID = "1";
+        createConsumerIdList(5);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer5");
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    @Test
+    public void testConsumer6() {
+        currentCID = "1";
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        // Label corrected from "testConsumer" so the printed output names this test.
+        printMessageQueue(result, "testConsumer6");
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(3, 6)));
+    }
+
+    @Test
+    public void testCurrentCIDNotExists() {
+        currentCID = String.valueOf(Integer.MAX_VALUE);
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        List<MessageQueue> result =
+            allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testCurrentCIDNotExists");
+        Assert.assertEquals(0, result.size());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCurrentCIDIllegalArgument() {
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
+    }
+
+    public List<String> getConsumerIdList() {
+        return consumerIdList;
+    }
+
+    public void setConsumerIdList(List<String> consumerIdList) {
+        this.consumerIdList = consumerIdList;
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMessageQueueIllegalArgument() {
+        currentCID = "0";
+        createConsumerIdList(2);
+        allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumerIdIllegalArgument() {
+        currentCID = "0";
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
+    }
+
+    /**
+     * Smoke run: allocates 19 queues among 10 consumers and prints each result.
+     * Makes no assertions; it only fails if allocation throws.
+     */
+    @Test
+    public void testAllocate() {
+        AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+        String topic = "topic_test";
+        String currentCID = "CID";
+        int queueSize = 19;
+        int consumerSize = 10;
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i = 0; i < queueSize; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            mqAll.add(mq);
+        }
+
+        List<String> cidAll = new ArrayList<String>();
+        for (int j = 0; j < consumerSize; j++) {
+            cidAll.add("CID" + j);
+        }
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
+            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+        }
+    }
+
+
+    /**
+     * Smoke run: allocates 13 queues among 3 consumers with the by-circle strategy
+     * and prints each result. Makes no assertions; it only fails if allocation throws.
+     */
+    @Test
+    public void testAllocateByCircle() {
+        AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
+        String topic = "topic_test";
+        String currentCID = "CID";
+        int consumerSize = 3;
+        int queueSize = 13;
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i = 0; i < queueSize; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            mqAll.add(mq);
+        }
+
+        List<String> cidAll = new ArrayList<String>();
+        for (int j = 0; j < consumerSize; j++) {
+            cidAll.add("CID" + j);
+        }
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
+            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-common/pom.xml b/rocketmq-common/pom.xml
new file mode 100644
index 0000000..72cc2b0
--- /dev/null
+++ b/rocketmq-common/pom.xml
@@ -0,0 +1,43 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-common</artifactId>
+ <name>rocketmq-common ${project.version}</name>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
new file mode 100644
index 0000000..6eae0a7
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+
+/**
+ * Configuration holder for a broker: every setting is a private field with a
+ * default value, exposed through a conventional getter/setter pair.
+ *
+ * <p>Some defaults are derived from the runtime environment when an instance is
+ * created: system properties / environment variables (RocketMQ home, name
+ * server address), the local address reported by {@link RemotingUtil}, the
+ * local host name and the number of available processors.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerConfig {
+    // System property first, environment variable as the fallback.
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+    // System property first, environment variable as the fallback.
+    @ImportantField
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+    // Both addresses default to the local address reported by RemotingUtil.
+    @ImportantField
+    private String brokerIP1 = RemotingUtil.getLocalAddress();
+    private String brokerIP2 = RemotingUtil.getLocalAddress();
+    // Defaults to the local host name, or "DEFAULT_BROKER" if it cannot be resolved.
+    @ImportantField
+    private String brokerName = localHostName();
+    @ImportantField
+    private String brokerClusterName = "DefaultCluster";
+    @ImportantField
+    private long brokerId = MixAll.MASTER_ID;
+    // Read and write permission bits combined.
+    private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
+    private int defaultTopicQueueNums = 8;
+    @ImportantField
+    private boolean autoCreateTopicEnable = true;
+
+    private boolean clusterTopicEnable = true;
+
+    private boolean brokerTopicEnable = true;
+    @ImportantField
+    private boolean autoCreateSubscriptionGroup = true;
+    private String messageStorePlugIn = "";
+
+    // Fixed at 1; the author's alternative sizing is kept for reference:
+    // 16 + Runtime.getRuntime().availableProcessors() * 4
+    private int sendMessageThreadPoolNums = 1;
+    // Scales with the number of processors visible to the JVM.
+    private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int adminBrokerThreadPoolNums = 16;
+    private int clientManageThreadPoolNums = 32;
+    private int consumerManageThreadPoolNums = 32;
+
+    private int flushConsumerOffsetInterval = 1000 * 5;
+
+    private int flushConsumerOffsetHistoryInterval = 1000 * 60;
+
+    @ImportantField
+    private boolean rejectTransactionMessage = false;
+    @ImportantField
+    private boolean fetchNamesrvAddrByAddressServer = false;
+    private int sendThreadPoolQueueCapacity = 10000;
+    private int pullThreadPoolQueueCapacity = 100000;
+    private int clientManagerThreadPoolQueueCapacity = 1000000;
+    private int consumerManagerThreadPoolQueueCapacity = 1000000;
+
+    private int filterServerNums = 0;
+
+    private boolean longPollingEnable = true;
+
+    private long shortPollingTimeMills = 1000;
+
+    private boolean notifyConsumerIdsChangedEnable = true;
+
+    private boolean highSpeedMode = false;
+
+    private boolean commercialEnable = true;
+    private int commercialTimerCount = 1;
+    private int commercialTransCount = 1;
+    private int commercialBigCount = 1;
+
+    private boolean transferMsgByHeap = true;
+    private int maxDelayTime = 40;
+
+    private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+    private int registerBrokerTimeoutMills = 6000;
+
+    private boolean slaveReadEnable = false;
+
+    private boolean disableConsumeIfConsumerReadSlowly = false;
+    // 16 * 1024^3 (2^34). The leading long literal forces 64-bit arithmetic:
+    // with int operands only, the product wraps around and the field would
+    // silently be initialised to 0.
+    private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
+
+    private long waitTimeMillsInSendQueue = 200;
+
+    private long startAcceptSendRequestTimeStamp = 0L;
+
+    private boolean traceOn = true;
+
+    /**
+     * Resolves the name of the local host; used as the default broker name.
+     *
+     * @return the local host name, or {@code "DEFAULT_BROKER"} when the host
+     *         name cannot be resolved
+     */
+    public static String localHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            // Best effort: no logger is available in this class, so the failure
+            // is reported on stderr and a fixed fallback name is returned.
+            e.printStackTrace();
+        }
+
+        return "DEFAULT_BROKER";
+    }
+
+    public boolean isTraceOn() {
+        return traceOn;
+    }
+
+    public void setTraceOn(final boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    public long getStartAcceptSendRequestTimeStamp() {
+        return startAcceptSendRequestTimeStamp;
+    }
+
+    public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
+        this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
+    }
+
+    public long getWaitTimeMillsInSendQueue() {
+        return waitTimeMillsInSendQueue;
+    }
+
+    public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
+        this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
+    }
+
+    public long getConsumerFallbehindThreshold() {
+        return consumerFallbehindThreshold;
+    }
+
+    public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
+        this.consumerFallbehindThreshold = consumerFallbehindThreshold;
+    }
+
+    public boolean isDisableConsumeIfConsumerReadSlowly() {
+        return disableConsumeIfConsumerReadSlowly;
+    }
+
+    public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
+        this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
+    }
+
+    public boolean isSlaveReadEnable() {
+        return slaveReadEnable;
+    }
+
+    public void setSlaveReadEnable(final boolean slaveReadEnable) {
+        this.slaveReadEnable = slaveReadEnable;
+    }
+
+    public int getRegisterBrokerTimeoutMills() {
+        return registerBrokerTimeoutMills;
+    }
+
+    public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
+        this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
+    }
+
+    public String getRegionId() {
+        return regionId;
+    }
+
+    public void setRegionId(final String regionId) {
+        this.regionId = regionId;
+    }
+
+    public boolean isTransferMsgByHeap() {
+        return transferMsgByHeap;
+    }
+
+    public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
+        this.transferMsgByHeap = transferMsgByHeap;
+    }
+
+    public String getMessageStorePlugIn() {
+        return messageStorePlugIn;
+    }
+
+    public void setMessageStorePlugIn(String messageStorePlugIn) {
+        this.messageStorePlugIn = messageStorePlugIn;
+    }
+
+    public boolean isHighSpeedMode() {
+        return highSpeedMode;
+    }
+
+    public void setHighSpeedMode(final boolean highSpeedMode) {
+        this.highSpeedMode = highSpeedMode;
+    }
+
+    public String getRocketmqHome() {
+        return rocketmqHome;
+    }
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public int getBrokerPermission() {
+        return brokerPermission;
+    }
+
+    public void setBrokerPermission(int brokerPermission) {
+        this.brokerPermission = brokerPermission;
+    }
+
+    public int getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+    public boolean isAutoCreateTopicEnable() {
+        return autoCreateTopicEnable;
+    }
+
+    public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
+        this.autoCreateTopicEnable = autoCreateTopic;
+    }
+
+    public String getBrokerClusterName() {
+        return brokerClusterName;
+    }
+
+    public void setBrokerClusterName(String brokerClusterName) {
+        this.brokerClusterName = brokerClusterName;
+    }
+
+    public String getBrokerIP1() {
+        return brokerIP1;
+    }
+
+    public void setBrokerIP1(String brokerIP1) {
+        this.brokerIP1 = brokerIP1;
+    }
+
+    public String getBrokerIP2() {
+        return brokerIP2;
+    }
+
+    public void setBrokerIP2(String brokerIP2) {
+        this.brokerIP2 = brokerIP2;
+    }
+
+    public int getSendMessageThreadPoolNums() {
+        return sendMessageThreadPoolNums;
+    }
+
+    public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
+        this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
+    }
+
+    public int getPullMessageThreadPoolNums() {
+        return pullMessageThreadPoolNums;
+    }
+
+    public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
+        this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
+    }
+
+    public int getAdminBrokerThreadPoolNums() {
+        return adminBrokerThreadPoolNums;
+    }
+
+    public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
+        this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
+    }
+
+    public int getFlushConsumerOffsetInterval() {
+        return flushConsumerOffsetInterval;
+    }
+
+    public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
+        this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
+    }
+
+    public int getFlushConsumerOffsetHistoryInterval() {
+        return flushConsumerOffsetHistoryInterval;
+    }
+
+    public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
+        this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
+    }
+
+    public boolean isClusterTopicEnable() {
+        return clusterTopicEnable;
+    }
+
+    public void setClusterTopicEnable(boolean clusterTopicEnable) {
+        this.clusterTopicEnable = clusterTopicEnable;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    public boolean isAutoCreateSubscriptionGroup() {
+        return autoCreateSubscriptionGroup;
+    }
+
+    public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
+        this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
+    }
+
+    public boolean isRejectTransactionMessage() {
+        return rejectTransactionMessage;
+    }
+
+    public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
+        this.rejectTransactionMessage = rejectTransactionMessage;
+    }
+
+    public boolean isFetchNamesrvAddrByAddressServer() {
+        return fetchNamesrvAddrByAddressServer;
+    }
+
+    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+    }
+
+    public int getSendThreadPoolQueueCapacity() {
+        return sendThreadPoolQueueCapacity;
+    }
+
+    public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
+        this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
+    }
+
+    public int getPullThreadPoolQueueCapacity() {
+        return pullThreadPoolQueueCapacity;
+    }
+
+    public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
+        this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
+    }
+
+    public boolean isBrokerTopicEnable() {
+        return brokerTopicEnable;
+    }
+
+    public void setBrokerTopicEnable(boolean brokerTopicEnable) {
+        this.brokerTopicEnable = brokerTopicEnable;
+    }
+
+    public int getFilterServerNums() {
+        return filterServerNums;
+    }
+
+    public void setFilterServerNums(int filterServerNums) {
+        this.filterServerNums = filterServerNums;
+    }
+
+    public boolean isLongPollingEnable() {
+        return longPollingEnable;
+    }
+
+    public void setLongPollingEnable(boolean longPollingEnable) {
+        this.longPollingEnable = longPollingEnable;
+    }
+
+    public boolean isNotifyConsumerIdsChangedEnable() {
+        return notifyConsumerIdsChangedEnable;
+    }
+
+    public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
+        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+    }
+
+    public long getShortPollingTimeMills() {
+        return shortPollingTimeMills;
+    }
+
+    public void setShortPollingTimeMills(long shortPollingTimeMills) {
+        this.shortPollingTimeMills = shortPollingTimeMills;
+    }
+
+    public int getClientManageThreadPoolNums() {
+        return clientManageThreadPoolNums;
+    }
+
+    public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
+        this.clientManageThreadPoolNums = clientManageThreadPoolNums;
+    }
+
+    public boolean isCommercialEnable() {
+        return commercialEnable;
+    }
+
+    public void setCommercialEnable(final boolean commercialEnable) {
+        this.commercialEnable = commercialEnable;
+    }
+
+    public int getCommercialTimerCount() {
+        return commercialTimerCount;
+    }
+
+    public void setCommercialTimerCount(final int commercialTimerCount) {
+        this.commercialTimerCount = commercialTimerCount;
+    }
+
+    public int getCommercialTransCount() {
+        return commercialTransCount;
+    }
+
+    public void setCommercialTransCount(final int commercialTransCount) {
+        this.commercialTransCount = commercialTransCount;
+    }
+
+    public int getCommercialBigCount() {
+        return commercialBigCount;
+    }
+
+    public void setCommercialBigCount(final int commercialBigCount) {
+        this.commercialBigCount = commercialBigCount;
+    }
+
+    public int getMaxDelayTime() {
+        return maxDelayTime;
+    }
+
+    public void setMaxDelayTime(final int maxDelayTime) {
+        this.maxDelayTime = maxDelayTime;
+    }
+
+    public int getClientManagerThreadPoolQueueCapacity() {
+        return clientManagerThreadPoolQueueCapacity;
+    }
+
+    public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
+        this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
+    }
+
+    public int getConsumerManagerThreadPoolQueueCapacity() {
+        return consumerManagerThreadPoolQueueCapacity;
+    }
+
+    public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
+        this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
+    }
+
+    public int getConsumerManageThreadPoolNums() {
+        return consumerManageThreadPoolNums;
+    }
+
+    public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
+        this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
+    }
+}