You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:26 UTC
[30/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
deleted file mode 100644
index 6f861d3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.List;
-
-
-/**
- * Producer facade: keeps the producer-side settings (on top of {@link ClientConfig}) and forwards
- * every {@link MQProducer} operation to an internal {@link DefaultMQProducerImpl}.
- * <p>
- * Transactional sending is not available here; see
- * {@link #sendMessageInTransaction(Message, LocalTransactionExecuter, Object)}.
- *
- * @author shijia.wxr
- */
-public class DefaultMQProducer extends ClientConfig implements MQProducer {
-    /** Implementation object that the methods of this class forward to. */
-    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
-    /** Producer group name supplied at construction time or through the setter. */
-    private String producerGroup;
-    /**
-     * Just for testing or demo program
-     */
-    private String createTopicKey = MixAll.DEFAULT_TOPIC;
-    /** Default queue count, initially 4. NOTE(review): presumably applies to auto-created topics - confirm. */
-    private volatile int defaultTopicQueueNums = 4;
-    /** Send timeout, initially 3000. NOTE(review): presumably milliseconds - confirm against the impl. */
-    private int sendMsgTimeout = 3000;
-    /** Compression threshold, initially 4096. NOTE(review): presumably a body size in bytes - confirm. */
-    private int compressMsgBodyOverHowmuch = 1024 * 4;
-    /** Retry count setting for failed sends, initially 2. */
-    private int retryTimesWhenSendFailed = 2;
-    /** Retry count setting for failed asynchronous sends, initially 2. */
-    private int retryTimesWhenSendAsyncFailed = 2;
-
-    /** Flag read by the implementation through its getter; off by default. */
-    private boolean retryAnotherBrokerWhenNotStoreOK = false;
-    /** Maximum message size setting, initially 4M. */
-    private int maxMessageSize = 1024 * 1024 * 4; // 4M
-
-    /**
-     * Creates a producer in {@link MixAll#DEFAULT_PRODUCER_GROUP} without an RPC hook.
-     */
-    public DefaultMQProducer() {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
-    }
-
-
-    /**
-     * Creates a producer for the given group and builds its {@link DefaultMQProducerImpl}.
-     *
-     * @param producerGroup producer group name
-     * @param rpcHook hook handed to the implementation; the other constructors pass {@code null}
-     */
-    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
-        this.producerGroup = producerGroup;
-        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-    }
-
-
-    /**
-     * Creates a producer for the given group without an RPC hook.
-     *
-     * @param producerGroup producer group name
-     */
-    public DefaultMQProducer(final String producerGroup) {
-        this(producerGroup, null);
-    }
-
-
-    /**
-     * Creates a producer in {@link MixAll#DEFAULT_PRODUCER_GROUP} with the given RPC hook.
-     *
-     * @param rpcHook hook handed to the implementation
-     */
-    public DefaultMQProducer(RPCHook rpcHook) {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
-    }
-
-
-    /** Starts the underlying implementation. */
-    @Override
-    public void start() throws MQClientException {
-        this.defaultMQProducerImpl.start();
-    }
-
-    /** Shuts the underlying implementation down. */
-    @Override
-    public void shutdown() {
-        this.defaultMQProducerImpl.shutdown();
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.fetchPublishMessageQueues(topic)}. */
-    @Override
-    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-        return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg)} and returns its result. */
-    @Override
-    public SendResult send(Message msg)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, timeout)} and returns its result. */
-    @Override
-    public SendResult send(Message msg, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, sendCallback)}. */
-    @Override
-    public void send(Message msg, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, sendCallback);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, sendCallback, timeout)}. */
-    @Override
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.sendOneway(msg)}. */
-    @Override
-    public void sendOneway(Message msg)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, mq)} and returns its result. */
-    @Override
-    public SendResult send(Message msg, MessageQueue mq)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, mq);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, mq, timeout)} and returns its result. */
-    @Override
-    public SendResult send(Message msg, MessageQueue mq, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, mq, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, mq, sendCallback)}. */
-    @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, mq, sendCallback, timeout)}. */
-    @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.sendOneway(msg, mq)}. */
-    @Override
-    public void sendOneway(Message msg, MessageQueue mq)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg, mq);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, selector, arg)} and returns its result. */
-    @Override
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, selector, arg);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, selector, arg, timeout)} and returns its result. */
-    @Override
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, selector, arg, sendCallback)}. */
-    @Override
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout)}. */
-    @Override
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.sendOneway(msg, selector, arg)}. */
-    @Override
-    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-        throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
-    }
-
-
-    /**
-     * Not supported by this class: always throws.
-     *
-     * @throws RuntimeException always; the message points callers to {@code TransactionMQProducer}
-     */
-    @Override
-    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
-        throws MQClientException {
-        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
-    }
-
-
-    /** Same as {@link #createTopic(String, String, int, int)} with a {@code topicSysFlag} of 0. */
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag)}. */
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.searchOffset(mq, timestamp)}. */
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.maxOffset(mq)}. */
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.maxOffset(mq);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.minOffset(mq)}. */
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.minOffset(mq);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.earliestMsgStoreTime(mq)}. */
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.viewMessage(offsetMsgId)}. */
-    @Override
-    public MessageExt viewMessage(String offsetMsgId)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
-    }
-
-
-    /** Forwards to {@code DefaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end)}. */
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-        throws MQClientException, InterruptedException {
-        return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
-    }
-
-
-    /**
-     * Looks a message up by id, trying two strategies in order: first {@code msgId} is decoded with
-     * {@link MessageDecoder#decodeMessageId(String)} and looked up through
-     * {@link #viewMessage(String)}; if either step fails, the lookup falls back to
-     * {@code DefaultMQProducerImpl.queryMessageByUniqKey(topic, msgId)}.
-     *
-     * @param topic topic passed to the fallback query
-     * @param msgId message id to look up
-     * @return the message returned by whichever lookup succeeded
-     * @throws InterruptedException if the calling thread is interrupted during either lookup
-     */
-    @Override
-    public MessageExt viewMessage(String topic, String msgId)
-        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        try {
-            // Called only for its validation effect: it throws when msgId cannot be decoded.
-            MessageDecoder.decodeMessageId(msgId);
-            return this.viewMessage(msgId);
-        } catch (InterruptedException e) {
-            // An interrupt is a cancellation request, not a lookup miss: never swallow it.
-            throw e;
-        } catch (Exception ignored) {
-            // Deliberate best effort: any other failure means "try the unique-key query instead".
-        }
-        return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
-    }
-
-    /** @return the producer group name */
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-
-    /** @param producerGroup the producer group name to use */
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-
-
-    /** @return the create-topic key (initially {@link MixAll#DEFAULT_TOPIC}) */
-    public String getCreateTopicKey() {
-        return createTopicKey;
-    }
-
-
-    /** @param createTopicKey the create-topic key to use */
-    public void setCreateTopicKey(String createTopicKey) {
-        this.createTopicKey = createTopicKey;
-    }
-
-
-    /** @return the send timeout setting */
-    public int getSendMsgTimeout() {
-        return sendMsgTimeout;
-    }
-
-
-    /** @param sendMsgTimeout the send timeout setting */
-    public void setSendMsgTimeout(int sendMsgTimeout) {
-        this.sendMsgTimeout = sendMsgTimeout;
-    }
-
-
-    /** @return the compression threshold setting */
-    public int getCompressMsgBodyOverHowmuch() {
-        return compressMsgBodyOverHowmuch;
-    }
-
-
-    /** @param compressMsgBodyOverHowmuch the compression threshold setting */
-    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
-        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
-    }
-
-
-    /** @return the implementation object this producer forwards to */
-    public DefaultMQProducerImpl getDefaultMQProducerImpl() {
-        return defaultMQProducerImpl;
-    }
-
-
-    /** @return the {@code retryAnotherBrokerWhenNotStoreOK} flag */
-    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
-        return retryAnotherBrokerWhenNotStoreOK;
-    }
-
-
-    /** @param retryAnotherBrokerWhenNotStoreOK new value of the flag */
-    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
-        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
-    }
-
-
-    /** @return the maximum message size setting */
-    public int getMaxMessageSize() {
-        return maxMessageSize;
-    }
-
-
-    /** @param maxMessageSize the maximum message size setting */
-    public void setMaxMessageSize(int maxMessageSize) {
-        this.maxMessageSize = maxMessageSize;
-    }
-
-
-    /** @return the default topic queue count setting */
-    public int getDefaultTopicQueueNums() {
-        return defaultTopicQueueNums;
-    }
-
-
-    /** @param defaultTopicQueueNums the default topic queue count setting */
-    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
-        this.defaultTopicQueueNums = defaultTopicQueueNums;
-    }
-
-
-    /** @return the retry count setting for failed sends */
-    public int getRetryTimesWhenSendFailed() {
-        return retryTimesWhenSendFailed;
-    }
-
-
-    /** @param retryTimesWhenSendFailed the retry count setting for failed sends */
-    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
-        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
-    }
-
-
-    /** Alias for {@code isVipChannelEnabled()}. */
-    public boolean isSendMessageWithVIPChannel() {
-        return isVipChannelEnabled();
-    }
-
-
-    /** Alias for {@code setVipChannelEnabled(sendMessageWithVIPChannel)}. */
-    public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
-        this.setVipChannelEnabled(sendMessageWithVIPChannel);
-    }
-
-
-    /** @return the value reported by {@code DefaultMQProducerImpl.getNotAvailableDuration()} */
-    public long[] getNotAvailableDuration() {
-        return this.defaultMQProducerImpl.getNotAvailableDuration();
-    }
-
-    /** Forwards to {@code DefaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration)}. */
-    public void setNotAvailableDuration(final long[] notAvailableDuration) {
-        this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
-    }
-
-    /** @return the value reported by {@code DefaultMQProducerImpl.getLatencyMax()} */
-    public long[] getLatencyMax() {
-        return this.defaultMQProducerImpl.getLatencyMax();
-    }
-
-    /** Forwards to {@code DefaultMQProducerImpl.setLatencyMax(latencyMax)}. */
-    public void setLatencyMax(final long[] latencyMax) {
-        this.defaultMQProducerImpl.setLatencyMax(latencyMax);
-    }
-
-    /** @return the value reported by {@code DefaultMQProducerImpl.isSendLatencyFaultEnable()} */
-    public boolean isSendLatencyFaultEnable() {
-        return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
-    }
-
-    /** Forwards to {@code DefaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable)}. */
-    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-        this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
-    }
-
-    /** @return the retry count setting for failed asynchronous sends */
-    public int getRetryTimesWhenSendAsyncFailed() {
-        return retryTimesWhenSendAsyncFailed;
-    }
-
-    /** @param retryTimesWhenSendAsyncFailed the retry count setting for failed asynchronous sends */
-    public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
-        this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
deleted file mode 100644
index af3723a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.Message;
-
-
-/**
- * Callback handed to {@code sendMessageInTransaction}; its single method reports a
- * {@link LocalTransactionState} for a given message.
- *
- * @author shijia.wxr
- */
-public interface LocalTransactionExecuter {
-    /**
-     * @param msg the message the call refers to
-     * @param arg caller-supplied argument, passed through unchanged by the interface
-     * @return the resulting transaction state
-     */
-    LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
deleted file mode 100644
index ee2a93a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * State values returned by {@code LocalTransactionExecuter} and {@code TransactionCheckListener}.
- *
- * @author shijia.wxr
- */
-public enum LocalTransactionState {
-    /** Commit the message. */
-    COMMIT_MESSAGE,
-    /** Roll the message back. */
-    ROLLBACK_MESSAGE,
-    /** NOTE(review): presumably "unknown"; the identifier is kept as spelled for compatibility. */
-    UNKNOW;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
deleted file mode 100644
index e21bc00..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.MQAdmin;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.List;
-
-
-/**
- * Producer contract: lifecycle methods plus the family of {@code send} / {@code sendOneway}
- * overloads, on top of the operations inherited from {@link MQAdmin}.
- * <p>
- * The overloads differ along three axes: how the queue is picked (not specified, an explicit
- * {@link MessageQueue}, or a {@link MessageQueueSelector} with its argument), how the outcome is
- * delivered (a returned {@link SendResult}, a {@link SendCallback}, or nothing for
- * {@code sendOneway}), and whether an explicit timeout is given.
- *
- * @author shijia.wxr
- */
-public interface MQProducer extends MQAdmin {
-    /** Starts the producer. */
-    void start() throws MQClientException;
-
-    /** Shuts the producer down. */
-    void shutdown();
-
-
-    /** Returns the message queues for {@code topic}. */
-    List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException;
-
-
-    /** Sends {@code msg} and returns the result. */
-    SendResult send(Message msg)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Sends {@code msg} with an explicit timeout and returns the result. */
-    SendResult send(Message msg, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Sends {@code msg}, taking a callback instead of returning a result. */
-    void send(Message msg, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Callback variant with an explicit timeout. */
-    void send(Message msg, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Sends {@code msg} with neither a returned result nor a callback. */
-    void sendOneway(Message msg)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Sends {@code msg} with an explicit queue and returns the result. */
-    SendResult send(Message msg, MessageQueue mq)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Explicit-queue variant with a timeout. */
-    SendResult send(Message msg, MessageQueue mq, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Explicit-queue variant taking a callback. */
-    void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Explicit-queue variant taking a callback and a timeout. */
-    void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Explicit-queue variant with neither a returned result nor a callback. */
-    void sendOneway(Message msg, MessageQueue mq)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Sends {@code msg} with a queue selector and its argument, returning the result. */
-    SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Selector variant with a timeout. */
-    SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    /** Selector variant taking a callback. */
-    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Selector variant taking a callback and a timeout. */
-    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback,
-        long timeout)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Selector variant with neither a returned result nor a callback. */
-    void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-        throws MQClientException, RemotingException, InterruptedException;
-
-
-    /** Sends {@code msg} transactionally, taking a {@link LocalTransactionExecuter} and its argument. */
-    TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
-        Object arg) throws MQClientException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
deleted file mode 100644
index 924c145..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * Strategy that picks one {@link MessageQueue} out of a list for a given message.
- *
- * @author shijia.wxr
- */
-public interface MessageQueueSelector {
-    /**
-     * @param mqs candidate queues
-     * @param msg the message being sent
-     * @param arg caller-supplied argument (the {@code arg} given to the selector-based send methods)
-     * @return the chosen queue
-     */
-    MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
deleted file mode 100644
index 35d1a72..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * Callback pair accepted by the callback-taking {@code send} overloads.
- *
- * @author shijia.wxr
- */
-public interface SendCallback {
-    /** Receives the {@link SendResult} of a send. */
-    void onSuccess(SendResult sendResult);
-
-
-    /** Receives the {@link Throwable} describing a failed send. */
-    void onException(Throwable e);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
deleted file mode 100644
index 183accf..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * Mutable value object describing the outcome of a send: status, message ids, target queue,
- * queue offset, plus optional transaction id and region id. Can be converted to and from JSON
- * through the two static helpers.
- *
- * @author shijia.wxr
- */
-public class SendResult {
-    private SendStatus sendStatus;
-    private String msgId;
-    private MessageQueue messageQueue;
-    private long queueOffset;
-    private String transactionId;
-    private String offsetMsgId;
-    private String regionId;
-    /** Trace flag; on unless changed through {@link #setTraceOn(boolean)}. */
-    private boolean traceOn = true;
-
-    /** Creates an empty result; all reference fields start as {@code null}. */
-    public SendResult() {
-    }
-
-    /** Creates a result without transaction id and region id (both stay {@code null}). */
-    public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
-        this(sendStatus, msgId, messageQueue, queueOffset, null, offsetMsgId, null);
-    }
-
-    /** Creates a result with every field except the trace flag given explicitly. */
-    public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
-        this.sendStatus = sendStatus;
-        this.msgId = msgId;
-        this.offsetMsgId = offsetMsgId;
-        this.messageQueue = messageQueue;
-        this.queueOffset = queueOffset;
-        this.transactionId = transactionId;
-        this.regionId = regionId;
-    }
-
-    public boolean isTraceOn() {
-        return this.traceOn;
-    }
-
-    public void setTraceOn(final boolean traceOn) {
-        this.traceOn = traceOn;
-    }
-
-    public String getRegionId() {
-        return this.regionId;
-    }
-
-    public void setRegionId(final String regionId) {
-        this.regionId = regionId;
-    }
-
-    /** Serializes {@code obj} with {@code JSON.toJSONString}. */
-    public static String encoderSendResultToJson(final Object obj) {
-        final String encoded = JSON.toJSONString(obj);
-        return encoded;
-    }
-
-    /** Parses {@code json} into a {@code SendResult} with {@code JSON.parseObject}. */
-    public static SendResult decoderSendResultFromJson(String json) {
-        final SendResult decoded = JSON.parseObject(json, SendResult.class);
-        return decoded;
-    }
-
-    public String getMsgId() {
-        return this.msgId;
-    }
-
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-
-    public SendStatus getSendStatus() {
-        return this.sendStatus;
-    }
-
-
-    public void setSendStatus(SendStatus sendStatus) {
-        this.sendStatus = sendStatus;
-    }
-
-
-    public MessageQueue getMessageQueue() {
-        return this.messageQueue;
-    }
-
-
-    public void setMessageQueue(MessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-
-    public long getQueueOffset() {
-        return this.queueOffset;
-    }
-
-
-    public void setQueueOffset(long queueOffset) {
-        this.queueOffset = queueOffset;
-    }
-
-
-    public String getTransactionId() {
-        return this.transactionId;
-    }
-
-
-    public void setTransactionId(String transactionId) {
-        this.transactionId = transactionId;
-    }
-
-    public String getOffsetMsgId() {
-        return this.offsetMsgId;
-    }
-
-    public void setOffsetMsgId(String offsetMsgId) {
-        this.offsetMsgId = offsetMsgId;
-    }
-
-    /** Renders status, both message ids, queue and queue offset; the other fields are not included. */
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder();
-        text.append("SendResult [sendStatus=").append(sendStatus);
-        text.append(", msgId=").append(msgId);
-        text.append(", offsetMsgId=").append(offsetMsgId);
-        text.append(", messageQueue=").append(messageQueue);
-        text.append(", queueOffset=").append(queueOffset);
-        text.append("]");
-        return text.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
deleted file mode 100644
index 3bc572f..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * Status values carried by {@code SendResult}.
- *
- * @author shijia.wxr
- */
-public enum SendStatus {
-    /** The send succeeded. */
-    SEND_OK,
-    /** NOTE(review): presumably the disk flush did not finish in time - confirm on the broker side. */
-    FLUSH_DISK_TIMEOUT,
-    /** NOTE(review): presumably the flush to the slave did not finish in time - confirm. */
-    FLUSH_SLAVE_TIMEOUT,
-    /** NOTE(review): presumably no slave was available - confirm. */
-    SLAVE_NOT_AVAILABLE;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
deleted file mode 100644
index 8440537..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-
-/**
- * Listener that reports the {@link LocalTransactionState} for a given {@link MessageExt}.
- *
- * @author shijia.wxr
- */
-public interface TransactionCheckListener {
-    /**
-     * @param msg the message whose local transaction state is requested
-     * @return the state to report for {@code msg}
-     */
-    LocalTransactionState checkLocalTransactionState(MessageExt msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
deleted file mode 100644
index 08dd4ab..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.RPCHook;
-
-
-/**
- * @author shijia.wxr
- */
-public class TransactionMQProducer extends DefaultMQProducer {
- private TransactionCheckListener transactionCheckListener;
- private int checkThreadPoolMinSize = 1;
- private int checkThreadPoolMaxSize = 1;
- private int checkRequestHoldMax = 2000;
-
-
- public TransactionMQProducer() {
- }
-
-
- public TransactionMQProducer(final String producerGroup) {
- super(producerGroup);
- }
-
- public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
- super(producerGroup, rpcHook);
- }
-
- @Override
- public void start() throws MQClientException {
- this.defaultMQProducerImpl.initTransactionEnv();
- super.start();
- }
-
-
- @Override
- public void shutdown() {
- super.shutdown();
- this.defaultMQProducerImpl.destroyTransactionEnv();
- }
-
-
- @Override
- public TransactionSendResult sendMessageInTransaction(final Message msg,
- final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
- if (null == this.transactionCheckListener) {
- throw new MQClientException("localTransactionBranchCheckListener is null", null);
- }
-
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
- }
-
-
- public TransactionCheckListener getTransactionCheckListener() {
- return transactionCheckListener;
- }
-
-
- public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
- this.transactionCheckListener = transactionCheckListener;
- }
-
-
- public int getCheckThreadPoolMinSize() {
- return checkThreadPoolMinSize;
- }
-
-
- public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
- this.checkThreadPoolMinSize = checkThreadPoolMinSize;
- }
-
-
- public int getCheckThreadPoolMaxSize() {
- return checkThreadPoolMaxSize;
- }
-
-
- public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
- this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
- }
-
-
- public int getCheckRequestHoldMax() {
- return checkRequestHoldMax;
- }
-
-
- public void setCheckRequestHoldMax(int checkRequestHoldMax) {
- this.checkRequestHoldMax = checkRequestHoldMax;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
deleted file mode 100644
index e7dcd0e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * @author shijia.wxr
- */
-public class TransactionSendResult extends SendResult {
- private LocalTransactionState localTransactionState;
-
-
- public TransactionSendResult() {
- }
-
-
- public LocalTransactionState getLocalTransactionState() {
- return localTransactionState;
- }
-
-
- public void setLocalTransactionState(LocalTransactionState localTransactionState) {
- this.localTransactionState = localTransactionState;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
deleted file mode 100644
index 648356b..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByHash implements MessageQueueSelector {
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int value = arg.hashCode();
- if (value < 0) {
- value = Math.abs(value);
- }
-
- value = value % mqs.size();
- return mqs.get(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
deleted file mode 100644
index a213391..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
- private Set<String> consumeridcs;
-
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- return null;
- }
-
-
- public Set<String> getConsumeridcs() {
- return consumeridcs;
- }
-
-
- public void setConsumeridcs(Set<String> consumeridcs) {
- this.consumeridcs = consumeridcs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
deleted file mode 100644
index 3f381e4..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Random;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByRandoom implements MessageQueueSelector {
- private Random random = new Random(System.currentTimeMillis());
-
-
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int value = random.nextInt();
- if (value < 0) {
- value = Math.abs(value);
- }
-
- value = value % mqs.size();
- return mqs.get(value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
deleted file mode 100644
index e07e233..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.stat;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.common.stats.StatsItemSet;
-import com.alibaba.rocketmq.common.stats.StatsSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-
-public class ConsumerStatsManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
-
- private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
- private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
- private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
- private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
- private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
-
- private final StatsItemSet topicAndGroupConsumeOKTPS;
- private final StatsItemSet topicAndGroupConsumeRT;
- private final StatsItemSet topicAndGroupConsumeFailedTPS;
- private final StatsItemSet topicAndGroupPullTPS;
- private final StatsItemSet topicAndGroupPullRT;
-
-
- public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
- this.topicAndGroupConsumeOKTPS =
- new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
-
- this.topicAndGroupConsumeRT =
- new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
-
- this.topicAndGroupConsumeFailedTPS =
- new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
-
- this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
-
- this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
- }
-
-
- public void start() {
- }
-
-
- public void shutdown() {
- }
-
-
- public void incPullRT(final String group, final String topic, final long rt) {
- this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
- }
-
-
- public void incPullTPS(final String group, final String topic, final long msgs) {
- this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
- }
-
-
- public void incConsumeRT(final String group, final String topic, final long rt) {
- this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
- }
-
-
- public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
- this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1);
- }
-
-
- public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
- this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1);
- }
-
- public ConsumeStatus consumeStatus(final String group, final String topic) {
- ConsumeStatus cs = new ConsumeStatus();
- {
- StatsSnapshot ss = this.getPullRT(group, topic);
- if (ss != null) {
- cs.setPullRT(ss.getAvgpt());
- }
- }
-
- {
- StatsSnapshot ss = this.getPullTPS(group, topic);
- if (ss != null) {
- cs.setPullTPS(ss.getTps());
- }
- }
-
- {
- StatsSnapshot ss = this.getConsumeRT(group, topic);
- if (ss != null) {
- cs.setConsumeRT(ss.getAvgpt());
- }
- }
-
- {
- StatsSnapshot ss = this.getConsumeOKTPS(group, topic);
- if (ss != null) {
- cs.setConsumeOKTPS(ss.getTps());
- }
- }
-
- {
- StatsSnapshot ss = this.getConsumeFailedTPS(group, topic);
- if (ss != null) {
- cs.setConsumeFailedTPS(ss.getTps());
- }
- }
-
- {
- StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group);
- if (ss != null) {
- cs.setConsumeFailedMsgs(ss.getSum());
- }
- }
-
- return cs;
- }
-
- private StatsSnapshot getPullRT(final String group, final String topic) {
- return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group);
- }
-
- private StatsSnapshot getPullTPS(final String group, final String topic) {
- return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group);
- }
-
- private StatsSnapshot getConsumeRT(final String group, final String topic) {
- StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group);
- if (0 == statsData.getSum()) {
- statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group);
- }
-
- return statsData;
- }
-
- private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
- return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group);
- }
-
- private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
- return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
new file mode 100644
index 0000000..8d15108
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+
+/**
+ * Client Common configuration
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public class ClientConfig {
+ public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private String clientIP = RemotingUtil.getLocalAddress();
+ private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
+ private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+ /**
+ * Interval for pulling topic information from the name server
+ */
+ private int pollNameServerInteval = 1000 * 30;
+ /**
+ * Heartbeat interval in milliseconds with message broker
+ */
+ private int heartbeatBrokerInterval = 1000 * 30;
+ /**
+ * Offset persistent interval for consumer
+ */
+ private int persistConsumerOffsetInterval = 1000 * 5;
+ private boolean unitMode = false;
+ private String unitName;
+ private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
+ public String buildMQClientId() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClientIP());
+
+ sb.append("@");
+ sb.append(this.getInstanceName());
+ if (!UtilAll.isBlank(this.unitName)) {
+ sb.append("@");
+ sb.append(this.unitName);
+ }
+
+ return sb.toString();
+ }
+
+ public String getClientIP() {
+ return clientIP;
+ }
+
+ public void setClientIP(String clientIP) {
+ this.clientIP = clientIP;
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public void setInstanceName(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public void changeInstanceNameToPID() {
+ if (this.instanceName.equals("DEFAULT")) {
+ this.instanceName = String.valueOf(UtilAll.getPid());
+ }
+ }
+
+ public void resetClientConfig(final ClientConfig cc) {
+ this.namesrvAddr = cc.namesrvAddr;
+ this.clientIP = cc.clientIP;
+ this.instanceName = cc.instanceName;
+ this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
+ this.pollNameServerInteval = cc.pollNameServerInteval;
+ this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
+ this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+ this.unitMode = cc.unitMode;
+ this.unitName = cc.unitName;
+ this.vipChannelEnabled = cc.vipChannelEnabled;
+ }
+
+ public ClientConfig cloneClientConfig() {
+ ClientConfig cc = new ClientConfig();
+ cc.namesrvAddr = namesrvAddr;
+ cc.clientIP = clientIP;
+ cc.instanceName = instanceName;
+ cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+ cc.pollNameServerInteval = pollNameServerInteval;
+ cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
+ cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ cc.unitMode = unitMode;
+ cc.unitName = unitName;
+ cc.vipChannelEnabled = vipChannelEnabled;
+ return cc;
+ }
+
+ public String getNamesrvAddr() {
+ return namesrvAddr;
+ }
+
+ public void setNamesrvAddr(String namesrvAddr) {
+ this.namesrvAddr = namesrvAddr;
+ }
+
+ public int getClientCallbackExecutorThreads() {
+ return clientCallbackExecutorThreads;
+ }
+
+
+ public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+ this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+ }
+
+
+ public int getPollNameServerInteval() {
+ return pollNameServerInteval;
+ }
+
+
+ public void setPollNameServerInteval(int pollNameServerInteval) {
+ this.pollNameServerInteval = pollNameServerInteval;
+ }
+
+
+ public int getHeartbeatBrokerInterval() {
+ return heartbeatBrokerInterval;
+ }
+
+
+ public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
+ this.heartbeatBrokerInterval = heartbeatBrokerInterval;
+ }
+
+
+ public int getPersistConsumerOffsetInterval() {
+ return persistConsumerOffsetInterval;
+ }
+
+
+ public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
+ this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ }
+
+
+ public String getUnitName() {
+ return unitName;
+ }
+
+
+ public void setUnitName(String unitName) {
+ this.unitName = unitName;
+ }
+
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+
+ public void setUnitMode(boolean unitMode) {
+ this.unitMode = unitMode;
+ }
+
+
+ public boolean isVipChannelEnabled() {
+ return vipChannelEnabled;
+ }
+
+
+ public void setVipChannelEnabled(final boolean vipChannelEnabled) {
+ this.vipChannelEnabled = vipChannelEnabled;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+ + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ + vipChannelEnabled + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
new file mode 100644
index 0000000..9e85283
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+
+/**
+ * Base interface for MQ management
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdmin {
+ /**
+ * Creates a topic
+ *
+ * @param key
+ * accesskey
+ * @param newTopic
+ * topic name
+ * @param queueNum
+ * topic's queue number
+ *
+ * @throws MQClientException
+ */
+ void createTopic(final String key, final String newTopic, final int queueNum)
+ throws MQClientException;
+
+
+ /**
+ * Creates a topic
+ *
+ * @param key
+ * accesskey
+ * @param newTopic
+ * topic name
+ * @param queueNum
+ * topic's queue number
+ * @param topicSysFlag
+ * topic system flag
+ *
+ * @throws MQClientException
+ */
+ void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+ throws MQClientException;
+
+
+ /**
+ * Gets the message queue offset for the given time in milliseconds.<br>
+ * Call with caution: this lookup incurs additional IO overhead.
+ *
+ * @param mq
+ * Instance of MessageQueue
+ * @param timestamp
+ * from when in milliseconds.
+ *
+ * @return offset
+ *
+ * @throws MQClientException
+ */
+ long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
+
+
+ /**
+ * Gets the max offset
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the max offset
+ *
+ * @throws MQClientException
+ */
+ long maxOffset(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Gets the minimum offset
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the minimum offset
+ *
+ * @throws MQClientException
+ */
+ long minOffset(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Gets the earliest stored message time
+ *
+ * @param mq
+ * Instance of MessageQueue
+ *
+ * @return the time in milliseconds
+ *
+ * @throws MQClientException
+ */
+ long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
+
+
+ /**
+ * Query message according to message id
+ *
+ * @param offsetMsgId
+ * message id
+ *
+ * @return message
+ *
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ * @throws RemotingException
+ * @throws MQClientException
+ */
+ MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException;
+
+
+ /**
+ * Query messages
+ *
+ * @param topic
+ * message topic
+ * @param key
+ * message key index word
+ * @param maxNum
+ * max message number
+ * @param begin
+ * from when
+ * @param end
+ * to when
+ *
+ * @return Instance of QueryResult
+ *
+ * @throws MQClientException
+ * @throws InterruptedException
+ */
+ QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
+ final long end) throws MQClientException, InterruptedException;
+
+ /**
+ * Query message according to topic and message id
+ * @param topic
+ * @param msgId
+ * @return The {@code MessageExt} of given msgId
+ * @throws RemotingException
+ * @throws MQBrokerException
+ * @throws InterruptedException
+ * @throws MQClientException
+ */
+ MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
new file mode 100644
index 0000000..41009c5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQHelper {
+ public static void resetOffsetByTimestamp(
+ final MessageModel messageModel,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
+ resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
+ }
+
+ /**
+ * Reset consumer topic offset according to time
+ *
+ * @param messageModel
+ * which model
+ * @param instanceName
+ * which instance
+ * @param consumerGroup
+ * consumer group
+ * @param topic
+ * topic
+ * @param timestamp
+ * time
+ *
+ * @throws Exception
+ */
+ public static void resetOffsetByTimestamp(
+ final MessageModel messageModel,
+ final String instanceName,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
+ final Logger log = ClientLogger.getLog();
+
+ DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+ consumer.setInstanceName(instanceName);
+ consumer.setMessageModel(messageModel);
+ consumer.start();
+
+ Set<MessageQueue> mqs = null;
+ try {
+ mqs = consumer.fetchSubscribeMessageQueues(topic);
+ if (mqs != null && !mqs.isEmpty()) {
+ TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
+ for (MessageQueue mq : mqsNew) {
+ long offset = consumer.searchOffset(mq, timestamp);
+ if (offset >= 0) {
+ consumer.updateConsumeOffset(mq, offset);
+ log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
+ consumerGroup, offset, mq);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.warn("resetOffsetByTimestamp Exception", e);
+ throw e;
+ } finally {
+ if (mqs != null) {
+ consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
+ }
+ consumer.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
new file mode 100644
index 0000000..cdbf1e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * Value holder for the outcome of a message query: the matching messages
+ * together with the last update timestamp of the index they were read from.
+ * The list is stored and returned by reference; no copy is made.
+ *
+ * @author shijia.wxr
+ */
+public class QueryResult {
+    private final long indexLastUpdateTimestamp;
+    private final List<MessageExt> messageList;
+
+
+    /**
+     * @param indexLastUpdateTimestamp
+     *         last update timestamp of the index
+     * @param messageList
+     *         messages returned by the query
+     */
+    public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
+        this.messageList = messageList;
+        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+    }
+
+
+    /**
+     * @return the timestamp passed to the constructor
+     */
+    public long getIndexLastUpdateTimestamp() {
+        return this.indexLastUpdateTimestamp;
+    }
+
+
+    /**
+     * @return the same list instance that was passed to the constructor
+     */
+    public List<MessageExt> getMessageList() {
+        return this.messageList;
+    }
+
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("QueryResult [indexLastUpdateTimestamp=");
+        text.append(this.indexLastUpdateTimestamp);
+        text.append(", messageList=");
+        text.append(this.messageList);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
new file mode 100644
index 0000000..e977d44
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Static validation helpers for group names, topic names and outgoing messages.
+ *
+ * @author manhong.yqd
+ */
+public class Validators {
+    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
+    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
+    public static final int CHARACTER_MAX_LENGTH = 255;
+
+    /**
+     * Finds the first subsequence of {@code origin} that matches {@code patternStr}.
+     *
+     * @param origin
+     *         text to search; must not be null
+     * @param patternStr
+     *         regular expression that is compiled and searched for
+     *
+     * @return the first matching subsequence, or {@code null} when nothing matches
+     */
+    public static String getGroupWithRegularExpression(String origin, String patternStr) {
+        Matcher matcher = Pattern.compile(patternStr).matcher(origin);
+        // Only the first match is ever returned, so a single find() is sufficient.
+        if (matcher.find()) {
+            return matcher.group(0);
+        }
+        return null;
+    }
+
+    /**
+     * Validates a group name: it must not be blank, must match {@link #PATTERN}
+     * and must not be longer than {@link #CHARACTER_MAX_LENGTH} characters.
+     *
+     * @param group
+     *         group name to validate
+     *
+     * @throws MQClientException
+     *         if any of the checks fails
+     */
+    public static void checkGroup(String group) throws MQClientException {
+        checkName("group", group);
+    }
+
+    /**
+     * Tells whether the whole of {@code origin} matches {@code pattern}.
+     *
+     * @param origin
+     *         text to test; must not be null when {@code pattern} is non-null
+     * @param pattern
+     *         compiled pattern, or {@code null} to skip validation
+     *
+     * @return {@code true} if {@code pattern} is {@code null} or the entire
+     *         {@code origin} sequence matches it, {@code false} otherwise
+     */
+    public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
+        if (pattern == null) {
+            return true;
+        }
+        return pattern.matcher(origin).matches();
+    }
+
+    /**
+     * Validates a message before it is sent: the message and its body must be
+     * non-null, the topic must pass {@link #checkTopic(String)}, and the body
+     * must be non-empty and no larger than the producer's maximum message size.
+     *
+     * @param msg
+     *         message to validate
+     * @param defaultMQProducer
+     *         producer supplying the maximum message size; must not be null
+     *
+     * @throws MQClientException
+     *         if any of the checks fails
+     */
+    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
+            throws MQClientException {
+        if (null == msg) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
+        }
+        // topic
+        Validators.checkTopic(msg.getTopic());
+
+        // body
+        final byte[] body = msg.getBody();
+        if (null == body) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
+        }
+
+        if (0 == body.length) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
+        }
+
+        final int maxMessageSize = defaultMQProducer.getMaxMessageSize();
+        if (body.length > maxMessageSize) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
+                "the message body size over max value, MAX: " + maxMessageSize);
+        }
+    }
+
+    /**
+     * Validates a topic name: it must not be blank, must match {@link #PATTERN},
+     * must not be longer than {@link #CHARACTER_MAX_LENGTH} characters and must
+     * not equal {@code MixAll.DEFAULT_TOPIC}.
+     *
+     * @param topic
+     *         topic name to validate
+     *
+     * @throws MQClientException
+     *         if any of the checks fails
+     */
+    public static void checkTopic(String topic) throws MQClientException {
+        checkName("topic", topic);
+
+        //whether the same with system reserved keyword
+        if (topic.equals(MixAll.DEFAULT_TOPIC)) {
+            throw new MQClientException(
+                String.format("the topic[%s] is conflict with default topic.", topic), null);
+        }
+    }
+
+    /**
+     * Runs the checks shared by group and topic names: not blank, only legal
+     * characters, and at most {@link #CHARACTER_MAX_LENGTH} characters long.
+     *
+     * @param kind
+     *         word used in the error messages, {@code "group"} or {@code "topic"}
+     * @param name
+     *         name to validate
+     *
+     * @throws MQClientException
+     *         if any of the checks fails
+     */
+    private static void checkName(String kind, String name) throws MQClientException {
+        if (UtilAll.isBlank(name)) {
+            throw new MQClientException("the specified " + kind + " is blank", null);
+        }
+
+        if (!regularExpressionMatcher(name, PATTERN)) {
+            throw new MQClientException(String.format(
+                "the specified %s[%s] contains illegal characters, allowing only %s", kind, name,
+                VALID_PATTERN_STR), null);
+        }
+
+        if (name.length() > CHARACTER_MAX_LENGTH) {
+            throw new MQClientException("the specified " + kind + " is longer than " + kind
+                + " max length " + CHARACTER_MAX_LENGTH + ".", null);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
new file mode 100644
index 0000000..cfff17e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.admin;
+
+/**
+ * Marker interface: it declares no methods or constants of its own.
+ * NOTE(review): presumably lets admin-ext client instances be referenced through
+ * a common type -- confirm against the implementing classes.
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdminExtInner {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
new file mode 100644
index 0000000..6184379
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.common;
+
+/**
+ * Integer error codes in the range 10001-10005, exposed as public constants.
+ * NOTE(review): the per-constant descriptions below are inferred from the
+ * constant names only -- confirm against the code that reports them.
+ */
+public class ClientErrorCode {
+ // presumably: connecting to a broker failed
+ public static final int CONNECT_BROKER_EXCEPTION = 10001;
+ // presumably: a request to a broker timed out
+ public static final int ACCESS_BROKER_TIMEOUT = 10002;
+ // presumably: the addressed broker is unknown
+ public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
+ // presumably: no name server is available
+ public static final int NO_NAME_SERVER_EXCEPTION = 10004;
+ // presumably: the requested topic could not be found
+ public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
new file mode 100644
index 0000000..360cfdf
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.common;
+
+import java.util.Random;
+
+/**
+ * Per-thread, non-negative counter. Each thread's counter is seeded with a
+ * random value the first time that thread asks for an index.
+ */
+public class ThreadLocalIndex {
+    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
+    private final Random random = new Random();
+
+    /**
+     * Creates an index holder.
+     *
+     * @param value
+     *         accepted but not used by this implementation
+     */
+    public ThreadLocalIndex(int value) {
+    }
+
+    /**
+     * Advances the calling thread's index by one and returns the advanced value.
+     * On a thread's first call the index is first seeded with a random
+     * non-negative number. Any value that would be negative, including the
+     * wrap-around past {@code Integer.MAX_VALUE}, is replaced by {@code 0}.
+     *
+     * @return the calling thread's index after the increment, never negative
+     */
+    public int getAndIncrement() {
+        final Integer stored = this.threadLocalIndex.get();
+        final int base = (stored != null) ? stored : nonNegative(Math.abs(random.nextInt()));
+
+        final int advanced = nonNegative(Math.abs(base + 1));
+        this.threadLocalIndex.set(advanced);
+        return advanced;
+    }
+
+    /**
+     * Maps negative candidates to {@code 0}; {@code Math.abs} can still yield
+     * {@code Integer.MIN_VALUE}, which is why the result needs this guard.
+     */
+    private static int nonNegative(int candidate) {
+        return candidate < 0 ? 0 : candidate;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ThreadLocalIndex{");
+        text.append("threadLocalIndex=");
+        text.append(this.threadLocalIndex.get());
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
new file mode 100644
index 0000000..cb98b62
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy for allocating the message queues of a topic among the consumers
+ * of a consumer group.
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public interface AllocateMessageQueueStrategy {
+
+ /**
+ * Computes the allocation for one consumer, identified by its consumer id.
+ * NOTE(review): the result is presumably the subset of {@code mqAll} assigned
+ * to {@code currentCID} -- confirm against the implementations.
+ *
+ * @param consumerGroup
+ * current consumer group
+ * @param currentCID
+ * current consumer id
+ * @param mqAll
+ * message queue set in current topic
+ * @param cidAll
+ * consumer set in current consumer group
+ *
+ * @return The allocate result of given strategy
+ */
+ List<MessageQueue> allocate(
+ final String consumerGroup,
+ final String currentCID,
+ final List<MessageQueue> mqAll,
+ final List<String> cidAll
+ );
+
+
+ /**
+ * Returns the name of this allocation algorithm.
+ *
+ * @return The strategy name
+ */
+ String getName();
+}