You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:29 UTC
[38/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
deleted file mode 100644
index 1125d09..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumer.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.Set;
-
-
-/**
- * Pulling consumer interface
- *
- * @author shijia.wxr
- */
-public interface MQPullConsumer extends MQConsumer {
- /**
- * Start the consumer
- *
- * @throws MQClientException
- */
- void start() throws MQClientException;
-
-
- /**
- * Shutdown the consumer
- */
- void shutdown();
-
-
- /**
- * Register the message queue listener
- *
- * @param topic
- * @param listener
- */
- void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
-
-
- /**
- * Pulling the messages,not blocking
- *
- * @param mq
- * from which message queue
- * @param subExpression
- * subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
- * if null or * expression,meaning subscribe all
- * @param offset
- * from where to pull
- * @param maxNums
- * max pulling numbers
- *
- * @return The resulting {@code PullRequest}
- *
- * @throws MQClientException
- * @throws InterruptedException
- * @throws MQBrokerException
- * @throws RemotingException
- */
- PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
- InterruptedException;
-
-
- /**
- * Pulling the messages in the specified timeout
- *
- * @param mq
- * @param subExpression
- * @param offset
- * @param maxNums
- * @param timeout
- *
- * @return The resulting {@code PullRequest}
- *
- * @throws MQClientException
- * @throws RemotingException
- * @throws MQBrokerException
- * @throws InterruptedException
- */
- PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums, final long timeout) throws MQClientException, RemotingException,
- MQBrokerException, InterruptedException;
-
-
- /**
- * Pulling the messages in a async. way
- *
- * @param mq
- * @param subExpression
- * @param offset
- * @param maxNums
- * @param pullCallback
- *
- * @throws MQClientException
- * @throws RemotingException
- * @throws InterruptedException
- */
- void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
- final PullCallback pullCallback) throws MQClientException, RemotingException,
- InterruptedException;
-
- /**
- * Pulling the messages in a async. way
- *
- * @param mq
- * @param subExpression
- * @param offset
- * @param maxNums
- * @param pullCallback
- * @param timeout
- *
- * @throws MQClientException
- * @throws RemotingException
- * @throws InterruptedException
- */
- void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
- final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
- InterruptedException;
-
-
- /**
- * Pulling the messages,if no message arrival,blocking some time
- *
- * @param mq
- * @param subExpression
- * @param offset
- * @param maxNums
- *
- * @return The resulting {@code PullRequest}
- *
- * @throws MQClientException
- * @throws RemotingException
- * @throws MQBrokerException
- * @throws InterruptedException
- */
- PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
- final long offset, final int maxNums) throws MQClientException, RemotingException,
- MQBrokerException, InterruptedException;
-
-
- /**
- * Pulling the messages through callback function,if no message arrival,blocking.
- *
- * @param mq
- * @param subExpression
- * @param offset
- * @param maxNums
- * @param pullCallback
- *
- * @throws MQClientException
- * @throws RemotingException
- * @throws InterruptedException
- */
- void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,
- final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
- InterruptedException;
-
-
- /**
- * Update the offset
- *
- * @param mq
- * @param offset
- *
- * @throws MQClientException
- */
- void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
-
-
- /**
- * Fetch the offset
- *
- * @param mq
- * @param fromStore
- *
- * @return The fetched offset of given queue
- *
- * @throws MQClientException
- */
- long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
-
-
- /**
- * Fetch the message queues according to the topic
- *
- * @param topic
- * message topic
- *
- * @return message queue set
- *
- * @throws MQClientException
- */
- Set<MessageQueue> fetchMessageQueuesInBalance(final String topic) throws MQClientException;
-
- /**
- * If consuming failure,message will be send back to the broker,and delay consuming in some time later.<br>
- * Mind! message can only be consumed in the same group.
- *
- * @param msg
- * @param delayLevel
- * @param brokerName
- * @param consumerGroup
- *
- * @throws RemotingException
- * @throws MQBrokerException
- * @throws InterruptedException
- * @throws MQClientException
- */
- void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
deleted file mode 100644
index d68b559..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.slf4j.Logger;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Schedule service for pull consumer
- *
- * @author shijia.wxr
- */
-public class MQPullConsumerScheduleService {
- private final Logger log = ClientLogger.getLog();
- private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
- private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
- new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
- private DefaultMQPullConsumer defaultMQPullConsumer;
- private int pullThreadNums = 20;
- private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
- new ConcurrentHashMap<String, PullTaskCallback>();
- private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
- public MQPullConsumerScheduleService(final String consumerGroup) {
- this.defaultMQPullConsumer = new DefaultMQPullConsumer(consumerGroup);
- this.defaultMQPullConsumer.setMessageModel(MessageModel.CLUSTERING);
- }
-
- public void putTask(String topic, Set<MessageQueue> mqNewSet) {
- Iterator<Entry<MessageQueue, PullTaskImpl>> it = this.taskTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<MessageQueue, PullTaskImpl> next = it.next();
- if (next.getKey().getTopic().equals(topic)) {
- if (!mqNewSet.contains(next.getKey())) {
- next.getValue().setCancelled(true);
- it.remove();
- }
- }
- }
-
- for (MessageQueue mq : mqNewSet) {
- if (!this.taskTable.containsKey(mq)) {
- PullTaskImpl command = new PullTaskImpl(mq);
- this.taskTable.put(mq, command);
- this.scheduledThreadPoolExecutor.schedule(command, 0, TimeUnit.MILLISECONDS);
-
- }
- }
- }
-
- public void start() throws MQClientException {
- final String group = this.defaultMQPullConsumer.getConsumerGroup();
- this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.pullThreadNums,
- new ThreadFactoryImpl("PullMsgThread-" + group)
- );
-
- this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
-
- this.defaultMQPullConsumer.start();
-
- log.info("MQPullConsumerScheduleService start OK, {} {}",
- this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
- }
-
- public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
- this.callbackTable.put(topic, callback);
- this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
- }
-
- public void shutdown() {
- if (this.scheduledThreadPoolExecutor != null) {
- this.scheduledThreadPoolExecutor.shutdown();
- }
-
- if (this.defaultMQPullConsumer != null) {
- this.defaultMQPullConsumer.shutdown();
- }
- }
-
- public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() {
- return callbackTable;
- }
-
- public void setCallbackTable(ConcurrentHashMap<String, PullTaskCallback> callbackTable) {
- this.callbackTable = callbackTable;
- }
-
- public int getPullThreadNums() {
- return pullThreadNums;
- }
-
- public void setPullThreadNums(int pullThreadNums) {
- this.pullThreadNums = pullThreadNums;
- }
-
- public DefaultMQPullConsumer getDefaultMQPullConsumer() {
- return defaultMQPullConsumer;
- }
-
- public void setDefaultMQPullConsumer(DefaultMQPullConsumer defaultMQPullConsumer) {
- this.defaultMQPullConsumer = defaultMQPullConsumer;
- }
-
- public MessageModel getMessageModel() {
- return this.defaultMQPullConsumer.getMessageModel();
- }
-
- public void setMessageModel(MessageModel messageModel) {
- this.defaultMQPullConsumer.setMessageModel(messageModel);
- }
-
- class MessageQueueListenerImpl implements MessageQueueListener {
- @Override
- public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
- MessageModel messageModel =
- MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
- switch (messageModel) {
- case BROADCASTING:
- MQPullConsumerScheduleService.this.putTask(topic, mqAll);
- break;
- case CLUSTERING:
- MQPullConsumerScheduleService.this.putTask(topic, mqDivided);
- break;
- default:
- break;
- }
- }
- }
-
- class PullTaskImpl implements Runnable {
- private final MessageQueue messageQueue;
- private volatile boolean cancelled = false;
-
-
- public PullTaskImpl(final MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
-
- @Override
- public void run() {
- String topic = this.messageQueue.getTopic();
- if (!this.isCancelled()) {
- PullTaskCallback pullTaskCallback =
- MQPullConsumerScheduleService.this.callbackTable.get(topic);
- if (pullTaskCallback != null) {
- final PullTaskContext context = new PullTaskContext();
- context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
- try {
- pullTaskCallback.doPullTask(this.messageQueue, context);
- } catch (Throwable e) {
- context.setPullNextDelayTimeMillis(1000);
- log.error("doPullTask Exception", e);
- }
-
- if (!this.isCancelled()) {
- MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
- context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
- } else {
- log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
- }
- } else {
- log.warn("Pull Task Callback not exist , {}", topic);
- }
- } else {
- log.warn("The Pull Task is cancelled, {}", messageQueue);
- }
- }
-
-
- public boolean isCancelled() {
- return cancelled;
- }
-
-
- public void setCancelled(boolean cancelled) {
- this.cancelled = cancelled;
- }
-
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
deleted file mode 100644
index e47739d..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MQPushConsumer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-
-
-/**
- * Push consumer
- *
- * @author shijia.wxr
- */
-public interface MQPushConsumer extends MQConsumer {
- /**
- * Start the consumer
- *
- * @throws MQClientException
- */
- void start() throws MQClientException;
-
-
- /**
- * Shutdown the consumer
- */
- void shutdown();
-
-
- /**
- * Register the message listener
- *
- * @param messageListener
- */
- @Deprecated
- void registerMessageListener(MessageListener messageListener);
-
-
- void registerMessageListener(final MessageListenerConcurrently messageListener);
-
-
- void registerMessageListener(final MessageListenerOrderly messageListener);
-
-
- /**
- * Subscribe some topic
- *
- * @param topic
- * @param subExpression
- * subscription expression.it only support or operation such as
- * "tag1 || tag2 || tag3" <br>
- * if null or * expression,meaning subscribe all
- *
- * @throws MQClientException
- */
- void subscribe(final String topic, final String subExpression) throws MQClientException;
-
-
- /**
- * Subscribe some topic
- *
- * @param topic
- * @param fullClassName
- * full class name,must extend
- * com.alibaba.rocketmq.common.filter. MessageFilter
- * @param filterClassSource
- * class source code,used UTF-8 file encoding,must be responsible
- * for your code safety
- *
- * @throws MQClientException
- */
- void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
-
-
- /**
- * Unsubscribe consumption some topic
- *
- * @param topic
- * message topic
- */
- void unsubscribe(final String topic);
-
-
- /**
- * Update the consumer thread pool size Dynamically
- *
- * @param corePoolSize
- */
- void updateCorePoolSize(int corePoolSize);
-
-
- /**
- * Suspend the consumption
- */
- void suspend();
-
-
- /**
- * Resume the consumption
- */
- void resume();
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
deleted file mode 100644
index bb25a3a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/MessageQueueListener.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.Set;
-
-
-/**
- * A MessageQueueListener is implemented by the application and may be specified when a message queue changed
- *
- * @author shijia.wxr
- * @author vongosling
- */
-public interface MessageQueueListener {
- /**
- * @param topic
- * message topic
- * @param mqAll
- * all queues in this message topic
- * @param mqDivided
- * collection of queues,assigned to the current consumer
- */
- void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
- final Set<MessageQueue> mqDivided);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
deleted file mode 100644
index 545cff2..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullCallback.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-/**
- * Async message pulling interface
- *
- * @author shijia.wxr
- */
-public interface PullCallback {
- public void onSuccess(final PullResult pullResult);
-
- public void onException(final Throwable e);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
deleted file mode 100644
index b485243..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullResult.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullResult {
- private final PullStatus pullStatus;
- private final long nextBeginOffset;
- private final long minOffset;
- private final long maxOffset;
- private List<MessageExt> msgFoundList;
-
-
- public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
- List<MessageExt> msgFoundList) {
- super();
- this.pullStatus = pullStatus;
- this.nextBeginOffset = nextBeginOffset;
- this.minOffset = minOffset;
- this.maxOffset = maxOffset;
- this.msgFoundList = msgFoundList;
- }
-
-
- public PullStatus getPullStatus() {
- return pullStatus;
- }
-
-
- public long getNextBeginOffset() {
- return nextBeginOffset;
- }
-
-
- public long getMinOffset() {
- return minOffset;
- }
-
-
- public long getMaxOffset() {
- return maxOffset;
- }
-
-
- public List<MessageExt> getMsgFoundList() {
- return msgFoundList;
- }
-
-
- public void setMsgFoundList(List<MessageExt> msgFoundList) {
- this.msgFoundList = msgFoundList;
- }
-
-
- @Override
- public String toString() {
- return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset
- + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
- + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
deleted file mode 100644
index 35166f3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullStatus.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-/**
- * @author shijia.wxr
- */
-public enum PullStatus {
- /**
- * Founded
- */
- FOUND,
- /**
- * No new message can be pull
- */
- NO_NEW_MSG,
- /**
- * Filtering results can not match
- */
- NO_MATCHED_MSG,
- /**
- * Illegal offset,may be too big or too small
- */
- OFFSET_ILLEGAL
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
deleted file mode 100644
index 19d5bfc..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskCallback.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-public interface PullTaskCallback {
- public void doPullTask(final MessageQueue mq, final PullTaskContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
deleted file mode 100644
index 72c57d6..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/PullTaskContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer;
-
-public class PullTaskContext {
-
- private int pullNextDelayTimeMillis = 200;
-
- private MQPullConsumer pullConsumer;
-
-
- public int getPullNextDelayTimeMillis() {
- return pullNextDelayTimeMillis;
- }
-
-
- public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
- this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
- }
-
-
- public MQPullConsumer getPullConsumer() {
- return pullConsumer;
- }
-
-
- public void setPullConsumer(MQPullConsumer pullConsumer) {
- this.pullConsumer = pullConsumer;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
deleted file mode 100644
index 36fcf19..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * Consumer concurrent consumption context
- *
- * @author shijia.wxr
- */
-public class ConsumeConcurrentlyContext {
- private final MessageQueue messageQueue;
- /**
- * Message consume retry strategy<br>
- * -1,no retry,put into DLQ directly<br>
- * 0,broker control retry frequency<br>
- * >0,client control retry frequency
- */
- private int delayLevelWhenNextConsume = 0;
- private int ackIndex = Integer.MAX_VALUE;
-
- public ConsumeConcurrentlyContext(MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
-
- public int getDelayLevelWhenNextConsume() {
- return delayLevelWhenNextConsume;
- }
-
-
- public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
- this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
- }
-
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
-
- public int getAckIndex() {
- return ackIndex;
- }
-
-
- public void setAckIndex(int ackIndex) {
- this.ackIndex = ackIndex;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
deleted file mode 100644
index d0d3bf4..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeConcurrentlyStatus.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-/**
- * @author shijia.wxr
- */
-public enum ConsumeConcurrentlyStatus {
- /**
- * Success consumption
- */
- CONSUME_SUCCESS,
- /**
- * Failure consumption,later try to consume
- */
- RECONSUME_LATER;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
deleted file mode 100644
index 26a3892..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * Consumer Orderly consumption context
- *
- * @author shijia.wxr
- */
-public class ConsumeOrderlyContext {
- private final MessageQueue messageQueue;
- private boolean autoCommit = true;
- private long suspendCurrentQueueTimeMillis = -1;
-
-
- public ConsumeOrderlyContext(MessageQueue messageQueue) {
- this.messageQueue = messageQueue;
- }
-
-
- public boolean isAutoCommit() {
- return autoCommit;
- }
-
-
- public void setAutoCommit(boolean autoCommit) {
- this.autoCommit = autoCommit;
- }
-
-
- public MessageQueue getMessageQueue() {
- return messageQueue;
- }
-
-
- public long getSuspendCurrentQueueTimeMillis() {
- return suspendCurrentQueueTimeMillis;
- }
-
-
- public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
- this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
deleted file mode 100644
index e490c5c..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeOrderlyStatus.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-/**
- * @author shijia.wxr
- */
-public enum ConsumeOrderlyStatus {
- /**
- * Success consumption
- */
- SUCCESS,
- /**
- * Rollback consumption(only for binlog consumption)
- */
- @Deprecated
- ROLLBACK,
- /**
- * Commit offset(only for binlog consumption)
- */
- @Deprecated
- COMMIT,
- /**
- * Suspend current queue a moment
- */
- SUSPEND_CURRENT_QUEUE_A_MOMENT;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
deleted file mode 100644
index 44f998e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/ConsumeReturnType.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.consumer.listener;
-
-/**
- * Created by alvin on 16-11-30.
- */
-public enum ConsumeReturnType {
- /**
- * consume return success
- */
- SUCCESS,
- /**
- * consume timeout ,even if success
- */
- TIME_OUT,
- /**
- * consume throw exception
- */
- EXCEPTION,
- /**
- * consume return null
- */
- RETURNNULL,
- /**
- * consume return failed
- */
- FAILED
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
deleted file mode 100644
index f34946e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-/**
- * A MessageListener object is used to receive asynchronously delivered messages.
- *
- * @author shijia.wxr
- */
-public interface MessageListener {
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
deleted file mode 100644
index f0b0c61..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerConcurrently.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * A MessageListenerConcurrently object is used to receive asynchronously delivered messages concurrently
- *
- * @author shijia.wxr
- */
-public interface MessageListenerConcurrently extends MessageListener {
- /**
- * It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
- *
- * @param msgs
- * msgs.size() >= 1<br>
- * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
- * @param context
- *
- * @return The consume status
- */
- ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeConcurrentlyContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
deleted file mode 100644
index d30cdfa..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/listener/MessageListenerOrderly.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.listener;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-import java.util.List;
-
-
-/**
- * A MessageListenerConcurrently object is used to receive asynchronously delivered messages orderly.one queue,one thread
- *
- * @author shijia.wxr
- */
-public interface MessageListenerOrderly extends MessageListener {
- /**
- * It is not recommend to throw exception,rather than returning ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT if consumption failure
- *
- * @param msgs
- * msgs.size() >= 1<br>
- * DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
- * @param context
- *
- * @return The consume status
- */
- ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
- final ConsumeOrderlyContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
deleted file mode 100644
index 413d646..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.rebalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Average Hashing queue algorithm
- *
- * @author manhong.yqd
- */
-public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
- private final Logger log = ClientLogger.getLog();
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
-
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
- return result;
- }
-
- int index = cidAll.indexOf(currentCID);
- int mod = mqAll.size() % cidAll.size();
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
-
- @Override
- public String getName() {
- return "AVG";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
deleted file mode 100644
index 17f4611..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.rebalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Cycle average Hashing queue algorithm
- *
- * @author manhong.yqd
- */
-public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
- private final Logger log = ClientLogger.getLog();
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
-
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
- return result;
- }
-
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
- }
- return result;
- }
-
- @Override
- public String getName() {
- return "AVG_BY_CIRCLE";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
deleted file mode 100644
index 783678c..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.rebalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
- private List<MessageQueue> messageQueueList;
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- return this.messageQueueList;
- }
-
- @Override
- public String getName() {
- return "CONFIG";
- }
-
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
- }
-
-
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
deleted file mode 100644
index 5464fe3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.rebalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Computer room Hashing queue algorithm, such as Alipay logic room
- */
-public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
- private Set<String> consumeridcs;
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- int currentIndex = cidAll.indexOf(currentCID);
- if (currentIndex < 0) {
- return result;
- }
- List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- String[] temp = mq.getBrokerName().split("@");
- if (temp.length == 2 && consumeridcs.contains(temp[0])) {
- premqAll.add(mq);
- }
- }
- // Todo cid
- int mod = premqAll.size() / cidAll.size();
- int rem = premqAll.size() % cidAll.size();
- int startindex = mod * currentIndex;
- int endindex = startindex + mod;
- for (int i = startindex; i < endindex; i++) {
- result.add(mqAll.get(i));
- }
- if (rem > currentIndex) {
- result.add(premqAll.get(currentIndex + mod * cidAll.size()));
- }
- return result;
- }
-
- @Override
- public String getName() {
- return "MACHINE_ROOM";
- }
-
- public Set<String> getConsumeridcs() {
- return consumeridcs;
- }
-
-
- public void setConsumeridcs(Set<String> consumeridcs) {
- this.consumeridcs = consumeridcs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java
deleted file mode 100644
index 39aec12..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Local storage implementation
- *
- * @author shijia.wxr
- */
-public class LocalFileOffsetStore implements OffsetStore {
- public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
- "rocketmq.client.localOffsetStoreDir",
- System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
- private final static Logger log = ClientLogger.getLog();
- private final MQClientInstance mQClientFactory;
- private final String groupName;
- private final String storePath;
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
-
- public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
- this.mQClientFactory = mQClientFactory;
- this.groupName = groupName;
- this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
- this.mQClientFactory.getClientId() + File.separator + //
- this.groupName + File.separator + //
- "offsets.json";
- }
-
-
- @Override
- public void load() throws MQClientException {
- OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
- if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
- offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
-
- for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
- AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
- log.info("load consumer's offset, {} {} {}",
- this.groupName,
- mq,
- offset.get());
- }
- }
- }
-
-
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- if (mq != null) {
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (null == offsetOld) {
- offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
- }
-
- if (null != offsetOld) {
- if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
- } else {
- offsetOld.set(offset);
- }
- }
- }
- }
-
-
- @Override
- public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
- if (mq != null) {
- switch (type) {
- case MEMORY_FIRST_THEN_STORE:
- case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
- if (offset != null) {
- return offset.get();
- } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
- return -1;
- }
- }
- case READ_FROM_STORE: {
- OffsetSerializeWrapper offsetSerializeWrapper;
- try {
- offsetSerializeWrapper = this.readLocalOffset();
- } catch (MQClientException e) {
- return -1;
- }
- if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
- AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
- if (offset != null) {
- this.updateOffset(mq, offset.get(), false);
- return offset.get();
- }
- }
- }
- default:
- break;
- }
- }
-
- return -1;
- }
-
-
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;
-
- OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- if (mqs.contains(entry.getKey())) {
- AtomicLong offset = entry.getValue();
- offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
- }
- }
-
- String jsonString = offsetSerializeWrapper.toJson(true);
- if (jsonString != null) {
- try {
- MixAll.string2File(jsonString, this.storePath);
- } catch (IOException e) {
- log.error("persistAll consumer offset Exception, " + this.storePath, e);
- }
- }
- }
-
-
- @Override
- public void persist(MessageQueue mq) {
- }
-
- @Override
- public void removeOffset(MessageQueue mq) {
-
- }
-
- @Override
- public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-
- }
-
- @Override
- public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
- Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
- continue;
- }
- cloneOffsetTable.put(mq, entry.getValue().get());
-
- }
- return cloneOffsetTable;
- }
-
- private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
- String content = MixAll.file2String(this.storePath);
- if (null == content || content.length() == 0) {
- return this.readLocalOffsetBak();
- } else {
- OffsetSerializeWrapper offsetSerializeWrapper = null;
- try {
- offsetSerializeWrapper =
- OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
- } catch (Exception e) {
- log.warn("readLocalOffset Exception, and try to correct", e);
- return this.readLocalOffsetBak();
- }
-
- return offsetSerializeWrapper;
- }
- }
-
- private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException {
- String content = MixAll.file2String(this.storePath + ".bak");
- if (content != null && content.length() > 0) {
- OffsetSerializeWrapper offsetSerializeWrapper = null;
- try {
- offsetSerializeWrapper =
- OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
- } catch (Exception e) {
- log.warn("readLocalOffset Exception", e);
- throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
- + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
- e);
- }
- return offsetSerializeWrapper;
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
deleted file mode 100644
index 4434b86..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Wrapper class for offset serialization
- *
- * @author shijia.wxr
- */
-public class OffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
- public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
- return offsetTable;
- }
-
- public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
- this.offsetTable = offsetTable;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java
deleted file mode 100644
index 346beb1..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/OffsetStore.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Offset store interface
- *
- * @author shijia.wxr
- */
-public interface OffsetStore {
- /**
- * Load
- *
- * @throws MQClientException
- */
- void load() throws MQClientException;
-
-
- /**
- * Update the offset,store it in memory
- *
- * @param mq
- * @param offset
- * @param increaseOnly
- */
- void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
-
- /**
- * Get offset from local storage
- *
- * @param mq
- * @param type
- *
- * @return The fetched offset
- */
- long readOffset(final MessageQueue mq, final ReadOffsetType type);
-
- /**
- * Persist all offsets,may be in local storage or remote name server
- *
- * @param mqs
- */
- void persistAll(final Set<MessageQueue> mqs);
-
- /**
- * Persist the offset,may be in local storage or remote name server
- *
- * @param mq
- */
- void persist(final MessageQueue mq);
-
- /**
- * Remove offset
- *
- * @param mq
- */
- void removeOffset(MessageQueue mq);
-
- /**
- * @param topic
- *
- * @return The cloned offset table of given topic
- */
- Map<MessageQueue, Long> cloneOffsetTable(String topic);
-
- /**
- *
- * @param mq
- * @param offset
- * @param isOneway
- */
- void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java
deleted file mode 100644
index 3691a62..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/ReadOffsetType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-public enum ReadOffsetType {
- /**
- * From memory
- */
- READ_FROM_MEMORY,
- /**
- * From storage
- */
- READ_FROM_STORE,
- /**
- * From memory,then from storage
- */
- MEMORY_FIRST_THEN_STORE;
-}