You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:30 UTC
[39/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
deleted file mode 100644
index 1c93b02..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.topic;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * @author zander
- */
-public class TopicConfigManagerTest extends BrokerTestHarness {
- @Test
- public void testFlushTopicConfig() throws Exception {
- TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
-
- for (int i = 0; i < 10; i++) {
- String topic = "UNITTEST-" + i;
- TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
- assertNotNull(topicConfig);
- }
- topicConfigManager.persist();
-
- topicConfigManager.getTopicConfigTable().clear();
-
- for (int i = 0; i < 10; i++) {
- String topic = "UNITTEST-" + i;
- TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
- assertNull(topicConfig);
- }
- topicConfigManager.load();
- for (int i = 0; i < 10; i++) {
- String topic = "UNITTEST-" + i;
- TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
- assertNotNull(topicConfig);
- assertEquals(topicConfig.getTopicSysFlag(), 0);
- assertEquals(topicConfig.getReadQueueNums(), 4);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
new file mode 100644
index 0000000..79f82a6
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author shtykh_roman
+ */
+public class BrokerControllerTest {
+ protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
+
+ private static final int RESTART_NUM = 3;
+
+ /**
+ * Tests if the controller can be properly stopped and started.
+ *
+ * @throws Exception If fails.
+ */
+ @Test
+ public void testRestart() throws Exception {
+
+ for (int i = 0; i < RESTART_NUM; i++) {
+ BrokerController brokerController = new BrokerController(//
+ new BrokerConfig(), //
+ new NettyServerConfig(), //
+ new NettyClientConfig(), //
+ new MessageStoreConfig());
+ boolean initResult = brokerController.initialize();
+ Assert.assertTrue(initResult);
+ logger.info("Broker is initialized " + initResult);
+ brokerController.start();
+ logger.info("Broker is started");
+
+ brokerController.shutdown();
+ logger.info("Broker is stopped");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
new file mode 100644
index 0000000..4b4fd95
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Random;
+
+/**
+ * @author zander
+ */
+public class BrokerTestHarness {
+
+ protected BrokerController brokerController = null;
+
+ protected Random random = new Random();
+ public final String BROKER_NAME = "TestBrokerName";
+ protected String brokerAddr = "";
+ protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
+ protected BrokerConfig brokerConfig = new BrokerConfig();
+ protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
+ protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
+ protected MessageStoreConfig storeConfig = new MessageStoreConfig();
+
+ @Before
+ public void startup() throws Exception {
+ brokerConfig.setBrokerName(BROKER_NAME);
+ brokerConfig.setBrokerIP1("127.0.0.1");
+ storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
+ storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
+ nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+ brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort();
+ brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
+ boolean initResult = brokerController.initialize();
+ Assert.assertTrue(initResult);
+ logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+ brokerController.start();
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ if (brokerController != null) {
+ brokerController.shutdown();
+ }
+ //maybe need to clean the file store. But we do not suggest deleting anything.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
new file mode 100644
index 0000000..9988a7c
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.api;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * @author zander
+ */
+public class SendMessageTest extends BrokerTestHarness{
+
+ MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
+ String topic = "UnitTestTopic";
+
+ @Before
+ @Override
+ public void startup() throws Exception {
+ super.startup();
+ client.start();
+
+ }
+
+ @After
+ @Override
+ public void shutdown() throws Exception {
+ client.shutdown();
+ super.shutdown();
+ }
+
+ @Test
+ public void testSendSingle() throws Exception{
+ Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setProducerGroup("abc");
+ requestHeader.setTopic(msg.getTopic());
+ requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
+ requestHeader.setDefaultTopicQueueNums(4);
+ requestHeader.setQueueId(0);
+ requestHeader.setSysFlag(0);
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setFlag(msg.getFlag());
+ requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
+ CommunicationMode.SYNC, new SendMessageContext(), null);
+ assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
new file mode 100644
index 0000000..cdbddf9
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.offset;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * @author zander
+ */
+public class ConsumerOffsetManagerTest extends BrokerTestHarness {
+
+ @Test
+ public void testFlushConsumerOffset() throws Exception {
+ ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+ for (int i = 0; i < 10; i++) {
+ String group = "UNIT_TEST_GROUP_" + i;
+ for (int id = 0; id < 10; id++) {
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100);
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100);
+ consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100);
+ }
+ }
+ consumerOffsetManager.persist();
+ consumerOffsetManager.getOffsetTable().clear();
+ for (int i = 0; i < 10; i++) {
+ String group = "UNIT_TEST_GROUP_" + i;
+ for (int id = 0; id < 10; id++) {
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1);
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
+ }
+ }
+ consumerOffsetManager.load();
+ for (int i = 0; i < 10; i++) {
+ String group = "UNIT_TEST_GROUP_" + i;
+ for (int id = 0; id < 10; id++) {
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100);
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
+ assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
new file mode 100644
index 0000000..1de17e6
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * @author zander
+ */
+public class TopicConfigManagerTest extends BrokerTestHarness {
+ @Test
+ public void testFlushTopicConfig() throws Exception {
+ TopicConfigManager topicConfigManager = new TopicConfigManager(brokerController);
+
+ for (int i = 0; i < 10; i++) {
+ String topic = "UNITTEST-" + i;
+ TopicConfig topicConfig = topicConfigManager.createTopicInSendMessageMethod(topic, MixAll.DEFAULT_TOPIC, null, 4, 0);
+ assertNotNull(topicConfig);
+ }
+ topicConfigManager.persist();
+
+ topicConfigManager.getTopicConfigTable().clear();
+
+ for (int i = 0; i < 10; i++) {
+ String topic = "UNITTEST-" + i;
+ TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
+ assertNull(topicConfig);
+ }
+ topicConfigManager.load();
+ for (int i = 0; i < 10; i++) {
+ String topic = "UNITTEST-" + i;
+ TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
+ assertNotNull(topicConfig);
+ assertEquals(topicConfig.getTopicSysFlag(), 0);
+ assertEquals(topicConfig.getReadQueueNums(), 4);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 63a6114..86d38cf 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -18,7 +18,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>com.alibaba.rocketmq</groupId>
+ <groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.0.0-SNAPSHOT</version>
</parent>
@@ -56,19 +56,19 @@
<includes>
<include>com.alibaba:fastjson</include>
<include>io.netty:netty-all</include>
- <include>com.alibaba.rocketmq:rocketmq-client</include>
- <include>com.alibaba.rocketmq:rocketmq-common</include>
- <include>com.alibaba.rocketmq:rocketmq-remoting</include>
+ <include>org.apache.rocketmq:rocketmq-client</include>
+ <include>org.apache.rocketmq:rocketmq-common</include>
+ <include>org.apache.rocketmq:rocketmq-remoting</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.netty</pattern>
- <shadedPattern>com.alibaba.rocketmq.shade.io.netty</shadedPattern>
+ <shadedPattern>org.apache.rocketmq.shade.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>com.alibaba.fastjson</pattern>
- <shadedPattern>com.alibaba.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
+ <shadedPattern>org.apache.rocketmq.shade.com.alibaba.fastjson</shadedPattern>
</relocation>
</relocations>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java b/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
deleted file mode 100644
index 4d80564..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/ClientConfig.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-
-
-/**
- * Client Common configuration
- *
- * @author shijia.wxr
- * @author vongosling
- */
-public class ClientConfig {
- public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
- private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
- private String clientIP = RemotingUtil.getLocalAddress();
- private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
- private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
- /**
- * Pulling topic information interval from the named server
- */
- private int pollNameServerInteval = 1000 * 30;
- /**
- * Heartbeat interval in microseconds with message broker
- */
- private int heartbeatBrokerInterval = 1000 * 30;
- /**
- * Offset persistent interval for consumer
- */
- private int persistConsumerOffsetInterval = 1000 * 5;
- private boolean unitMode = false;
- private String unitName;
- private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
-
- public String buildMQClientId() {
- StringBuilder sb = new StringBuilder();
- sb.append(this.getClientIP());
-
- sb.append("@");
- sb.append(this.getInstanceName());
- if (!UtilAll.isBlank(this.unitName)) {
- sb.append("@");
- sb.append(this.unitName);
- }
-
- return sb.toString();
- }
-
- public String getClientIP() {
- return clientIP;
- }
-
- public void setClientIP(String clientIP) {
- this.clientIP = clientIP;
- }
-
- public String getInstanceName() {
- return instanceName;
- }
-
- public void setInstanceName(String instanceName) {
- this.instanceName = instanceName;
- }
-
- public void changeInstanceNameToPID() {
- if (this.instanceName.equals("DEFAULT")) {
- this.instanceName = String.valueOf(UtilAll.getPid());
- }
- }
-
- public void resetClientConfig(final ClientConfig cc) {
- this.namesrvAddr = cc.namesrvAddr;
- this.clientIP = cc.clientIP;
- this.instanceName = cc.instanceName;
- this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
- this.pollNameServerInteval = cc.pollNameServerInteval;
- this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
- this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
- this.unitMode = cc.unitMode;
- this.unitName = cc.unitName;
- this.vipChannelEnabled = cc.vipChannelEnabled;
- }
-
- public ClientConfig cloneClientConfig() {
- ClientConfig cc = new ClientConfig();
- cc.namesrvAddr = namesrvAddr;
- cc.clientIP = clientIP;
- cc.instanceName = instanceName;
- cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
- cc.pollNameServerInteval = pollNameServerInteval;
- cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
- cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
- cc.unitMode = unitMode;
- cc.unitName = unitName;
- cc.vipChannelEnabled = vipChannelEnabled;
- return cc;
- }
-
- public String getNamesrvAddr() {
- return namesrvAddr;
- }
-
- public void setNamesrvAddr(String namesrvAddr) {
- this.namesrvAddr = namesrvAddr;
- }
-
- public int getClientCallbackExecutorThreads() {
- return clientCallbackExecutorThreads;
- }
-
-
- public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
- this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
- }
-
-
- public int getPollNameServerInteval() {
- return pollNameServerInteval;
- }
-
-
- public void setPollNameServerInteval(int pollNameServerInteval) {
- this.pollNameServerInteval = pollNameServerInteval;
- }
-
-
- public int getHeartbeatBrokerInterval() {
- return heartbeatBrokerInterval;
- }
-
-
- public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
- this.heartbeatBrokerInterval = heartbeatBrokerInterval;
- }
-
-
- public int getPersistConsumerOffsetInterval() {
- return persistConsumerOffsetInterval;
- }
-
-
- public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
- this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
- }
-
-
- public String getUnitName() {
- return unitName;
- }
-
-
- public void setUnitName(String unitName) {
- this.unitName = unitName;
- }
-
-
- public boolean isUnitMode() {
- return unitMode;
- }
-
-
- public void setUnitMode(boolean unitMode) {
- this.unitMode = unitMode;
- }
-
-
- public boolean isVipChannelEnabled() {
- return vipChannelEnabled;
- }
-
-
- public void setVipChannelEnabled(final boolean vipChannelEnabled) {
- this.vipChannelEnabled = vipChannelEnabled;
- }
-
-
- @Override
- public String toString() {
- return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
- + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
- + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
- + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java b/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
deleted file mode 100644
index 4e202e9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/MQAdmin.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-
-/**
- * Base interface for MQ management
- *
- * @author shijia.wxr
- */
-public interface MQAdmin {
- /**
- * Creates an topic
- *
- * @param key
- * accesskey
- * @param newTopic
- * topic name
- * @param queueNum
- * topic's queue number
- *
- * @throws MQClientException
- */
- void createTopic(final String key, final String newTopic, final int queueNum)
- throws MQClientException;
-
-
- /**
- * Creates an topic
- *
- * @param key
- * accesskey
- * @param newTopic
- * topic name
- * @param queueNum
- * topic's queue number
- * @param topicSysFlag
- * topic system flag
- *
- * @throws MQClientException
- */
- void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
- throws MQClientException;
-
-
- /**
- * Gets the message queue offset according to some time in milliseconds<br>
- * be cautious to call because of more IO overhead
- *
- * @param mq
- * Instance of MessageQueue
- * @param timestamp
- * from when in milliseconds.
- *
- * @return offset
- *
- * @throws MQClientException
- */
- long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
-
-
- /**
- * Gets the max offset
- *
- * @param mq
- * Instance of MessageQueue
- *
- * @return the max offset
- *
- * @throws MQClientException
- */
- long maxOffset(final MessageQueue mq) throws MQClientException;
-
-
- /**
- * Gets the minimum offset
- *
- * @param mq
- * Instance of MessageQueue
- *
- * @return the minimum offset
- *
- * @throws MQClientException
- */
- long minOffset(final MessageQueue mq) throws MQClientException;
-
-
- /**
- * Gets the earliest stored message time
- *
- * @param mq
- * Instance of MessageQueue
- *
- * @return the time in microseconds
- *
- * @throws MQClientException
- */
- long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
-
-
- /**
- * Query message according tto message id
- *
- * @param offsetMsgId
- * message id
- *
- * @return message
- *
- * @throws InterruptedException
- * @throws MQBrokerException
- * @throws RemotingException
- * @throws MQClientException
- */
- MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
-
-
- /**
- * Query messages
- *
- * @param topic
- * message topic
- * @param key
- * message key index word
- * @param maxNum
- * max message number
- * @param begin
- * from when
- * @param end
- * to when
- *
- * @return Instance of QueryResult
- *
- * @throws MQClientException
- * @throws InterruptedException
- */
- QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
- final long end) throws MQClientException, InterruptedException;
-
- /**
-
- * @param topic
- * @param msgId
- * @return The {@code MessageExt} of given msgId
- * @throws RemotingException
- * @throws MQBrokerException
- * @throws InterruptedException
- * @throws MQClientException
- */
- MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java b/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
deleted file mode 100644
index 5934b49..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/MQHelper.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.slf4j.Logger;
-
-import java.util.Set;
-import java.util.TreeSet;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQHelper {
- public static void resetOffsetByTimestamp(
- final MessageModel messageModel,
- final String consumerGroup,
- final String topic,
- final long timestamp) throws Exception {
- resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
- }
-
- /**
- * Reset consumer topic offset according to time
- *
- * @param messageModel
- * which model
- * @param instanceName
- * which instance
- * @param consumerGroup
- * consumer group
- * @param topic
- * topic
- * @param timestamp
- * time
- *
- * @throws Exception
- */
- public static void resetOffsetByTimestamp(
- final MessageModel messageModel,
- final String instanceName,
- final String consumerGroup,
- final String topic,
- final long timestamp) throws Exception {
- final Logger log = ClientLogger.getLog();
-
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
- consumer.setInstanceName(instanceName);
- consumer.setMessageModel(messageModel);
- consumer.start();
-
- Set<MessageQueue> mqs = null;
- try {
- mqs = consumer.fetchSubscribeMessageQueues(topic);
- if (mqs != null && !mqs.isEmpty()) {
- TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
- for (MessageQueue mq : mqsNew) {
- long offset = consumer.searchOffset(mq, timestamp);
- if (offset >= 0) {
- consumer.updateConsumeOffset(mq, offset);
- log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
- consumerGroup, offset, mq);
- }
- }
- }
- } catch (Exception e) {
- log.warn("resetOffsetByTimestamp Exception", e);
- throw e;
- } finally {
- if (mqs != null) {
- consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
- }
- consumer.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java b/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
deleted file mode 100644
index 43c8106..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/QueryResult.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class QueryResult {
- private final long indexLastUpdateTimestamp;
- private final List<MessageExt> messageList;
-
-
- public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
- this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
- this.messageList = messageList;
- }
-
-
- public long getIndexLastUpdateTimestamp() {
- return indexLastUpdateTimestamp;
- }
-
-
- public List<MessageExt> getMessageList() {
- return messageList;
- }
-
-
- @Override
- public String toString() {
- return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
- + messageList + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java b/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
deleted file mode 100644
index 203aae0..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/Validators.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
-/**
- * Common Validator
- *
- * @author manhong.yqd
- */
-public class Validators {
- public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
- public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
- public static final int CHARACTER_MAX_LENGTH = 255;
-
- /**
- * @param origin
- * @param patternStr
- *
- * @return The resulting {@code String}
- */
- public static String getGroupWithRegularExpression(String origin, String patternStr) {
- Pattern pattern = Pattern.compile(patternStr);
- Matcher matcher = pattern.matcher(origin);
- while (matcher.find()) {
- return matcher.group(0);
- }
- return null;
- }
-
- /**
- * Validate group
- *
- * @param group
- *
- * @throws com.alibaba.rocketmq.client.exception.MQClientException
- */
- public static void checkGroup(String group) throws MQClientException {
- if (UtilAll.isBlank(group)) {
- throw new MQClientException("the specified group is blank", null);
- }
- if (!regularExpressionMatcher(group, PATTERN)) {
- throw new MQClientException(String.format(
- "the specified group[%s] contains illegal characters, allowing only %s", group,
- VALID_PATTERN_STR), null);
- }
- if (group.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified group is longer than group max length 255.", null);
- }
- }
-
- /**
- * @param origin
- * @param pattern
- *
- * @return <tt>true</tt> if, and only if, the entire origin sequence
- * matches this matcher's pattern
- */
- public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
- if (pattern == null) {
- return true;
- }
- Matcher matcher = pattern.matcher(origin);
- return matcher.matches();
- }
-
- /**
- * Validate message
- *
- * @param msg
- * @param defaultMQProducer
- *
- * @throws com.alibaba.rocketmq.client.exception.MQClientException
- */
- public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
- throws MQClientException {
- if (null == msg) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
- }
- // topic
- Validators.checkTopic(msg.getTopic());
- // body
- if (null == msg.getBody()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
- }
-
- if (0 == msg.getBody().length) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
- }
-
- if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
- throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
- "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
- }
- }
-
- /**
- * Validate topic
- *
- * @param topic
- *
- * @throws com.alibaba.rocketmq.client.exception.MQClientException
- */
- public static void checkTopic(String topic) throws MQClientException {
- if (UtilAll.isBlank(topic)) {
- throw new MQClientException("the specified topic is blank", null);
- }
-
- if (!regularExpressionMatcher(topic, PATTERN)) {
- throw new MQClientException(String.format(
- "the specified topic[%s] contains illegal characters, allowing only %s", topic,
- VALID_PATTERN_STR), null);
- }
-
- if (topic.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified topic is longer than topic max length 255.", null);
- }
-
- //whether the same with system reserved keyword
- if (topic.equals(MixAll.DEFAULT_TOPIC)) {
- throw new MQClientException(
- String.format("the topic[%s] is conflict with default topic.", topic), null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
deleted file mode 100644
index 071a872..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/admin/MQAdminExtInner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.admin;
-
-/**
- * @author shijia.wxr
- */
-public interface MQAdminExtInner {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
deleted file mode 100644
index 88d0eea..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/common/ClientErrorCode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.common;
-
-public class ClientErrorCode {
- public static final int CONNECT_BROKER_EXCEPTION = 10001;
- public static final int ACCESS_BROKER_TIMEOUT = 10002;
- public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
- public static final int NO_NAME_SERVER_EXCEPTION = 10004;
- public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
deleted file mode 100644
index 63fda5d..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/common/ThreadLocalIndex.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.common;
-
-import java.util.Random;
-
-public class ThreadLocalIndex {
- private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
- private final Random random = new Random();
- public ThreadLocalIndex(int value) {
-
- }
-
- public int getAndIncrement() {
- Integer index = this.threadLocalIndex.get();
- if (null == index) {
- index = Math.abs(random.nextInt());
- if (index < 0) index = 0;
- this.threadLocalIndex.set(index);
- }
-
- index = Math.abs(index + 1);
- if (index < 0)
- index = 0;
-
- this.threadLocalIndex.set(index);
- return index;
- }
-
- @Override
- public String toString() {
- return "ThreadLocalIndex{" +
- "threadLocalIndex=" + threadLocalIndex.get() +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
deleted file mode 100644
index 4d70167..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * Strategy Algorithm for message allocating between consumers
- *
- * @author shijia.wxr
- * @author vongosling
- */
-public interface AllocateMessageQueueStrategy {
-
- /**
- * Allocating by consumer id
- *
- * @param consumerGroup
- * current consumer group
- * @param currentCID
- * current consumer id
- * @param mqAll
- * message queue set in current topic
- * @param cidAll
- * consumer set in current consumer group
- *
- * @return The allocate result of given strategy
- */
- List<MessageQueue> allocate(
- final String consumerGroup,
- final String currentCID,
- final List<MessageQueue> mqAll,
- final List<String> cidAll
- );
-
-
- /**
- * Algorithm name
- *
- * @return The strategy name
- */
- String getName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
deleted file mode 100644
index 96040ae..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ /dev/null
@@ -1,381 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Default pulling consumer
- *
- * @author shijia.wxr
- */
-public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
- protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
-
- /**
- * Do the same thing for the same Group, the application must be set,and
- * guarantee Globally unique
- */
- private String consumerGroup;
- /**
- * Long polling mode, the Consumer connection max suspend time, it is not
- * recommended to modify
- */
- private long brokerSuspendMaxTimeMillis = 1000 * 20;
- /**
- * Long polling mode, the Consumer connection timeout(must greater than
- * brokerSuspendMaxTimeMillis), it is not recommended to modify
- */
- private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
- /**
- * The socket timeout in milliseconds
- */
- private long consumerPullTimeoutMillis = 1000 * 10;
- /**
- * Consumption pattern,default is clustering
- */
- private MessageModel messageModel = MessageModel.CLUSTERING;
- /**
- * Message queue listener
- */
- private MessageQueueListener messageQueueListener;
- /**
- * Offset Storage
- */
- private OffsetStore offsetStore;
- /**
- * Topic set you want to register
- */
- private Set<String> registerTopics = new HashSet<String>();
- /**
- * Queue allocation algorithm
- */
- private AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
- /**
- * Whether the unit of subscription group
- */
- private boolean unitMode = false;
-
- private int maxReconsumeTimes = 16;
-
-
- public DefaultMQPullConsumer() {
- this(MixAll.DEFAULT_CONSUMER_GROUP, null);
- }
-
-
- public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
- this.consumerGroup = consumerGroup;
- defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
- }
-
-
- public DefaultMQPullConsumer(final String consumerGroup) {
- this(consumerGroup, null);
- }
-
-
- public DefaultMQPullConsumer(RPCHook rpcHook) {
- this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
- }
-
- @Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
- }
-
-
- @Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
- }
-
-
- @Override
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
- return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
- }
-
-
- @Override
- public long maxOffset(MessageQueue mq) throws MQClientException {
- return this.defaultMQPullConsumerImpl.maxOffset(mq);
- }
-
-
- @Override
- public long minOffset(MessageQueue mq) throws MQClientException {
- return this.defaultMQPullConsumerImpl.minOffset(mq);
- }
-
-
- @Override
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
- return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
- }
-
-
- @Override
- public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException {
- return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
- }
-
-
- @Override
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
- return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
- }
-
-
- public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
- return allocateMessageQueueStrategy;
- }
-
-
- public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- }
-
-
- public long getBrokerSuspendMaxTimeMillis() {
- return brokerSuspendMaxTimeMillis;
- }
-
-
- public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
- this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
- }
-
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public long getConsumerPullTimeoutMillis() {
- return consumerPullTimeoutMillis;
- }
-
-
- public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
- this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
- }
-
-
- public long getConsumerTimeoutMillisWhenSuspend() {
- return consumerTimeoutMillisWhenSuspend;
- }
-
-
- public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
- this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
- }
-
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
-
-
- public MessageQueueListener getMessageQueueListener() {
- return messageQueueListener;
- }
-
-
- public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
- this.messageQueueListener = messageQueueListener;
- }
-
-
- public Set<String> getRegisterTopics() {
- return registerTopics;
- }
-
-
- public void setRegisterTopics(Set<String> registerTopics) {
- this.registerTopics = registerTopics;
- }
-
-
- @Override
- public void sendMessageBack(MessageExt msg, int delayLevel)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
- }
-
-
- @Override
- public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
- }
-
- @Override
- public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
- return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(topic);
- }
-
- @Override
- public void start() throws MQClientException {
- this.defaultMQPullConsumerImpl.start();
- }
-
- @Override
- public void shutdown() {
- this.defaultMQPullConsumerImpl.shutdown();
- }
-
- @Override
- public void registerMessageQueueListener(String topic, MessageQueueListener listener) {
- synchronized (this.registerTopics) {
- this.registerTopics.add(topic);
- if (listener != null) {
- this.messageQueueListener = listener;
- }
- }
- }
-
- @Override
- public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
- }
-
- @Override
- public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
- }
-
- @Override
- public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
- }
-
- @Override
- public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
- }
-
- @Override
- public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
- }
-
- @Override
- public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
- this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
- }
-
- @Override
- public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
- this.defaultMQPullConsumerImpl.updateConsumeOffset(mq, offset);
- }
-
- @Override
- public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
- return this.defaultMQPullConsumerImpl.fetchConsumeOffset(mq, fromStore);
- }
-
- @Override
- public Set<MessageQueue> fetchMessageQueuesInBalance(String topic) throws MQClientException {
- return this.defaultMQPullConsumerImpl.fetchMessageQueuesInBalance(topic);
- }
-
- @Override
- public MessageExt viewMessage(String topic, String uniqKey) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- MessageDecoder.decodeMessageId(uniqKey);
- return this.viewMessage(uniqKey);
- } catch (Exception e) {
- }
- return this.defaultMQPullConsumerImpl.queryMessageByUniqKey(topic, uniqKey);
- }
-
- @Override
- public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
- }
-
- public OffsetStore getOffsetStore() {
- return offsetStore;
- }
-
-
- public void setOffsetStore(OffsetStore offsetStore) {
- this.offsetStore = offsetStore;
- }
-
-
- public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
- return defaultMQPullConsumerImpl;
- }
-
-
- public boolean isUnitMode() {
- return unitMode;
- }
-
-
- public void setUnitMode(boolean isUnitMode) {
- this.unitMode = isUnitMode;
- }
-
-
- public int getMaxReconsumeTimes() {
- return maxReconsumeTimes;
- }
-
-
- public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
- this.maxReconsumeTimes = maxReconsumeTimes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
deleted file mode 100644
index f37e982..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
- *
- * @author shijia.wxr
- */
-public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
- protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- /**
- * Do the same thing for the same Group, the application must be set,and
- * guarantee Globally unique
- */
- private String consumerGroup;
- /**
- * Consumption pattern,default is clustering
- */
- private MessageModel messageModel = MessageModel.CLUSTERING;
- /**
- * Consumption offset
- */
- private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
- /**
- * Backtracking consumption time with second precision.time format is
- * 20131223171201<br>
- * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
- * Default backtracking consumption time Half an hour ago
- */
- private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
- /**
- * Queue allocation algorithm
- */
- private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
-
- /**
- * Subscription relationship
- */
- private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
- /**
- * Message listener
- */
- private MessageListener messageListener;
- /**
- * Offset Storage
- */
- private OffsetStore offsetStore;
- /**
- * Minimum consumer thread number
- */
- private int consumeThreadMin = 20;
- /**
- * Max consumer thread number
- */
- private int consumeThreadMax = 64;
-
- /**
- * Threshold for dynamic adjustment of the number of thread pool
- */
- private long adjustThreadPoolNumsThreshold = 100000;
-
- /**
- * Concurrently max span offset.it has no effect on sequential consumption
- */
- private int consumeConcurrentlyMaxSpan = 2000;
- /**
- * Flow control threshold
- */
- private int pullThresholdForQueue = 1000;
- /**
- * Message pull Interval
- */
- private long pullInterval = 0;
- /**
- * Batch consumption size
- */
- private int consumeMessageBatchMaxSize = 1;
- /**
- * Batch pull size
- */
- private int pullBatchSize = 32;
-
- /**
- * Whether update subscription relationship when every pull
- */
- private boolean postSubscriptionWhenPull = false;
-
- /**
- * Whether the unit of subscription group
- */
- private boolean unitMode = false;
-
- private int maxReconsumeTimes = -1;
- private long suspendCurrentQueueTimeMillis = 1000;
- private long consumeTimeout = 15;
-
-
- public DefaultMQPushConsumer() {
- this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
- }
-
-
- public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
- this.consumerGroup = consumerGroup;
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
- }
-
-
- public DefaultMQPushConsumer(RPCHook rpcHook) {
- this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
- }
-
-
- public DefaultMQPushConsumer(final String consumerGroup) {
- this(consumerGroup, null, new AllocateMessageQueueAveragely());
- }
-
- @Override
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
- }
-
-
- @Override
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
- }
-
-
- @Override
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
- return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
- }
-
-
- @Override
- public long maxOffset(MessageQueue mq) throws MQClientException {
- return this.defaultMQPushConsumerImpl.maxOffset(mq);
- }
-
-
- @Override
- public long minOffset(MessageQueue mq) throws MQClientException {
- return this.defaultMQPushConsumerImpl.minOffset(mq);
- }
-
-
- @Override
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
- return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
- }
-
-
- @Override
- public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
- }
-
-
- @Override
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
- return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
- }
-
- @Override
- public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- MessageDecoder.decodeMessageId(msgId);
- return this.viewMessage(msgId);
- } catch (Exception e) {
- }
- return this.defaultMQPushConsumerImpl.queryMessageByUniqKey(topic, msgId);
- }
-
- public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
- return allocateMessageQueueStrategy;
- }
-
-
- public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- }
-
-
- public int getConsumeConcurrentlyMaxSpan() {
- return consumeConcurrentlyMaxSpan;
- }
-
-
- public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
- this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
- }
-
-
- public ConsumeFromWhere getConsumeFromWhere() {
- return consumeFromWhere;
- }
-
-
- public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
- this.consumeFromWhere = consumeFromWhere;
- }
-
-
- public int getConsumeMessageBatchMaxSize() {
- return consumeMessageBatchMaxSize;
- }
-
-
- public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
- this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
- }
-
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public int getConsumeThreadMax() {
- return consumeThreadMax;
- }
-
-
- public void setConsumeThreadMax(int consumeThreadMax) {
- this.consumeThreadMax = consumeThreadMax;
- }
-
-
- public int getConsumeThreadMin() {
- return consumeThreadMin;
- }
-
-
- public void setConsumeThreadMin(int consumeThreadMin) {
- this.consumeThreadMin = consumeThreadMin;
- }
-
-
- public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
- return defaultMQPushConsumerImpl;
- }
-
-
- public MessageListener getMessageListener() {
- return messageListener;
- }
-
-
- public void setMessageListener(MessageListener messageListener) {
- this.messageListener = messageListener;
- }
-
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
-
-
- public int getPullBatchSize() {
- return pullBatchSize;
- }
-
-
- public void setPullBatchSize(int pullBatchSize) {
- this.pullBatchSize = pullBatchSize;
- }
-
-
- public long getPullInterval() {
- return pullInterval;
- }
-
-
- public void setPullInterval(long pullInterval) {
- this.pullInterval = pullInterval;
- }
-
-
- public int getPullThresholdForQueue() {
- return pullThresholdForQueue;
- }
-
-
- public void setPullThresholdForQueue(int pullThresholdForQueue) {
- this.pullThresholdForQueue = pullThresholdForQueue;
- }
-
-
- public Map<String, String> getSubscription() {
- return subscription;
- }
-
-
- public void setSubscription(Map<String, String> subscription) {
- this.subscription = subscription;
- }
-
-
- @Override
- public void sendMessageBack(MessageExt msg, int delayLevel)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
- }
-
-
- @Override
- public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
- }
-
-
- @Override
- public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
- return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
- }
-
-
- @Override
- public void start() throws MQClientException {
- this.defaultMQPushConsumerImpl.start();
- }
-
-
- @Override
- public void shutdown() {
- this.defaultMQPushConsumerImpl.shutdown();
- }
-
-
- @Override
- @Deprecated
- public void registerMessageListener(MessageListener messageListener) {
- this.messageListener = messageListener;
- this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
- }
-
-
- @Override
- public void registerMessageListener(MessageListenerConcurrently messageListener) {
- this.messageListener = messageListener;
- this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
- }
-
-
- @Override
- public void registerMessageListener(MessageListenerOrderly messageListener) {
- this.messageListener = messageListener;
- this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
- }
-
-
- @Override
- public void subscribe(String topic, String subExpression) throws MQClientException {
- this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
- }
-
-
- @Override
- public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
- this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
- }
-
-
- @Override
- public void unsubscribe(String topic) {
- this.defaultMQPushConsumerImpl.unsubscribe(topic);
- }
-
-
- @Override
- public void updateCorePoolSize(int corePoolSize) {
- this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
- }
-
-
- @Override
- public void suspend() {
- this.defaultMQPushConsumerImpl.suspend();
- }
-
-
- @Override
- public void resume() {
- this.defaultMQPushConsumerImpl.resume();
- }
-
-
- public OffsetStore getOffsetStore() {
- return offsetStore;
- }
-
-
- public void setOffsetStore(OffsetStore offsetStore) {
- this.offsetStore = offsetStore;
- }
-
-
- public String getConsumeTimestamp() {
- return consumeTimestamp;
- }
-
-
- public void setConsumeTimestamp(String consumeTimestamp) {
- this.consumeTimestamp = consumeTimestamp;
- }
-
-
- public boolean isPostSubscriptionWhenPull() {
- return postSubscriptionWhenPull;
- }
-
-
- public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
- this.postSubscriptionWhenPull = postSubscriptionWhenPull;
- }
-
-
- public boolean isUnitMode() {
- return unitMode;
- }
-
-
- public void setUnitMode(boolean isUnitMode) {
- this.unitMode = isUnitMode;
- }
-
-
- public long getAdjustThreadPoolNumsThreshold() {
- return adjustThreadPoolNumsThreshold;
- }
-
-
- public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
- this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
- }
-
-
- public int getMaxReconsumeTimes() {
- return maxReconsumeTimes;
- }
-
-
- public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
- this.maxReconsumeTimes = maxReconsumeTimes;
- }
-
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
-
- public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-
-
- public long getConsumeTimeout() {
- return consumeTimeout;
- }
-
- public void setConsumeTimeout(final long consumeTimeout) {
- this.consumeTimeout = consumeTimeout;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
deleted file mode 100644
index 2a46b65..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQConsumer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.MQAdmin;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.Set;
-
-
-/**
- * Message queue consumer interface
- *
- * @author shijia.wxr
- */
-public interface MQConsumer extends MQAdmin {
- /**
- * If consuming failure,message will be send back to the brokers,and delay consuming some time
- *
- * @param msg
- * @param delayLevel
- *
- * @throws InterruptedException
- * @throws MQBrokerException
- * @throws RemotingException
- * @throws MQClientException
- */
- @Deprecated
- void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
-
-
- /**
- * If consuming failure,message will be send back to the broker,and delay consuming some time
- *
- * @param msg
- * @param delayLevel
- * @param brokerName
- *
- * @throws RemotingException
- * @throws MQBrokerException
- * @throws InterruptedException
- * @throws MQClientException
- */
- void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
-
- /**
- * Fetch message queues from consumer cache according to the topic
- *
- * @param topic
- * message topic
- *
- * @return queue set
- *
- * @throws MQClientException
- */
- Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws MQClientException;
-}