You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:17 UTC
[86/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 3cc2fdf..9fd1b34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -6,36 +6,32 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.remoting.RPCHook;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class MQClientManager {
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
- new ConcurrentHashMap<String, MQClientInstance>();
-
+ new ConcurrentHashMap<String, MQClientInstance>();
private MQClientManager() {
}
-
public static MQClientManager getInstance() {
return instance;
}
@@ -49,8 +45,8 @@ public class MQClientManager {
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
- new MQClientInstance(clientConfig.cloneClientConfig(),
- this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
+ new MQClientInstance(clientConfig.cloneClientConfig(),
+ this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index e02bd4f..e7a6ca3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -16,6 +16,19 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -35,10 +48,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
-import java.util.*;
-import java.util.concurrent.*;
-
-
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
@@ -51,9 +60,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService cleanExpireMsgExecutors;
-
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
- MessageListenerConcurrently messageListener) {
+ MessageListenerConcurrently messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
@@ -62,18 +70,17 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
- new ThreadFactoryImpl("ConsumeMessageThread_"));
+ this.defaultMQPushConsumer.getConsumeThreadMin(), //
+ this.defaultMQPushConsumer.getConsumeThreadMax(), //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.consumeRequestQueue, //
+ new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
-
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@@ -85,7 +92,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
-
public void shutdown() {
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
@@ -95,8 +101,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0 //
- && corePoolSize <= Short.MAX_VALUE //
- && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ && corePoolSize <= Short.MAX_VALUE //
+ && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
@@ -180,10 +186,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageConcurrentlyService.this.consumerGroup, //
- msgs, //
- mq), e);
+ RemotingHelper.exceptionSimpleDesc(e), //
+ ConsumeMessageConcurrentlyService.this.consumerGroup, //
+ msgs, //
+ mq), e);
}
result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
@@ -195,10 +201,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispatchToConsume) {
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
@@ -244,7 +250,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
- this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
+ this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
@@ -253,9 +259,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
public void processConsumeResult(//
- final ConsumeConcurrentlyStatus status, //
- final ConsumeConcurrentlyContext context, //
- final ConsumeRequest consumeRequest//
+ final ConsumeConcurrentlyStatus status, //
+ final ConsumeConcurrentlyContext context, //
+ final ConsumeRequest consumeRequest//
) {
int ackIndex = context.getAckIndex();
@@ -275,7 +281,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
- consumeRequest.getMsgs().size());
+ consumeRequest.getMsgs().size());
break;
default:
break;
@@ -333,9 +339,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
private void submitConsumeRequestLater(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue//
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue//
) {
this.scheduledExecutorService.schedule(new Runnable() {
@@ -364,7 +370,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
-
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
@@ -414,10 +419,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageConcurrentlyService.this.consumerGroup,
- msgs,
- messageQueue);
+ RemotingHelper.exceptionSimpleDesc(e), //
+ ConsumeMessageConcurrentlyService.this.consumerGroup,
+ msgs,
+ messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
@@ -437,9 +442,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
- ConsumeMessageConcurrentlyService.this.consumerGroup,
- msgs,
- messageQueue);
+ ConsumeMessageConcurrentlyService.this.consumerGroup,
+ msgs,
+ messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
@@ -450,7 +455,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+ .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
@@ -463,6 +468,5 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
return messageQueue;
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f6a1e4d..3def223 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -16,35 +16,42 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.CMResult;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.*;
-
-
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private static final Logger log = ClientLogger.getLog();
private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
- Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
+ Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
private final MessageListenerOrderly messageListener;
@@ -55,7 +62,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ScheduledExecutorService scheduledExecutorService;
private volatile boolean stopped = false;
-
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
@@ -65,17 +71,16 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
- new ThreadFactoryImpl("ConsumeMessageThread_"));
+ this.defaultMQPushConsumer.getConsumeThreadMin(), //
+ this.defaultMQPushConsumer.getConsumeThreadMax(), //
+ 1000 * 60, //
+ TimeUnit.MILLISECONDS, //
+ this.consumeRequestQueue, //
+ new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
-
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -87,7 +92,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
-
public void shutdown() {
this.stopped = true;
this.scheduledExecutorService.shutdown();
@@ -97,7 +101,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
-
public synchronized void unlockAllMQ() {
this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
}
@@ -105,8 +108,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0 //
- && corePoolSize <= Short.MAX_VALUE //
- && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ && corePoolSize <= Short.MAX_VALUE //
+ && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
@@ -169,10 +172,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- mq), e);
+ RemotingHelper.exceptionSimpleDesc(e), //
+ ConsumeMessageOrderlyService.this.consumerGroup, //
+ msgs, //
+ mq), e);
}
result.setAutoCommit(context.isAutoCommit());
@@ -185,10 +188,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@Override
public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispathToConsume) {
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
@@ -224,9 +227,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
private void submitConsumeRequestLater(//
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final long suspendTimeMillis//
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final long suspendTimeMillis//
) {
long timeMillis = suspendTimeMillis;
if (timeMillis == -1) {
@@ -249,10 +252,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
public boolean processConsumeResult(//
- final List<MessageExt> msgs, //
- final ConsumeOrderlyStatus status, //
- final ConsumeOrderlyContext context, //
- final ConsumeRequest consumeRequest//
+ final List<MessageExt> msgs, //
+ final ConsumeOrderlyStatus status, //
+ final ConsumeOrderlyContext context, //
+ final ConsumeRequest consumeRequest//
) {
boolean continueConsume = true;
long commitOffset = -1L;
@@ -261,7 +264,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
- consumeRequest.getMessageQueue());
+ consumeRequest.getMessageQueue());
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
@@ -271,9 +274,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
@@ -293,9 +296,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
@@ -303,9 +306,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
+ consumeRequest.getProcessQueue(), //
+ consumeRequest.getMessageQueue(), //
+ context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
@@ -379,7 +382,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
-
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
@@ -403,7 +405,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
+ || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
@@ -412,14 +414,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && !this.processQueue.isLocked()) {
+ && !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && this.processQueue.isLockExpired()) {
+ && this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
@@ -432,7 +434,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
final int consumeBatchSize =
- ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
+ ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
@@ -444,7 +446,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
- .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
+ .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
@@ -460,29 +462,29 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
- this.messageQueue);
+ this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
+ RemotingHelper.exceptionSimpleDesc(e), //
+ ConsumeMessageOrderlyService.this.consumerGroup, //
+ msgs, //
+ messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
if (null == status //
- || ConsumeOrderlyStatus.ROLLBACK == status//
- || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
+ || ConsumeOrderlyStatus.ROLLBACK == status//
+ || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
+ ConsumeMessageOrderlyService.this.consumerGroup, //
+ msgs, //
+ messageQueue);
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
@@ -507,12 +509,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
- .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
+ .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+ .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
@@ -530,7 +532,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 3dc768c..a59ab98 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -6,48 +6,39 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import java.util.List;
-
-
public interface ConsumeMessageService {
void start();
-
void shutdown();
-
void updateCorePoolSize(int corePoolSize);
-
void incCorePoolSize();
-
void decCorePoolSize();
-
int getCorePoolSize();
-
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
-
void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final boolean dispathToConsume);
+ final List<MessageExt> msgs, //
+ final ProcessQueue processQueue, //
+ final MessageQueue messageQueue, //
+ final boolean dispathToConsume);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index f216533..7c1b4d6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -41,7 +48,11 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -52,10 +63,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final Logger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
@@ -69,7 +76,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private OffsetStore offsetStore;
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
-
public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
this.defaultMQPullConsumer = defaultMQPullConsumer;
this.rpcHook = rpcHook;
@@ -92,9 +98,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
}
}
@@ -146,17 +152,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
}
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
if (null == mq) {
@@ -179,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- mq.getTopic(), subExpression);
+ mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
@@ -187,17 +193,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.SYNC, // 10
- null// 11
+ mq, // 1
+ subscriptionData.getSubString(), // 2
+ 0L, // 3
+ offset, // 4
+ maxNums, // 5
+ sysFlag, // 6
+ 0, // 7
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+ timeoutMillis, // 9
+ CommunicationMode.SYNC, // 10
+ null// 11
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
@@ -219,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- topic, SubscriptionData.SUB_ALL);
+ topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception e) {
}
@@ -357,23 +363,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
}
private void pullAsyncImpl(//
- final MessageQueue mq, //
- final String subExpression, //
- final long offset, //
- final int maxNums, //
- final PullCallback pullCallback, //
- final boolean block, //
- final long timeout) throws MQClientException, RemotingException, InterruptedException {
+ final MessageQueue mq, //
+ final String subExpression, //
+ final long offset, //
+ final int maxNums, //
+ final PullCallback pullCallback, //
+ final boolean block, //
+ final long timeout) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
if (null == mq) {
@@ -400,7 +406,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- mq.getTopic(), subExpression);
+ mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
@@ -408,36 +414,36 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.ASYNC, // 10
- new PullCallback() {
-
- @Override
- public void onSuccess(PullResult pullResult) {
- pullCallback
- .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
- }
-
- @Override
- public void onException(Throwable e) {
- pullCallback.onException(e);
- }
- });
+ mq, // 1
+ subscriptionData.getSubString(), // 2
+ 0L, // 3
+ offset, // 4
+ maxNums, // 5
+ sysFlag, // 6
+ 0, // 7
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+ timeoutMillis, // 9
+ CommunicationMode.ASYNC, // 10
+ new PullCallback() {
+
+ @Override
+ public void onSuccess(PullResult pullResult) {
+ pullCallback
+ .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ pullCallback.onException(e);
+ }
+ });
} catch (MQBrokerException e) {
throw new MQClientException("pullAsync unknow exception", e);
}
}
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
- throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
@@ -446,19 +452,19 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
- throws MQClientException, RemotingException, InterruptedException {
+ throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
- this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+ this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
this.makeSureStateOK();
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}
@@ -469,27 +475,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
}
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException {
+ MQBrokerException, InterruptedException, MQClientException {
this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
if (UtilAll.isBlank(consumerGroup)) {
consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
}
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
- this.defaultMQPullConsumer.getMaxReconsumeTimes());
+ this.defaultMQPullConsumer.getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
@@ -545,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(//
- mQClientFactory, //
- this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
+ mQClientFactory, //
+ this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPullConsumer.getOffsetStore() != null) {
@@ -571,8 +577,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
}
mQClientFactory.start();
@@ -583,9 +589,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
default:
break;
}
@@ -598,43 +604,43 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
// consumerGroup
if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
throw new MQClientException(
- "consumerGroup is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
+ "consumerGroup is null" //
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ null);
}
// consumerGroup
if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
- "consumerGroup can not equal "//
- + MixAll.DEFAULT_CONSUMER_GROUP //
- + ", please specify another one."//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
+ "consumerGroup can not equal "//
+ + MixAll.DEFAULT_CONSUMER_GROUP //
+ + ", please specify another one."//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ null);
}
// messageModel
if (null == this.defaultMQPullConsumer.getMessageModel()) {
throw new MQClientException(
- "messageModel is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
+ "messageModel is null" //
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ null);
}
// allocateMessageQueueStrategy
if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
- "allocateMessageQueueStrategy is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
+ "allocateMessageQueueStrategy is null" //
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ null);
}
// allocateMessageQueueStrategy
if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
- "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
- null);
+ "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ null);
}
}
@@ -644,7 +650,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
if (registerTopics != null) {
for (final String topic : registerTopics) {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
- topic, SubscriptionData.SUB_ALL);
+ topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
@@ -696,7 +702,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
return consumerStartTimestamp;
}
-
public RebalanceImpl getRebalanceImpl() {
return rebalanceImpl;
}