You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:19 UTC
[28/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
new file mode 100644
index 0000000..3784752
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
+ private List<MessageQueue> messageQueueList;
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+ List<String> cidAll) {
+ return this.messageQueueList;
+ }
+
+ @Override
+ public String getName() {
+ return "CONFIG";
+ }
+
+ public List<MessageQueue> getMessageQueueList() {
+ return messageQueueList;
+ }
+
+
+ public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+ this.messageQueueList = messageQueueList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..d3448c9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Computer room Hashing queue algorithm, such as Alipay logic room
+ */
+public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
+ private Set<String> consumeridcs;
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+ List<String> cidAll) {
+ List<MessageQueue> result = new ArrayList<MessageQueue>();
+ int currentIndex = cidAll.indexOf(currentCID);
+ if (currentIndex < 0) {
+ return result;
+ }
+ List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
+ for (MessageQueue mq : mqAll) {
+ String[] temp = mq.getBrokerName().split("@");
+ if (temp.length == 2 && consumeridcs.contains(temp[0])) {
+ premqAll.add(mq);
+ }
+ }
+ // Todo cid
+ int mod = premqAll.size() / cidAll.size();
+ int rem = premqAll.size() % cidAll.size();
+ int startindex = mod * currentIndex;
+ int endindex = startindex + mod;
+ for (int i = startindex; i < endindex; i++) {
+ result.add(mqAll.get(i));
+ }
+ if (rem > currentIndex) {
+ result.add(premqAll.get(currentIndex + mod * cidAll.size()));
+ }
+ return result;
+ }
+
+ @Override
+ public String getName() {
+ return "MACHINE_ROOM";
+ }
+
+ public Set<String> getConsumeridcs() {
+ return consumeridcs;
+ }
+
+
+ public void setConsumeridcs(Set<String> consumeridcs) {
+ this.consumeridcs = consumeridcs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
new file mode 100644
index 0000000..f4d87e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Local storage implementation
+ *
+ * @author shijia.wxr
+ */
+public class LocalFileOffsetStore implements OffsetStore {
+ public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
+ "rocketmq.client.localOffsetStoreDir",
+ System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
+ private final static Logger log = ClientLogger.getLog();
+ private final MQClientInstance mQClientFactory;
+ private final String groupName;
+ private final String storePath;
+ private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+
+ public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
+ this.mQClientFactory = mQClientFactory;
+ this.groupName = groupName;
+ this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
+ this.mQClientFactory.getClientId() + File.separator + //
+ this.groupName + File.separator + //
+ "offsets.json";
+ }
+
+
+ @Override
+ public void load() throws MQClientException {
+ OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
+ if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
+ offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
+
+ for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
+ AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
+ log.info("load consumer's offset, {} {} {}",
+ this.groupName,
+ mq,
+ offset.get());
+ }
+ }
+ }
+
+
+ @Override
+ public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
+ if (mq != null) {
+ AtomicLong offsetOld = this.offsetTable.get(mq);
+ if (null == offsetOld) {
+ offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
+ }
+
+ if (null != offsetOld) {
+ if (increaseOnly) {
+ MixAll.compareAndIncreaseOnly(offsetOld, offset);
+ } else {
+ offsetOld.set(offset);
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
+ if (mq != null) {
+ switch (type) {
+ case MEMORY_FIRST_THEN_STORE:
+ case READ_FROM_MEMORY: {
+ AtomicLong offset = this.offsetTable.get(mq);
+ if (offset != null) {
+ return offset.get();
+ } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
+ return -1;
+ }
+ }
+ case READ_FROM_STORE: {
+ OffsetSerializeWrapper offsetSerializeWrapper;
+ try {
+ offsetSerializeWrapper = this.readLocalOffset();
+ } catch (MQClientException e) {
+ return -1;
+ }
+ if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
+ AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
+ if (offset != null) {
+ this.updateOffset(mq, offset.get(), false);
+ return offset.get();
+ }
+ }
+ }
+ default:
+ break;
+ }
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public void persistAll(Set<MessageQueue> mqs) {
+ if (null == mqs || mqs.isEmpty())
+ return;
+
+ OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
+ for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+ if (mqs.contains(entry.getKey())) {
+ AtomicLong offset = entry.getValue();
+ offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
+ }
+ }
+
+ String jsonString = offsetSerializeWrapper.toJson(true);
+ if (jsonString != null) {
+ try {
+ MixAll.string2File(jsonString, this.storePath);
+ } catch (IOException e) {
+ log.error("persistAll consumer offset Exception, " + this.storePath, e);
+ }
+ }
+ }
+
+
+ @Override
+ public void persist(MessageQueue mq) {
+ }
+
+ @Override
+ public void removeOffset(MessageQueue mq) {
+
+ }
+
+ @Override
+ public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+
+ }
+
+ @Override
+ public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
+ Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
+ for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
+ continue;
+ }
+ cloneOffsetTable.put(mq, entry.getValue().get());
+
+ }
+ return cloneOffsetTable;
+ }
+
+ private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
+ String content = MixAll.file2String(this.storePath);
+ if (null == content || content.length() == 0) {
+ return this.readLocalOffsetBak();
+ } else {
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper =
+ OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+ } catch (Exception e) {
+ log.warn("readLocalOffset Exception, and try to correct", e);
+ return this.readLocalOffsetBak();
+ }
+
+ return offsetSerializeWrapper;
+ }
+ }
+
+ private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException {
+ String content = MixAll.file2String(this.storePath + ".bak");
+ if (content != null && content.length() > 0) {
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper =
+ OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+ } catch (Exception e) {
+ log.warn("readLocalOffset Exception", e);
+ throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
+ + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
+ e);
+ }
+ return offsetSerializeWrapper;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
new file mode 100644
index 0000000..e69ad23
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Wrapper class for offset serialization
+ *
+ * @author shijia.wxr
+ */
+public class OffsetSerializeWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+ public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
+ return offsetTable;
+ }
+
+ public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
new file mode 100644
index 0000000..7c7ccc6
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Offset store interface
+ *
+ * @author shijia.wxr
+ */
+public interface OffsetStore {
+ /**
+ * Load
+ *
+ * @throws MQClientException
+ */
+ void load() throws MQClientException;
+
+
+ /**
+ * Update the offset,store it in memory
+ *
+ * @param mq
+ * @param offset
+ * @param increaseOnly
+ */
+ void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
+
+ /**
+ * Get offset from local storage
+ *
+ * @param mq
+ * @param type
+ *
+ * @return The fetched offset
+ */
+ long readOffset(final MessageQueue mq, final ReadOffsetType type);
+
+ /**
+ * Persist all offsets,may be in local storage or remote name server
+ *
+ * @param mqs
+ */
+ void persistAll(final Set<MessageQueue> mqs);
+
+ /**
+ * Persist the offset,may be in local storage or remote name server
+ *
+ * @param mq
+ */
+ void persist(final MessageQueue mq);
+
+ /**
+ * Remove offset
+ *
+ * @param mq
+ */
+ void removeOffset(MessageQueue mq);
+
+ /**
+ * @param topic
+ *
+ * @return The cloned offset table of given topic
+ */
+ Map<MessageQueue, Long> cloneOffsetTable(String topic);
+
+ /**
+ *
+ * @param mq
+ * @param offset
+ * @param isOneway
+ */
+ void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
new file mode 100644
index 0000000..c2ee9b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+public enum ReadOffsetType {
+ /**
+ * From memory
+ */
+ READ_FROM_MEMORY,
+ /**
+ * From storage
+ */
+ READ_FROM_STORE,
+ /**
+ * From memory,then from storage
+ */
+ MEMORY_FIRST_THEN_STORE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
new file mode 100644
index 0000000..082e7e8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Remote storage implementation
+ *
+ * @author shijia.wxr
+ */
+public class RemoteBrokerOffsetStore implements OffsetStore {
+ private final static Logger log = ClientLogger.getLog();
+ private final MQClientInstance mQClientFactory;
+ private final String groupName;
+ private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+ new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+
+ public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
+ this.mQClientFactory = mQClientFactory;
+ this.groupName = groupName;
+ }
+
+
+ @Override
+ public void load() {
+ }
+
+
+ @Override
+ public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
+ if (mq != null) {
+ AtomicLong offsetOld = this.offsetTable.get(mq);
+ if (null == offsetOld) {
+ offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
+ }
+
+ if (null != offsetOld) {
+ if (increaseOnly) {
+ MixAll.compareAndIncreaseOnly(offsetOld, offset);
+ } else {
+ offsetOld.set(offset);
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
+ if (mq != null) {
+ switch (type) {
+ case MEMORY_FIRST_THEN_STORE:
+ case READ_FROM_MEMORY: {
+ AtomicLong offset = this.offsetTable.get(mq);
+ if (offset != null) {
+ return offset.get();
+ } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
+ return -1;
+ }
+ }
+ case READ_FROM_STORE: {
+ try {
+ long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
+ AtomicLong offset = new AtomicLong(brokerOffset);
+ this.updateOffset(mq, offset.get(), false);
+ return brokerOffset;
+ }
+ // No offset in broker
+ catch (MQBrokerException e) {
+ return -1;
+ }
+ //Other exceptions
+ catch (Exception e) {
+ log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
+ return -2;
+ }
+ }
+ default:
+ break;
+ }
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public void persistAll(Set<MessageQueue> mqs) {
+ if (null == mqs || mqs.isEmpty())
+ return;
+
+ final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
+ if (mqs != null && !mqs.isEmpty()) {
+ for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ AtomicLong offset = entry.getValue();
+ if (offset != null) {
+ if (mqs.contains(mq)) {
+ try {
+ this.updateConsumeOffsetToBroker(mq, offset.get());
+ log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
+ this.groupName,
+ this.mQClientFactory.getClientId(),
+ mq,
+ offset.get());
+ } catch (Exception e) {
+ log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
+ }
+ } else {
+ unusedMQ.add(mq);
+ }
+ }
+ }
+ }
+
+ if (!unusedMQ.isEmpty()) {
+ for (MessageQueue mq : unusedMQ) {
+ this.offsetTable.remove(mq);
+ log.info("remove unused mq, {}, {}", mq, this.groupName);
+ }
+ }
+ }
+
+
+ @Override
+ public void persist(MessageQueue mq) {
+ AtomicLong offset = this.offsetTable.get(mq);
+ if (offset != null) {
+ try {
+ this.updateConsumeOffsetToBroker(mq, offset.get());
+ log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
+ this.groupName,
+ this.mQClientFactory.getClientId(),
+ mq,
+ offset.get());
+ } catch (Exception e) {
+ log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
+ }
+ }
+ }
+
+ public void removeOffset(MessageQueue mq) {
+ if (mq != null) {
+ this.offsetTable.remove(mq);
+ log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
+ offsetTable.size());
+ }
+ }
+
+ @Override
+ public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
+ Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
+ for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
+ continue;
+ }
+ cloneOffsetTable.put(mq, entry.getValue().get());
+ }
+ return cloneOffsetTable;
+ }
+
+ /**
+ * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
+ * here need to be optimized.
+ */
+ private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ updateConsumeOffsetToBroker(mq, offset, true);
+ }
+
+ /**
+ * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
+ * here need to be optimized.
+ */
+ @Override
+ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ if (null == findBrokerResult) {
+ // TODO Here may be heavily overhead for Name Server,need tuning
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ }
+
+ if (findBrokerResult != null) {
+ UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setConsumerGroup(this.groupName);
+ requestHeader.setQueueId(mq.getQueueId());
+ requestHeader.setCommitOffset(offset);
+
+ if (isOneway) {
+ this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ } else {
+ this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ }
+ } else {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+ }
+
+ private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ if (null == findBrokerResult) {
+ // TODO Here may be heavily overhead for Name Server,need tuning
+ this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+ }
+
+ if (findBrokerResult != null) {
+ QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
+ requestHeader.setTopic(mq.getTopic());
+ requestHeader.setConsumerGroup(this.groupName);
+ requestHeader.setQueueId(mq.getQueueId());
+
+ return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
+ findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+ } else {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
new file mode 100644
index 0000000..5e8d1b9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQBrokerException extends Exception {
+ private static final long serialVersionUID = 5975020272601250368L;
+ private final int responseCode;
+ private final String errorMessage;
+
+
+ public MQBrokerException(int responseCode, String errorMessage) {
+ super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage));
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ }
+
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
new file mode 100644
index 0000000..5f32d12
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQClientException extends Exception {
+ private static final long serialVersionUID = -5758410930844185841L;
+ private int responseCode;
+ private String errorMessage;
+
+
+ public MQClientException(String errorMessage, Throwable cause) {
+ super(FAQUrl.attachDefaultURL(errorMessage), cause);
+ this.responseCode = -1;
+ this.errorMessage = errorMessage;
+ }
+
+
+ public MQClientException(int responseCode, String errorMessage) {
+ super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage));
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ public MQClientException setResponseCode(final int responseCode) {
+ this.responseCode = responseCode;
+ return this;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(final String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
new file mode 100644
index 0000000..8cb4ca9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class CheckForbiddenContext {
+ private String nameSrvAddr;
+ private String group;
+ private Message message;
+ private MessageQueue mq;
+ private String brokerAddr;
+ private CommunicationMode communicationMode;
+ private SendResult sendResult;
+ private Exception exception;
+ private Object arg;
+ private boolean unitMode = false;
+
+
+ public String getGroup() {
+ return group;
+ }
+
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+
+ public Message getMessage() {
+ return message;
+ }
+
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+
+ public MessageQueue getMq() {
+ return mq;
+ }
+
+
+ public void setMq(MessageQueue mq) {
+ this.mq = mq;
+ }
+
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+
+ public CommunicationMode getCommunicationMode() {
+ return communicationMode;
+ }
+
+
+ public void setCommunicationMode(CommunicationMode communicationMode) {
+ this.communicationMode = communicationMode;
+ }
+
+
+ public SendResult getSendResult() {
+ return sendResult;
+ }
+
+
+ public void setSendResult(SendResult sendResult) {
+ this.sendResult = sendResult;
+ }
+
+
+ public Exception getException() {
+ return exception;
+ }
+
+
+ public void setException(Exception exception) {
+ this.exception = exception;
+ }
+
+
+ public Object getArg() {
+ return arg;
+ }
+
+
+ public void setArg(Object arg) {
+ this.arg = arg;
+ }
+
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+
+ public void setUnitMode(boolean isUnitMode) {
+ this.unitMode = isUnitMode;
+ }
+
+
+ public String getNameSrvAddr() {
+ return nameSrvAddr;
+ }
+
+
+ public void setNameSrvAddr(String nameSrvAddr) {
+ this.nameSrvAddr = nameSrvAddr;
+ }
+
+
+ @Override
+ public String toString() {
+ return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message
+ + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
+ + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
+ + ", arg=" + arg + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
new file mode 100644
index 0000000..41ed088
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public interface CheckForbiddenHook {
+ public String hookName();
+
+
+ public void checkForbidden(final CheckForbiddenContext context) throws MQClientException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
new file mode 100644
index 0000000..f141fac
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class ConsumeMessageContext {
+ private String consumerGroup;
+ private List<MessageExt> msgList;
+ private MessageQueue mq;
+ private boolean success;
+ private String status;
+ private Object mqTraceContext;
+ private Map<String, String> props;
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public List<MessageExt> getMsgList() {
+ return msgList;
+ }
+
+
+ public void setMsgList(List<MessageExt> msgList) {
+ this.msgList = msgList;
+ }
+
+
+ public MessageQueue getMq() {
+ return mq;
+ }
+
+
+ public void setMq(MessageQueue mq) {
+ this.mq = mq;
+ }
+
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+
+ public Object getMqTraceContext() {
+ return mqTraceContext;
+ }
+
+
+ public void setMqTraceContext(Object mqTraceContext) {
+ this.mqTraceContext = mqTraceContext;
+ }
+
+
+ public Map<String, String> getProps() {
+ return props;
+ }
+
+
+ public void setProps(Map<String, String> props) {
+ this.props = props;
+ }
+
+
+ public String getStatus() {
+ return status;
+ }
+
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
new file mode 100644
index 0000000..8161d2e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+public interface ConsumeMessageHook {
+ String hookName();
+
+ void consumeMessageBefore(final ConsumeMessageContext context);
+
+ void consumeMessageAfter(final ConsumeMessageContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
new file mode 100644
index 0000000..942fd71
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class FilterMessageContext {
+ private String consumerGroup;
+ private List<MessageExt> msgList;
+ private MessageQueue mq;
+ private Object arg;
+ private boolean unitMode;
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public List<MessageExt> getMsgList() {
+ return msgList;
+ }
+
+
+ public void setMsgList(List<MessageExt> msgList) {
+ this.msgList = msgList;
+ }
+
+
+ public MessageQueue getMq() {
+ return mq;
+ }
+
+
+ public void setMq(MessageQueue mq) {
+ this.mq = mq;
+ }
+
+
+ public Object getArg() {
+ return arg;
+ }
+
+
+ public void setArg(Object arg) {
+ this.arg = arg;
+ }
+
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+
+ public void setUnitMode(boolean isUnitMode) {
+ this.unitMode = isUnitMode;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq="
+ + mq + ", arg=" + arg + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
new file mode 100644
index 0000000..016ff56
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+/**
+ * @author manhong.yqd
+ */
+public interface FilterMessageHook {
+ public String hookName();
+
+
+ public void filterMessage(final FilterMessageContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
new file mode 100644
index 0000000..bfb4a47
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageType;
+
+import java.util.Map;
+
+
+public class SendMessageContext {
+ private String producerGroup;
+ private Message message;
+ private MessageQueue mq;
+ private String brokerAddr;
+ private String bornHost;
+ private CommunicationMode communicationMode;
+ private SendResult sendResult;
+ private Exception exception;
+ private Object mqTraceContext;
+ private Map<String, String> props;
+ private DefaultMQProducerImpl producer;
+ private MessageType msgType = MessageType.Normal_Msg;
+
+ public MessageType getMsgType() {
+ return msgType;
+ }
+
+ public void setMsgType(final MessageType msgType) {
+ this.msgType = msgType;
+ }
+
+ public DefaultMQProducerImpl getProducer() {
+ return producer;
+ }
+
+ public void setProducer(final DefaultMQProducerImpl producer) {
+ this.producer = producer;
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+
+ public Message getMessage() {
+ return message;
+ }
+
+
+ public void setMessage(Message message) {
+ this.message = message;
+ }
+
+
+ public MessageQueue getMq() {
+ return mq;
+ }
+
+
+ public void setMq(MessageQueue mq) {
+ this.mq = mq;
+ }
+
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+
+ public CommunicationMode getCommunicationMode() {
+ return communicationMode;
+ }
+
+
+ public void setCommunicationMode(CommunicationMode communicationMode) {
+ this.communicationMode = communicationMode;
+ }
+
+
+ public SendResult getSendResult() {
+ return sendResult;
+ }
+
+
+ public void setSendResult(SendResult sendResult) {
+ this.sendResult = sendResult;
+ }
+
+
+ public Exception getException() {
+ return exception;
+ }
+
+
+ public void setException(Exception exception) {
+ this.exception = exception;
+ }
+
+
+ public Object getMqTraceContext() {
+ return mqTraceContext;
+ }
+
+
+ public void setMqTraceContext(Object mqTraceContext) {
+ this.mqTraceContext = mqTraceContext;
+ }
+
+
+ public Map<String, String> getProps() {
+ return props;
+ }
+
+
+ public void setProps(Map<String, String> props) {
+ this.props = props;
+ }
+
+
+ public String getBornHost() {
+ return bornHost;
+ }
+
+
+ public void setBornHost(String bornHost) {
+ this.bornHost = bornHost;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
new file mode 100644
index 0000000..c040831
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+public interface SendMessageHook {
+ String hookName();
+
+ void sendMessageBefore(final SendMessageContext context);
+
+ void sendMessageAfter(final SendMessageContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
new file mode 100644
index 0000000..50e9b45
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.MQProducerInner;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientRemotingProcessor implements NettyRequestProcessor {
+ private final Logger log = ClientLogger.getLog();
+ private final MQClientInstance mqClientFactory;
+
+
+ public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
+ this.mqClientFactory = mqClientFactory;
+ }
+
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ switch (request.getCode()) {
+ case RequestCode.CHECK_TRANSACTION_STATE:
+ return this.checkTransactionState(ctx, request);
+ case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
+ return this.notifyConsumerIdsChanged(ctx, request);
+ case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
+ return this.resetOffset(ctx, request);
+ case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
+ return this.getConsumeStatus(ctx, request);
+
+ case RequestCode.GET_CONSUMER_RUNNING_INFO:
+ return this.getConsumerRunningInfo(ctx, request);
+
+ case RequestCode.CONSUME_MESSAGE_DIRECTLY:
+ return this.consumeMessageDirectly(ctx, request);
+ default:
+ break;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+
+ public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final CheckTransactionStateRequestHeader requestHeader =
+ (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
+ final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
+ if (messageExt != null) {
+ final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+ if (group != null) {
+ MQProducerInner producer = this.mqClientFactory.selectProducer(group);
+ if (producer != null) {
+ final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ producer.checkTransactionState(addr, messageExt, requestHeader);
+ } else {
+ log.debug("checkTransactionState, pick producer by group[{}] failed", group);
+ }
+ } else {
+ log.warn("checkTransactionState, pick producer group failed");
+ }
+ } else {
+ log.warn("checkTransactionState, decode message failed");
+ }
+
+ return null;
+ }
+
+ public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ try {
+ final NotifyConsumerIdsChangedRequestHeader requestHeader =
+ (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
+ log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ requestHeader.getConsumerGroup());
+ this.mqClientFactory.rebalanceImmediately();
+ } catch (Exception e) {
+ log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
+ }
+ return null;
+ }
+
+ public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final ResetOffsetRequestHeader requestHeader =
+ (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+ log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
+ new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+ requestHeader.getTimestamp()});
+ Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
+ if (request.getBody() != null) {
+ ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
+ offsetTable = body.getOffsetTable();
+ }
+ this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
+ return null;
+ }
+
+ @Deprecated
+ public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final GetConsumerStatusRequestHeader requestHeader =
+ (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+
+ Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
+ GetConsumerStatusBody body = new GetConsumerStatusBody();
+ body.setMessageQueueTable(offsetTable);
+ response.setBody(body.encode());
+ response.setCode(ResponseCode.SUCCESS);
+ return response;
+ }
+
+ private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final GetConsumerRunningInfoRequestHeader requestHeader =
+ (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+ ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
+ if (null != consumerRunningInfo) {
+ if (requestHeader.isJstackEnable()) {
+ Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
+ String jstack = UtilAll.jstack(map);
+ consumerRunningInfo.setJstack(jstack);
+ }
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(consumerRunningInfo.encode());
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
+ }
+
+ return response;
+ }
+
+ private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final ConsumeMessageDirectlyResultRequestHeader requestHeader =
+ (ConsumeMessageDirectlyResultRequestHeader) request
+ .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+
+ final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
+
+ ConsumeMessageDirectlyResult result =
+ this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
+
+ if (null != result) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(result.encode());
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
+ }
+
+ return response;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
new file mode 100644
index 0000000..0f57339
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+/**
+ * @author shijia.wxr
+ */
+public enum CommunicationMode {
+ SYNC,
+ ASYNC,
+ ONEWAY,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
new file mode 100644
index 0000000..56528ef
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+/**
+ * @author shijia.wxr
+ */
+public class FindBrokerResult {
+ private final String brokerAddr;
+ private final boolean slave;
+
+
+ public FindBrokerResult(String brokerAddr, boolean slave) {
+ this.brokerAddr = brokerAddr;
+ this.slave = slave;
+ }
+
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+
+ public boolean isSlave() {
+ return slave;
+ }
+}