You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:51 UTC
[26/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
index 0ff589d..38d885e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -36,10 +38,9 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
-public class SendMessageTest extends BrokerTestHarness{
+public class SendMessageTest extends BrokerTestHarness {
MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
String topic = "UnitTestTopic";
@@ -60,7 +61,7 @@ public class SendMessageTest extends BrokerTestHarness{
}
@Test
- public void testSendSingle() throws Exception{
+ public void testSendSingle() throws Exception {
Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup("abc");
@@ -74,7 +75,7 @@ public class SendMessageTest extends BrokerTestHarness{
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
- CommunicationMode.SYNC, new SendMessageContext(), null);
+ CommunicationMode.SYNC, new SendMessageContext(), null);
assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index d6be5fb..89813fc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -25,7 +27,6 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
-
public class ConsumerOffsetManagerTest extends BrokerTestHarness {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index ab9ab6f..2f85dbc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -6,13 +6,15 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
/**
@@ -25,8 +27,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.junit.Test;
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class TopicConfigManagerTest extends BrokerTestHarness {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index e4a8c36..0d22d7d 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
- "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -32,8 +32,8 @@
</module>
<module name="RegexpSingleline">
- <property name="format" value="System\.out\.println" />
- <property name="message" value="Prohibit invoking System.out.println in source code !" />
+ <property name="format" value="System\.out\.println"/>
+ <property name="message" value="Prohibit invoking System.out.println in source code !"/>
</module>
<module name="RegexpSingleline">
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 86d38cf..9c18ebd 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8afca13..9c7a0cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client;
@@ -20,7 +20,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-
/**
* Client Common configuration
*
@@ -123,78 +122,64 @@ public class ClientConfig {
return clientCallbackExecutorThreads;
}
-
public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
}
-
public int getPollNameServerInteval() {
return pollNameServerInteval;
}
-
public void setPollNameServerInteval(int pollNameServerInteval) {
this.pollNameServerInteval = pollNameServerInteval;
}
-
public int getHeartbeatBrokerInterval() {
return heartbeatBrokerInterval;
}
-
public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
this.heartbeatBrokerInterval = heartbeatBrokerInterval;
}
-
public int getPersistConsumerOffsetInterval() {
return persistConsumerOffsetInterval;
}
-
public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
-
public String getUnitName() {
return unitName;
}
-
public void setUnitName(String unitName) {
this.unitName = unitName;
}
-
public boolean isUnitMode() {
return unitMode;
}
-
public void setUnitMode(boolean unitMode) {
this.unitMode = unitMode;
}
-
public boolean isVipChannelEnabled() {
return vipChannelEnabled;
}
-
public void setVipChannelEnabled(final boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled;
}
-
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
- + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
- + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
- + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
- + vipChannelEnabled + "]";
+ + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+ + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ + vipChannelEnabled + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
index 6596855..7697520 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client;
@@ -22,7 +22,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
-
/**
* Base interface for MQ management
*
@@ -41,8 +40,7 @@ public interface MQAdmin {
* @throws MQClientException
*/
void createTopic(final String key, final String newTopic, final int queueNum)
- throws MQClientException;
-
+ throws MQClientException;
/**
* Creates an topic
@@ -59,8 +57,7 @@ public interface MQAdmin {
* @throws MQClientException
*/
void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
- throws MQClientException;
-
+ throws MQClientException;
/**
* Gets the message queue offset according to some time in milliseconds<br>
@@ -77,7 +74,6 @@ public interface MQAdmin {
*/
long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
-
/**
* Gets the max offset
*
@@ -90,7 +86,6 @@ public interface MQAdmin {
*/
long maxOffset(final MessageQueue mq) throws MQClientException;
-
/**
* Gets the minimum offset
*
@@ -103,7 +98,6 @@ public interface MQAdmin {
*/
long minOffset(final MessageQueue mq) throws MQClientException;
-
/**
* Gets the earliest stored message time
*
@@ -116,7 +110,6 @@ public interface MQAdmin {
*/
long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
-
/**
* Query message according tto message id
*
@@ -131,8 +124,7 @@ public interface MQAdmin {
* @throws MQClientException
*/
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException;
-
+ InterruptedException, MQClientException;
/**
* Query messages
@@ -154,8 +146,8 @@ public interface MQAdmin {
* @throws InterruptedException
*/
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
- final long end) throws MQClientException, InterruptedException;
-
+ final long end) throws MQClientException, InterruptedException;
+
/**
* @param topic
@@ -166,7 +158,6 @@ public interface MQAdmin {
* @throws InterruptedException
* @throws MQClientException
*/
- MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+ MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
index b4ddb08..937e846 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
@@ -16,47 +16,39 @@
*/
package org.apache.rocketmq.client;
+import java.util.Set;
+import java.util.TreeSet;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
-import java.util.Set;
-import java.util.TreeSet;
-
-
public class MQHelper {
public static void resetOffsetByTimestamp(
- final MessageModel messageModel,
- final String consumerGroup,
- final String topic,
- final long timestamp) throws Exception {
+ final MessageModel messageModel,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
}
/**
* Reset consumer topic offset according to time
*
- * @param messageModel
- * which model
- * @param instanceName
- * which instance
- * @param consumerGroup
- * consumer group
- * @param topic
- * topic
- * @param timestamp
- * time
- *
+ * @param messageModel which model
+ * @param instanceName which instance
+ * @param consumerGroup consumer group
+ * @param topic topic
+ * @param timestamp time
* @throws Exception
*/
public static void resetOffsetByTimestamp(
- final MessageModel messageModel,
- final String instanceName,
- final String consumerGroup,
- final String topic,
- final long timestamp) throws Exception {
+ final MessageModel messageModel,
+ final String instanceName,
+ final String consumerGroup,
+ final String topic,
+ final long timestamp) throws Exception {
final Logger log = ClientLogger.getLog();
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
@@ -74,7 +66,7 @@ public class MQHelper {
if (offset >= 0) {
consumer.updateConsumeOffset(mq, offset);
log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
- consumerGroup, offset, mq);
+ consumerGroup, offset, mq);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
index af3649b..7b1cc01 100644
--- a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
@@ -6,45 +6,39 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client;
-import org.apache.rocketmq.common.message.MessageExt;
-
import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
public class QueryResult {
private final long indexLastUpdateTimestamp;
private final List<MessageExt> messageList;
-
public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
this.messageList = messageList;
}
-
public long getIndexLastUpdateTimestamp() {
return indexLastUpdateTimestamp;
}
-
public List<MessageExt> getMessageList() {
return messageList;
}
-
@Override
public String toString() {
return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
- + messageList + "]";
+ + messageList + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 92fc53b..fa9e4e6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -6,17 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MixAll;
@@ -24,10 +26,6 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
/**
* Common Validator
*
@@ -65,8 +63,8 @@ public class Validators {
}
if (!regularExpressionMatcher(group, PATTERN)) {
throw new MQClientException(String.format(
- "the specified group[%s] contains illegal characters, allowing only %s", group,
- VALID_PATTERN_STR), null);
+ "the specified group[%s] contains illegal characters, allowing only %s", group,
+ VALID_PATTERN_STR), null);
}
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
@@ -97,7 +95,7 @@ public class Validators {
* @throws MQClientException
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
- throws MQClientException {
+ throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
@@ -114,7 +112,7 @@ public class Validators {
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
- "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
+ "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
@@ -132,8 +130,8 @@ public class Validators {
if (!regularExpressionMatcher(topic, PATTERN)) {
throw new MQClientException(String.format(
- "the specified topic[%s] contains illegal characters, allowing only %s", topic,
- VALID_PATTERN_STR), null);
+ "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+ VALID_PATTERN_STR), null);
}
if (topic.length() > CHARACTER_MAX_LENGTH) {
@@ -143,7 +141,7 @@ public class Validators {
//whether the same with system reserved keyword
if (topic.equals(MixAll.DEFAULT_TOPIC)) {
throw new MQClientException(
- String.format("the topic[%s] is conflict with default topic.", topic), null);
+ String.format("the topic[%s] is conflict with default topic.", topic), null);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
index bc4ca6c..913d4f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.admin;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index 360cfdf..391f1d1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.common;
@@ -22,6 +22,7 @@ import java.util.Random;
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
+
public ThreadLocalIndex(int value) {
}
@@ -30,7 +31,8 @@ public class ThreadLocalIndex {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
- if (index < 0) index = 0;
+ if (index < 0)
+ index = 0;
this.threadLocalIndex.set(index);
}
@@ -45,7 +47,7 @@ public class ThreadLocalIndex {
@Override
public String toString() {
return "ThreadLocalIndex{" +
- "threadLocalIndex=" + threadLocalIndex.get() +
- '}';
+ "threadLocalIndex=" + threadLocalIndex.get() +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
index 81a71e4..ca692d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
@@ -16,39 +16,30 @@
*/
package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageQueue;
/**
* Strategy Algorithm for message allocating between consumers
- *
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
- * @param consumerGroup
- * current consumer group
- * @param currentCID
- * current consumer id
- * @param mqAll
- * message queue set in current topic
- * @param cidAll
- * consumer set in current consumer group
- *
+ * @param consumerGroup current consumer group
+ * @param currentCID current consumer id
+ * @param mqAll message queue set in current topic
+ * @param cidAll consumer set in current consumer group
* @return The allocate result of given strategy
*/
List<MessageQueue> allocate(
- final String consumerGroup,
- final String currentCID,
- final List<MessageQueue> mqAll,
- final List<String> cidAll
+ final String consumerGroup,
+ final String currentCID,
+ final List<MessageQueue> mqAll,
+ final List<String> cidAll
);
-
/**
* Algorithm name
*
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 156b3d0..8eb1258 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -6,16 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -31,10 +33,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.HashSet;
-import java.util.Set;
-
-
/**
* Default pulling consumer
*
@@ -88,23 +86,19 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
private int maxReconsumeTimes = 16;
-
public DefaultMQPullConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
}
-
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
this.consumerGroup = consumerGroup;
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
-
public DefaultMQPullConsumer(final String consumerGroup) {
this(consumerGroup, null);
}
-
public DefaultMQPullConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
}
@@ -114,141 +108,116 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
createTopic(key, newTopic, queueNum, 0);
}
-
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
}
-
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
}
-
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
}
-
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.minOffset(mq);
}
-
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
}
-
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException {
+ InterruptedException, MQClientException {
return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
}
-
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
}
-
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
-
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
-
public long getBrokerSuspendMaxTimeMillis() {
return brokerSuspendMaxTimeMillis;
}
-
public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
}
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public long getConsumerPullTimeoutMillis() {
return consumerPullTimeoutMillis;
}
-
public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
}
-
public long getConsumerTimeoutMillisWhenSuspend() {
return consumerTimeoutMillisWhenSuspend;
}
-
public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
}
-
public MessageModel getMessageModel() {
return messageModel;
}
-
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
-
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
-
public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
-
public Set<String> getRegisterTopics() {
return registerTopics;
}
-
public void setRegisterTopics(Set<String> registerTopics) {
this.registerTopics = registerTopics;
}
-
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
-
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
}
@@ -279,37 +248,37 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
}
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
}
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
}
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
}
@Override
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
}
@@ -341,7 +310,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
}
@@ -349,32 +318,26 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return offsetStore;
}
-
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
-
public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
return defaultMQPullConsumerImpl;
}
-
public boolean isUnitMode() {
return unitMode;
}
-
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
-
public int getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
-
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 228e075..fcb3e64 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.client.consumer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -36,14 +39,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
/**
* Wrapped push consumer.in fact,it works as remarkable as the pull consumer
- *
*/
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
@@ -133,24 +130,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private long suspendCurrentQueueTimeMillis = 1000;
private long consumeTimeout = 15;
-
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
-
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
-
public DefaultMQPushConsumer(RPCHook rpcHook) {
this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
}
-
public DefaultMQPushConsumer(final String consumerGroup) {
this(consumerGroup, null, new AllocateMessageQueueAveragely());
}
@@ -160,46 +153,39 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
createTopic(key, newTopic, queueNum, 0);
}
-
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
}
-
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
}
-
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
}
-
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(mq);
}
-
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
}
-
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
}
-
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
}
@@ -218,169 +204,137 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return allocateMessageQueueStrategy;
}
-
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
-
public int getConsumeConcurrentlyMaxSpan() {
return consumeConcurrentlyMaxSpan;
}
-
public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
}
-
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
-
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
-
public int getConsumeMessageBatchMaxSize() {
return consumeMessageBatchMaxSize;
}
-
public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public int getConsumeThreadMax() {
return consumeThreadMax;
}
-
public void setConsumeThreadMax(int consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}
-
public int getConsumeThreadMin() {
return consumeThreadMin;
}
-
public void setConsumeThreadMin(int consumeThreadMin) {
this.consumeThreadMin = consumeThreadMin;
}
-
public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
}
-
public MessageListener getMessageListener() {
return messageListener;
}
-
public void setMessageListener(MessageListener messageListener) {
this.messageListener = messageListener;
}
-
public MessageModel getMessageModel() {
return messageModel;
}
-
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
-
public int getPullBatchSize() {
return pullBatchSize;
}
-
public void setPullBatchSize(int pullBatchSize) {
this.pullBatchSize = pullBatchSize;
}
-
public long getPullInterval() {
return pullInterval;
}
-
public void setPullInterval(long pullInterval) {
this.pullInterval = pullInterval;
}
-
public int getPullThresholdForQueue() {
return pullThresholdForQueue;
}
-
public void setPullThresholdForQueue(int pullThresholdForQueue) {
this.pullThresholdForQueue = pullThresholdForQueue;
}
-
public Map<String, String> getSubscription() {
return subscription;
}
-
public void setSubscription(Map<String, String> subscription) {
this.subscription = subscription;
}
-
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
-
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
}
-
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
}
-
@Override
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}
-
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
}
-
@Override
@Deprecated
public void registerMessageListener(MessageListener messageListener) {
@@ -388,127 +342,104 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
-
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
-
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
-
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
-
@Override
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
}
-
@Override
public void unsubscribe(String topic) {
this.defaultMQPushConsumerImpl.unsubscribe(topic);
}
-
@Override
public void updateCorePoolSize(int corePoolSize) {
this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
}
-
@Override
public void suspend() {
this.defaultMQPushConsumerImpl.suspend();
}
-
@Override
public void resume() {
this.defaultMQPushConsumerImpl.resume();
}
-
public OffsetStore getOffsetStore() {
return offsetStore;
}
-
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
-
public String getConsumeTimestamp() {
return consumeTimestamp;
}
-
public void setConsumeTimestamp(String consumeTimestamp) {
this.consumeTimestamp = consumeTimestamp;
}
-
public boolean isPostSubscriptionWhenPull() {
return postSubscriptionWhenPull;
}
-
public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
this.postSubscriptionWhenPull = postSubscriptionWhenPull;
}
-
public boolean isUnitMode() {
return unitMode;
}
-
public void setUnitMode(boolean isUnitMode) {
this.unitMode = isUnitMode;
}
-
public long getAdjustThreadPoolNumsThreshold() {
return adjustThreadPoolNumsThreshold;
}
-
public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
}
-
public int getMaxReconsumeTimes() {
return maxReconsumeTimes;
}
-
public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
this.maxReconsumeTimes = maxReconsumeTimes;
}
-
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
-
public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
-
public long getConsumeTimeout() {
return consumeTimeout;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index 9d9c72b..343a0a2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
+import java.util.Set;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.Set;
-
-
/**
* Message queue consumer interface
*
@@ -44,8 +42,7 @@ public interface MQConsumer extends MQAdmin {
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
-
+ MQBrokerException, InterruptedException, MQClientException;
/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
@@ -60,8 +57,7 @@ public interface MQConsumer extends MQAdmin {
* @throws MQClientException
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
/**
* Fetch message queues from consumer cache according to the topic
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 2335e3d..d199f8a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -6,25 +6,23 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
+import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.util.Set;
-
-
/**
* Pulling consumer interface
*
@@ -37,13 +35,11 @@ public interface MQPullConsumer extends MQConsumer {
*/
void start() throws MQClientException;
-
/**
* Shutdown the consumer
*/
void shutdown();
-
/**
* Register the message queue listener
*
@@ -52,7 +48,6 @@ public interface MQPullConsumer extends MQConsumer {
*/
void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
-
/**
* Pulling the messages,not blocking
*
@@ -74,9 +69,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws RemotingException
*/
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
- InterruptedException;
-
+ final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
/**
* Pulling the messages in the specified timeout
@@ -95,9 +89,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws InterruptedException
*/
PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums, final long timeout) throws MQClientException, RemotingException,
- MQBrokerException, InterruptedException;
-
+ final int maxNums, final long timeout) throws MQClientException, RemotingException,
+ MQBrokerException, InterruptedException;
/**
* Pulling the messages in a async. way
@@ -113,8 +106,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws InterruptedException
*/
void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
- final PullCallback pullCallback) throws MQClientException, RemotingException,
- InterruptedException;
+ final PullCallback pullCallback) throws MQClientException, RemotingException,
+ InterruptedException;
/**
* Pulling the messages in a async. way
@@ -131,9 +124,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws InterruptedException
*/
void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
- final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
- InterruptedException;
-
+ final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+ InterruptedException;
/**
* Pulling the messages,if no message arrival,blocking some time
@@ -151,9 +143,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws InterruptedException
*/
PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
- final long offset, final int maxNums) throws MQClientException, RemotingException,
- MQBrokerException, InterruptedException;
-
+ final long offset, final int maxNums) throws MQClientException, RemotingException,
+ MQBrokerException, InterruptedException;
/**
* Pulling the messages through callback function,if no message arrival,blocking.
@@ -169,9 +160,8 @@ public interface MQPullConsumer extends MQConsumer {
* @throws InterruptedException
*/
void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
- InterruptedException;
-
+ final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
+ InterruptedException;
/**
* Update the offset
@@ -183,7 +173,6 @@ public interface MQPullConsumer extends MQConsumer {
*/
void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
-
/**
* Fetch the offset
*
@@ -196,7 +185,6 @@ public interface MQPullConsumer extends MQConsumer {
*/
long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
-
/**
* Fetch the message queues according to the topic
*
@@ -224,5 +212,5 @@ public interface MQPullConsumer extends MQConsumer {
* @throws MQClientException
*/
void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index da8ffb5..ec747e2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -16,34 +16,31 @@
*/
package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.slf4j.Logger;
-
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
/**
* Schedule service for pull consumer
- *
*/
public class MQPullConsumerScheduleService {
private final Logger log = ClientLogger.getLog();
private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
- new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+ new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
private DefaultMQPullConsumer defaultMQPullConsumer;
private int pullThreadNums = 20;
private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
- new ConcurrentHashMap<String, PullTaskCallback>();
+ new ConcurrentHashMap<String, PullTaskCallback>();
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
public MQPullConsumerScheduleService(final String consumerGroup) {
@@ -76,8 +73,8 @@ public class MQPullConsumerScheduleService {
public void start() throws MQClientException {
final String group = this.defaultMQPullConsumer.getConsumerGroup();
this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.pullThreadNums,
- new ThreadFactoryImpl("PullMsgThread-" + group)
+ this.pullThreadNums,
+ new ThreadFactoryImpl("PullMsgThread-" + group)
);
this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
@@ -85,7 +82,7 @@ public class MQPullConsumerScheduleService {
this.defaultMQPullConsumer.start();
log.info("MQPullConsumerScheduleService start OK, {} {}",
- this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
+ this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
}
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
@@ -139,7 +136,7 @@ public class MQPullConsumerScheduleService {
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
MessageModel messageModel =
- MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
+ MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
MQPullConsumerScheduleService.this.putTask(topic, mqAll);
@@ -157,18 +154,16 @@ public class MQPullConsumerScheduleService {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
-
public PullTaskImpl(final MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
-
@Override
public void run() {
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
PullTaskCallback pullTaskCallback =
- MQPullConsumerScheduleService.this.callbackTable.get(topic);
+ MQPullConsumerScheduleService.this.callbackTable.get(topic);
if (pullTaskCallback != null) {
final PullTaskContext context = new PullTaskContext();
context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
@@ -181,7 +176,7 @@ public class MQPullConsumerScheduleService {
if (!this.isCancelled()) {
MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
- context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
+ context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
} else {
log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
}
@@ -193,17 +188,14 @@ public class MQPullConsumerScheduleService {
}
}
-
public boolean isCancelled() {
return cancelled;
}
-
public void setCancelled(boolean cancelled) {
this.cancelled = cancelled;
}
-
public MessageQueue getMessageQueue() {
return messageQueue;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index b04956c..1b969bd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
@@ -21,7 +21,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
-
/**
* Push consumer
*
@@ -34,13 +33,11 @@ public interface MQPushConsumer extends MQConsumer {
*/
void start() throws MQClientException;
-
/**
* Shutdown the consumer
*/
void shutdown();
-
/**
* Register the message listener
*
@@ -49,13 +46,10 @@ public interface MQPushConsumer extends MQConsumer {
@Deprecated
void registerMessageListener(MessageListener messageListener);
-
void registerMessageListener(final MessageListenerConcurrently messageListener);
-
void registerMessageListener(final MessageListenerOrderly messageListener);
-
/**
* Subscribe some topic
*
@@ -69,7 +63,6 @@ public interface MQPushConsumer extends MQConsumer {
*/
void subscribe(final String topic, final String subExpression) throws MQClientException;
-
/**
* Subscribe some topic
*
@@ -85,7 +78,6 @@ public interface MQPushConsumer extends MQConsumer {
*/
void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
-
/**
* Unsubscribe consumption some topic
*
@@ -94,7 +86,6 @@ public interface MQPushConsumer extends MQConsumer {
*/
void unsubscribe(final String topic);
-
/**
* Update the consumer thread pool size Dynamically
*
@@ -102,13 +93,11 @@ public interface MQPushConsumer extends MQConsumer {
*/
void updateCorePoolSize(int corePoolSize);
-
/**
* Suspend the consumption
*/
void suspend();
-
/**
* Resume the consumption
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 7a08348..0cc2dc4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.Set;
-
+import org.apache.rocketmq.common.message.MessageQueue;
/**
* A MessageQueueListener is implemented by the application and may be specified when a message queue changed
@@ -35,5 +33,5 @@ public interface MessageQueueListener {
* collection of queues,assigned to the current consumer
*/
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
- final Set<MessageQueue> mqDivided);
+ final Set<MessageQueue> mqDivided);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
index cf554c4..06e47d9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
index 1cb23ce..e494f74 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
-import org.apache.rocketmq.common.message.MessageExt;
-
import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
public class PullResult {
private final PullStatus pullStatus;
@@ -28,9 +26,8 @@ public class PullResult {
private final long maxOffset;
private List<MessageExt> msgFoundList;
-
public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
- List<MessageExt> msgFoundList) {
+ List<MessageExt> msgFoundList) {
super();
this.pullStatus = pullStatus;
this.nextBeginOffset = nextBeginOffset;
@@ -39,41 +36,34 @@ public class PullResult {
this.msgFoundList = msgFoundList;
}
-
public PullStatus getPullStatus() {
return pullStatus;
}
-
public long getNextBeginOffset() {
return nextBeginOffset;
}
-
public long getMinOffset() {
return minOffset;
}
-
public long getMaxOffset() {
return maxOffset;
}
-
public List<MessageExt> getMsgFoundList() {
return msgFoundList;
}
-
public void setMsgFoundList(List<MessageExt> msgFoundList) {
this.msgFoundList = msgFoundList;
}
-
@Override
public String toString() {
return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset
- + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
- + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
+ + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
+ + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
index b2a3c8c..a400d90 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
index dc74bca..bc9a867 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
import org.apache.rocketmq.common.message.MessageQueue;
-
public interface PullTaskCallback {
void doPullTask(final MessageQueue mq, final PullTaskContext context);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
index ba66a1f..f0114ae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.consumer;
@@ -22,22 +22,18 @@ public class PullTaskContext {
private MQPullConsumer pullConsumer;
-
public int getPullNextDelayTimeMillis() {
return pullNextDelayTimeMillis;
}
-
public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
}
-
public MQPullConsumer getPullConsumer() {
return pullConsumer;
}
-
public void setPullConsumer(MQPullConsumer pullConsumer) {
this.pullConsumer = pullConsumer;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
index 981ceaf..40ac6c1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
@@ -18,10 +18,8 @@ package org.apache.rocketmq.client.consumer.listener;
import org.apache.rocketmq.common.message.MessageQueue;
-
/**
* Consumer concurrent consumption context
- *
*/
public class ConsumeConcurrentlyContext {
private final MessageQueue messageQueue;
@@ -38,27 +36,22 @@ public class ConsumeConcurrentlyContext {
this.messageQueue = messageQueue;
}
-
public int getDelayLevelWhenNextConsume() {
return delayLevelWhenNextConsume;
}
-
public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
}
-
public MessageQueue getMessageQueue() {
return messageQueue;
}
-
public int getAckIndex() {
return ackIndex;
}
-
public void setAckIndex(int ackIndex) {
this.ackIndex = ackIndex;
}