You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:33 UTC
[37/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
deleted file mode 100644
index 9df2be3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Remote storage implementation
- *
- * @author shijia.wxr
- */
-public class RemoteBrokerOffsetStore implements OffsetStore {
- private final static Logger log = ClientLogger.getLog();
- private final MQClientInstance mQClientFactory;
- private final String groupName;
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
-
- public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
- this.mQClientFactory = mQClientFactory;
- this.groupName = groupName;
- }
-
-
- @Override
- public void load() {
- }
-
-
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- if (mq != null) {
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (null == offsetOld) {
- offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
- }
-
- if (null != offsetOld) {
- if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
- } else {
- offsetOld.set(offset);
- }
- }
- }
- }
-
-
- @Override
- public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
- if (mq != null) {
- switch (type) {
- case MEMORY_FIRST_THEN_STORE:
- case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
- if (offset != null) {
- return offset.get();
- } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
- return -1;
- }
- }
- case READ_FROM_STORE: {
- try {
- long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
- AtomicLong offset = new AtomicLong(brokerOffset);
- this.updateOffset(mq, offset.get(), false);
- return brokerOffset;
- }
- // No offset in broker
- catch (MQBrokerException e) {
- return -1;
- }
- //Other exceptions
- catch (Exception e) {
- log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
- return -2;
- }
- }
- default:
- break;
- }
- }
-
- return -1;
- }
-
-
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;
-
- final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
- if (mqs != null && !mqs.isEmpty()) {
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- AtomicLong offset = entry.getValue();
- if (offset != null) {
- if (mqs.contains(mq)) {
- try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
- log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
- } catch (Exception e) {
- log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
- }
- } else {
- unusedMQ.add(mq);
- }
- }
- }
- }
-
- if (!unusedMQ.isEmpty()) {
- for (MessageQueue mq : unusedMQ) {
- this.offsetTable.remove(mq);
- log.info("remove unused mq, {}, {}", mq, this.groupName);
- }
- }
- }
-
-
- @Override
- public void persist(MessageQueue mq) {
- AtomicLong offset = this.offsetTable.get(mq);
- if (offset != null) {
- try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
- log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
- } catch (Exception e) {
- log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
- }
- }
- }
-
- public void removeOffset(MessageQueue mq) {
- if (mq != null) {
- this.offsetTable.remove(mq);
- log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
- offsetTable.size());
- }
- }
-
- @Override
- public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
- Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
- continue;
- }
- cloneOffsetTable.put(mq, entry.getValue().get());
- }
- return cloneOffsetTable;
- }
-
- /**
- * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
- * here need to be optimized.
- */
- private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException {
- updateConsumeOffsetToBroker(mq, offset, true);
- }
-
- /**
- * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
- * here need to be optimized.
- */
- @Override
- public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
- MQBrokerException, InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
- if (null == findBrokerResult) {
- // TODO Here may be heavily overhead for Name Server,need tuning
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
- }
-
- if (findBrokerResult != null) {
- UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
- requestHeader.setTopic(mq.getTopic());
- requestHeader.setConsumerGroup(this.groupName);
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setCommitOffset(offset);
-
- if (isOneway) {
- this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
- } else {
- this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
- }
- } else {
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
- }
-
- private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
- InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
- if (null == findBrokerResult) {
- // TODO Here may be heavily overhead for Name Server,need tuning
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
- }
-
- if (findBrokerResult != null) {
- QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
- requestHeader.setTopic(mq.getTopic());
- requestHeader.setConsumerGroup(this.groupName);
- requestHeader.setQueueId(mq.getQueueId());
-
- return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
- findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
- } else {
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
deleted file mode 100644
index 7fc09f8..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.exception;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-
-
-/**
- * Checked exception that carries a numeric response code together with a textual
- * description. The exception message is built from both and passed through
- * {@code FAQUrl.attachDefaultURL}.
- *
- * @author shijia.wxr
- */
-public class MQBrokerException extends Exception {
-    private static final long serialVersionUID = 5975020272601250368L;
-
-    private final int responseCode;
-    private final String errorMessage;
-
-    /**
-     * @param responseCode numeric code stored on the exception
-     * @param errorMessage description stored on the exception
-     */
-    public MQBrokerException(int responseCode, String errorMessage) {
-        super(buildMessage(responseCode, errorMessage));
-        this.responseCode = responseCode;
-        this.errorMessage = errorMessage;
-    }
-
-    /** Formats the {@code "CODE: ... DESC: ..."} text used as the exception message. */
-    private static String buildMessage(int code, String description) {
-        final String text = "CODE: " + UtilAll.responseCode2String(code) + " DESC: "
-            + description;
-        return FAQUrl.attachDefaultURL(text);
-    }
-
-    /** @return the response code given at construction */
-    public int getResponseCode() {
-        return this.responseCode;
-    }
-
-    /** @return the raw description given at construction */
-    public String getErrorMessage() {
-        return this.errorMessage;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
deleted file mode 100644
index f343a67..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.exception;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-
-
-/**
- * Checked exception that carries a numeric response code together with a textual
- * description. Unlike the fields of {@code MQBrokerException}, both values can be changed
- * after construction; doing so does not alter the message captured by
- * {@link Exception#getMessage()}.
- *
- * @author shijia.wxr
- */
-public class MQClientException extends Exception {
-    private static final long serialVersionUID = -5758410930844185841L;
-
-    private int responseCode;
-    private String errorMessage;
-
-    /**
-     * Creates an exception with a cause; the response code is set to {@code -1}.
-     *
-     * @param errorMessage description stored on the exception
-     * @param cause        underlying cause, may be {@code null}
-     */
-    public MQClientException(String errorMessage, Throwable cause) {
-        super(FAQUrl.attachDefaultURL(errorMessage), cause);
-        this.errorMessage = errorMessage;
-        this.responseCode = -1;
-    }
-
-    /**
-     * @param responseCode numeric code stored on the exception
-     * @param errorMessage description stored on the exception
-     */
-    public MQClientException(int responseCode, String errorMessage) {
-        super(buildMessage(responseCode, errorMessage));
-        this.errorMessage = errorMessage;
-        this.responseCode = responseCode;
-    }
-
-    /** Formats the {@code "CODE: ... DESC: ..."} text used as the exception message. */
-    private static String buildMessage(int code, String description) {
-        final String text = "CODE: " + UtilAll.responseCode2String(code) + " DESC: "
-            + description;
-        return FAQUrl.attachDefaultURL(text);
-    }
-
-    public int getResponseCode() {
-        return this.responseCode;
-    }
-
-    /**
-     * @param responseCode new response code
-     * @return this exception, to allow chaining
-     */
-    public MQClientException setResponseCode(final int responseCode) {
-        this.responseCode = responseCode;
-        return this;
-    }
-
-    public String getErrorMessage() {
-        return this.errorMessage;
-    }
-
-    public void setErrorMessage(final String errorMessage) {
-        this.errorMessage = errorMessage;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
deleted file mode 100644
index 6291803..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * Mutable value holder handed to {@link CheckForbiddenHook#checkForbidden}. Every field has
- * a plain getter and setter; no validation is performed.
- *
- * @author manhong.yqd
- */
-public class CheckForbiddenContext {
-    private String nameSrvAddr;
-    private String group;
-    private Message message;
-    private MessageQueue mq;
-    private String brokerAddr;
-    private CommunicationMode communicationMode;
-    private SendResult sendResult;
-    private Exception exception;
-    private Object arg;
-    private boolean unitMode = false;
-
-
-    public String getGroup() {
-        return group;
-    }
-
-
-    public void setGroup(String group) {
-        this.group = group;
-    }
-
-
-    public Message getMessage() {
-        return message;
-    }
-
-
-    public void setMessage(Message message) {
-        this.message = message;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-
-    public void setBrokerAddr(String brokerAddr) {
-        this.brokerAddr = brokerAddr;
-    }
-
-
-    public CommunicationMode getCommunicationMode() {
-        return communicationMode;
-    }
-
-
-    public void setCommunicationMode(CommunicationMode communicationMode) {
-        this.communicationMode = communicationMode;
-    }
-
-
-    public SendResult getSendResult() {
-        return sendResult;
-    }
-
-
-    public void setSendResult(SendResult sendResult) {
-        this.sendResult = sendResult;
-    }
-
-
-    public Exception getException() {
-        return exception;
-    }
-
-
-    public void setException(Exception exception) {
-        this.exception = exception;
-    }
-
-
-    public Object getArg() {
-        return arg;
-    }
-
-
-    public void setArg(Object arg) {
-        this.arg = arg;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    public String getNameSrvAddr() {
-        return nameSrvAddr;
-    }
-
-
-    public void setNameSrvAddr(String nameSrvAddr) {
-        this.nameSrvAddr = nameSrvAddr;
-    }
-
-
-    /**
-     * Renders every field for logging. The label is this class's own name; it previously
-     * read "SendMessageContext", which made log output point at the wrong type.
-     */
-    @Override
-    public String toString() {
-        return "CheckForbiddenContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message
-            + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
-            + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
-            + ", arg=" + arg + "]";
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
deleted file mode 100644
index 35a2740..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-
-
-/**
- * Callback interface that receives a {@link CheckForbiddenContext}.
- *
- * @author manhong.yqd
- */
-public interface CheckForbiddenHook {
-    /** @return the name of this hook */
-    String hookName();
-
-    /**
-     * Invoked with the context to inspect.
-     *
-     * @param context the values describing the operation being checked
-     * @throws MQClientException raised by the implementation (presumably to reject the
-     *                           operation; confirm against the caller)
-     */
-    void checkForbidden(final CheckForbiddenContext context) throws MQClientException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
deleted file mode 100644
index 0c0e7cd..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Mutable value holder passed to {@link ConsumeMessageHook} callbacks. Every field has a
- * plain getter and setter; no validation is performed.
- */
-public class ConsumeMessageContext {
-    private String consumerGroup;
-    private List<MessageExt> msgList;
-    private MessageQueue mq;
-    private boolean success;
-    private String status;
-    private Object mqTraceContext;
-    private Map<String, String> props;
-
-    // --- consumerGroup ---
-
-    public String getConsumerGroup() {
-        return this.consumerGroup;
-    }
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-    // --- msgList ---
-
-    public List<MessageExt> getMsgList() {
-        return this.msgList;
-    }
-
-    public void setMsgList(List<MessageExt> msgList) {
-        this.msgList = msgList;
-    }
-
-    // --- mq ---
-
-    public MessageQueue getMq() {
-        return this.mq;
-    }
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-    // --- success ---
-
-    public boolean isSuccess() {
-        return this.success;
-    }
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-    // --- status ---
-
-    public String getStatus() {
-        return this.status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    // --- mqTraceContext ---
-
-    public Object getMqTraceContext() {
-        return this.mqTraceContext;
-    }
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-    // --- props ---
-
-    public Map<String, String> getProps() {
-        return this.props;
-    }
-
-    public void setProps(Map<String, String> props) {
-        this.props = props;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
deleted file mode 100644
index 96b0e53..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-/**
- * Callback interface with a before/after pair of methods that each receive a
- * {@link ConsumeMessageContext}.
- */
-public interface ConsumeMessageHook {
-    /** @return the name of this hook */
-    String hookName();
-
-    /** @param context the context handed to the "before" callback */
-    void consumeMessageBefore(ConsumeMessageContext context);
-
-    /** @param context the context handed to the "after" callback */
-    void consumeMessageAfter(ConsumeMessageContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
deleted file mode 100644
index c47f09e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * Mutable value holder handed to {@link FilterMessageHook#filterMessage}. Every field has a
- * plain getter and setter; no validation is performed.
- *
- * @author manhong.yqd
- */
-public class FilterMessageContext {
-    private String consumerGroup;
-    private List<MessageExt> msgList;
-    private MessageQueue mq;
-    private Object arg;
-    private boolean unitMode;
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public List<MessageExt> getMsgList() {
-        return msgList;
-    }
-
-
-    public void setMsgList(List<MessageExt> msgList) {
-        this.msgList = msgList;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public Object getArg() {
-        return arg;
-    }
-
-
-    public void setArg(Object arg) {
-        this.arg = arg;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    /**
-     * Renders every field for logging. The label is this class's own name (it previously
-     * read "ConsumeMessageContext") and {@code unitMode}, formerly omitted, is included.
-     */
-    @Override
-    public String toString() {
-        return "FilterMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq="
-            + mq + ", arg=" + arg + ", unitMode=" + unitMode + "]";
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
deleted file mode 100644
index 1528ef9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-/**
- * Callback interface that receives a {@link FilterMessageContext}.
- *
- * @author manhong.yqd
- */
-public interface FilterMessageHook {
-    /** @return the name of this hook */
-    String hookName();
-
-    /** @param context the context handed to the filter callback */
-    void filterMessage(final FilterMessageContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
deleted file mode 100644
index 9552456..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.message.MessageType;
-
-import java.util.Map;
-
-
-/**
- * Mutable value holder passed to {@link SendMessageHook} callbacks. Every field has a plain
- * getter and setter; no validation is performed. {@code msgType} starts out as
- * {@code MessageType.Normal_Msg}; all other fields start out unset.
- */
-public class SendMessageContext {
-    private String producerGroup;
-    private Message message;
-    private MessageQueue mq;
-    private String brokerAddr;
-    private String bornHost;
-    private CommunicationMode communicationMode;
-    private SendResult sendResult;
-    private Exception exception;
-    private Object mqTraceContext;
-    private Map<String, String> props;
-    private DefaultMQProducerImpl producer;
-    private MessageType msgType = MessageType.Normal_Msg;
-
-    // --- producerGroup ---
-
-    public String getProducerGroup() {
-        return this.producerGroup;
-    }
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-
-    // --- message ---
-
-    public Message getMessage() {
-        return this.message;
-    }
-
-    public void setMessage(Message message) {
-        this.message = message;
-    }
-
-    // --- mq ---
-
-    public MessageQueue getMq() {
-        return this.mq;
-    }
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-    // --- brokerAddr ---
-
-    public String getBrokerAddr() {
-        return this.brokerAddr;
-    }
-
-    public void setBrokerAddr(String brokerAddr) {
-        this.brokerAddr = brokerAddr;
-    }
-
-    // --- bornHost ---
-
-    public String getBornHost() {
-        return this.bornHost;
-    }
-
-    public void setBornHost(String bornHost) {
-        this.bornHost = bornHost;
-    }
-
-    // --- communicationMode ---
-
-    public CommunicationMode getCommunicationMode() {
-        return this.communicationMode;
-    }
-
-    public void setCommunicationMode(CommunicationMode communicationMode) {
-        this.communicationMode = communicationMode;
-    }
-
-    // --- sendResult ---
-
-    public SendResult getSendResult() {
-        return this.sendResult;
-    }
-
-    public void setSendResult(SendResult sendResult) {
-        this.sendResult = sendResult;
-    }
-
-    // --- exception ---
-
-    public Exception getException() {
-        return this.exception;
-    }
-
-    public void setException(Exception exception) {
-        this.exception = exception;
-    }
-
-    // --- mqTraceContext ---
-
-    public Object getMqTraceContext() {
-        return this.mqTraceContext;
-    }
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-    // --- props ---
-
-    public Map<String, String> getProps() {
-        return this.props;
-    }
-
-    public void setProps(Map<String, String> props) {
-        this.props = props;
-    }
-
-    // --- producer ---
-
-    public DefaultMQProducerImpl getProducer() {
-        return this.producer;
-    }
-
-    public void setProducer(final DefaultMQProducerImpl producer) {
-        this.producer = producer;
-    }
-
-    // --- msgType ---
-
-    public MessageType getMsgType() {
-        return this.msgType;
-    }
-
-    public void setMsgType(final MessageType msgType) {
-        this.msgType = msgType;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
deleted file mode 100644
index 22e1fb3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-/**
- * Hook contract consisting of a name plus two callbacks that each receive a
- * {@link SendMessageContext}.
- *
- * NOTE(review): judging by the method names the callbacks bracket a message send;
- * the call sites are not part of this file, so confirm the exact invocation points
- * against the producer implementation.
- */
-public interface SendMessageHook {
- /**
- * @return the name of this hook
- */
- String hookName();
-
- /**
- * Callback that, by its name, runs before a message is sent (TODO confirm at the call site).
- *
- * @param context the send context handed to the hook
- */
- void sendMessageBefore(final SendMessageContext context);
-
- /**
- * Callback that, by its name, runs after a message is sent (TODO confirm at the call site).
- *
- * @param context the send context handed to the hook
- */
- void sendMessageAfter(final SendMessageContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
deleted file mode 100644
index 79a539e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.MQProducerInner;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
-import com.alibaba.rocketmq.common.protocol.header.*;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Dispatches incoming remoting requests to the matching handler based on the request code
- * and delegates the actual work to the owning {@link MQClientInstance}.
- *
- * Handlers that return {@code null} produce no response command of their own.
- *
- * @author shijia.wxr
- */
-public class ClientRemotingProcessor implements NettyRequestProcessor {
-    private final Logger log = ClientLogger.getLog();
-    private final MQClientInstance mqClientFactory;
-
-
-    /**
-     * @param mqClientFactory client instance every request is delegated to
-     */
-    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
-        this.mqClientFactory = mqClientFactory;
-    }
-
-
-    /**
-     * Routes the request to the handler registered for its code.
-     *
-     * @param ctx     channel context the request arrived on
-     * @param request the incoming command
-     * @return the handler's response, or {@code null} for unknown codes and for handlers
-     *         that do not build a response
-     * @throws RemotingCommandException if a handler fails to decode the custom header
-     */
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        switch (request.getCode()) {
-            case RequestCode.CHECK_TRANSACTION_STATE:
-                return this.checkTransactionState(ctx, request);
-            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
-                return this.notifyConsumerIdsChanged(ctx, request);
-            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
-                return this.resetOffset(ctx, request);
-            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
-                return this.getConsumeStatus(ctx, request);
-
-            case RequestCode.GET_CONSUMER_RUNNING_INFO:
-                return this.getConsumerRunningInfo(ctx, request);
-
-            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
-                return this.consumeMessageDirectly(ctx, request);
-            default:
-                break;
-        }
-        return null;
-    }
-
-    /**
-     * @return always {@code false}
-     */
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-    /**
-     * Decodes the message carried in the request body, looks up the producer registered for
-     * the message's producer-group property and forwards the transaction check to it.
-     * A missing body, an undecodable message, a missing group property or an unknown
-     * producer is only logged.
-     *
-     * @return always {@code null}
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final CheckTransactionStateRequestHeader requestHeader =
-            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
-
-        // ByteBuffer.wrap(null) throws NullPointerException, so a request without a body is
-        // handled like a message that failed to decode instead of blowing up the processor.
-        final byte[] body = request.getBody();
-        final MessageExt messageExt = (body != null) ? MessageDecoder.decode(ByteBuffer.wrap(body)) : null;
-        if (null == messageExt) {
-            log.warn("checkTransactionState, decode message failed");
-            return null;
-        }
-
-        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
-        if (null == group) {
-            log.warn("checkTransactionState, pick producer group failed");
-            return null;
-        }
-
-        final MQProducerInner producer = this.mqClientFactory.selectProducer(group);
-        if (producer != null) {
-            final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            producer.checkTransactionState(addr, messageExt, requestHeader);
-        } else {
-            log.debug("checkTransactionState, pick producer by group[{}] failed", group);
-        }
-
-        return null;
-    }
-
-    /**
-     * Logs the notification and asks the client instance to rebalance immediately.
-     * Any exception raised while doing so is logged and not propagated.
-     *
-     * @return always {@code null}
-     */
-    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        try {
-            final NotifyConsumerIdsChangedRequestHeader requestHeader =
-                (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
-            log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                requestHeader.getConsumerGroup());
-            this.mqClientFactory.rebalanceImmediately();
-        } catch (Exception e) {
-            // The placeholder is required: without it SLF4J discards the non-Throwable
-            // argument and the exception description never reaches the log.
-            log.error("notifyConsumerIdsChanged exception: {}", RemotingHelper.exceptionSimpleDesc(e));
-        }
-        return null;
-    }
-
-    /**
-     * Forwards a reset-offset request to the client instance. The offset table is decoded
-     * from the request body when one is present; otherwise an empty table is passed on.
-     *
-     * @return always {@code null}
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final ResetOffsetRequestHeader requestHeader =
-            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
-        log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
-            new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
-                requestHeader.getTimestamp()});
-        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
-        if (request.getBody() != null) {
-            ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
-            offsetTable = body.getOffsetTable();
-        }
-        this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
-        return null;
-    }
-
-    /**
-     * Answers with the per-queue table the client instance reports for the requested
-     * topic and group.
-     *
-     * @return a {@code SUCCESS} response whose body is the encoded {@link GetConsumerStatusBody}
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    @Deprecated
-    public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final GetConsumerStatusRequestHeader requestHeader =
-            (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
-
-        Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
-        GetConsumerStatusBody body = new GetConsumerStatusBody();
-        body.setMessageQueueTable(offsetTable);
-        response.setBody(body.encode());
-        response.setCode(ResponseCode.SUCCESS);
-        return response;
-    }
-
-    /**
-     * Answers with the running info of the requested consumer group, optionally enriched
-     * with a dump of all thread stack traces when the header enables it.
-     *
-     * @return {@code SUCCESS} with the encoded info, or {@code SYSTEM_ERROR} when the client
-     *         instance has no info for the group
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final GetConsumerRunningInfoRequestHeader requestHeader =
-            (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
-
-        ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
-        if (null != consumerRunningInfo) {
-            if (requestHeader.isJstackEnable()) {
-                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
-                String jstack = UtilAll.jstack(map);
-                consumerRunningInfo.setJstack(jstack);
-            }
-
-            response.setCode(ResponseCode.SUCCESS);
-            response.setBody(consumerRunningInfo.encode());
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
-        }
-
-        return response;
-    }
-
-    /**
-     * Decodes the message in the request body and hands it to the client instance for
-     * direct consumption by the requested consumer group.
-     *
-     * @return {@code SUCCESS} with the encoded result, or {@code SYSTEM_ERROR} when the client
-     *         instance returns no result
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final ConsumeMessageDirectlyResultRequestHeader requestHeader =
-            (ConsumeMessageDirectlyResultRequestHeader) request
-                .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
-
-        final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
-
-        ConsumeMessageDirectlyResult result =
-            this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
-
-        if (null != result) {
-            response.setCode(ResponseCode.SUCCESS);
-            response.setBody(result.encode());
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
-        }
-
-        return response;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
deleted file mode 100644
index bc2f95d..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-/**
- * @author shijia.wxr
- */
-public enum CommunicationMode {
- SYNC,
- ASYNC,
- ONEWAY,
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
deleted file mode 100644
index 22805cd..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-/**
- * Immutable holder pairing a broker address with a {@code slave} flag.
- *
- * @author shijia.wxr
- */
-public class FindBrokerResult {
-    private final boolean slave;
-    private final String brokerAddr;
-
-
-    /**
-     * @param brokerAddr address later returned by {@link #getBrokerAddr()}
-     * @param slave      flag later returned by {@link #isSlave()}
-     */
-    public FindBrokerResult(String brokerAddr, boolean slave) {
-        this.slave = slave;
-        this.brokerAddr = brokerAddr;
-    }
-
-
-    /**
-     * @return the slave flag supplied at construction
-     */
-    public boolean isSlave() {
-        return this.slave;
-    }
-
-
-    /**
-     * @return the broker address supplied at construction
-     */
-    public String getBrokerAddr() {
-        return this.brokerAddr;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
deleted file mode 100644
index 9f7e964..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.route.BrokerData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.remoting.InvokeCallback;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQAdminImpl {
-
- private final Logger log = ClientLogger.getLog();
- private final MQClientInstance mQClientFactory;
- private long timeoutMillis = 6000;
-
-
- public MQAdminImpl(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
-
-
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
-
- public void setTimeoutMillis(long timeoutMillis) {
- this.timeoutMillis = timeoutMillis;
- }
-
-
- public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
- createTopic(key, newTopic, queueNum, 0);
- }
-
-
- public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
- try {
- TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
- List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
- if (brokerDataList != null && !brokerDataList.isEmpty()) {
- Collections.sort(brokerDataList);
-
- boolean createOKAtLeastOnce = false;
- MQClientException exception = null;
-
- StringBuilder orderTopicString = new StringBuilder();
-
- for (BrokerData brokerData : brokerDataList) {
- String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
- if (addr != null) {
- TopicConfig topicConfig = new TopicConfig(newTopic);
- topicConfig.setReadQueueNums(queueNum);
- topicConfig.setWriteQueueNums(queueNum);
- topicConfig.setTopicSysFlag(topicSysFlag);
-
- boolean createOK = false;
- for (int i = 0; i < 5; i++) {
- try {
- this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);
- createOK = true;
- createOKAtLeastOnce = true;
- break;
- } catch (Exception e) {
- if (4 == i) {
- exception = new MQClientException("create topic to broker exception", e);
- }
- }
- }
-
- if (createOK) {
- orderTopicString.append(brokerData.getBrokerName());
- orderTopicString.append(":");
- orderTopicString.append(queueNum);
- orderTopicString.append(";");
- }
- }
- }
-
- if (exception != null && !createOKAtLeastOnce) {
- throw exception;
- }
- } else {
- throw new MQClientException("Not found broker, maybe key is wrong", null);
- }
- } catch (Exception e) {
- throw new MQClientException("create new topic failed", e);
- }
- }
-
-
- public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
- try {
- TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
- if (topicRouteData != null) {
- TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- return topicPublishInfo.getMessageQueueList();
- }
- }
- } catch (Exception e) {
- throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
- }
-
- throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
- }
-
-
- public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
- try {
- TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
- if (topicRouteData != null) {
- Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
- if (!mqList.isEmpty()) {
- return mqList;
- } else {
- throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
- }
- }
- } catch (Exception e) {
- throw new MQClientException(
- "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
- e);
- }
-
- throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
- }
-
-
- public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- if (brokerAddr != null) {
- try {
- return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
- timeoutMillis);
- } catch (Exception e) {
- throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
- }
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
-
- public long maxOffset(MessageQueue mq) throws MQClientException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- if (brokerAddr != null) {
- try {
- return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
- } catch (Exception e) {
- throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
- }
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
-
- public long minOffset(MessageQueue mq) throws MQClientException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- if (brokerAddr != null) {
- try {
- return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
- } catch (Exception e) {
- throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
- }
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
-
- public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- if (brokerAddr != null) {
- try {
- return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
- timeoutMillis);
- } catch (Exception e) {
- throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
- }
- }
-
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
-
- public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-
- MessageId messageId = null;
- try {
- messageId = MessageDecoder.decodeMessageId(msgId);
- } catch (Exception e) {
- throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
- }
- return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
- messageId.getOffset(), timeoutMillis);
- }
-
- public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
- InterruptedException {
- return queryMessage(topic, key, maxNum, begin, end, false);
- }
-
- public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
-
- QueryResult qr = this.queryMessage(topic, uniqKey, 32,
- MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
- if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
- return qr.getMessageList().get(0);
- } else {
- return null;
- }
- }
-
- protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
- InterruptedException {
- TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
- if (null == topicRouteData) {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
- }
-
- if (topicRouteData != null) {
- List<String> brokerAddrs = new LinkedList<String>();
- for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
- String addr = brokerData.selectBrokerAddr();
- if (addr != null) {
- brokerAddrs.add(addr);
- }
- }
-
- if (!brokerAddrs.isEmpty()) {
- final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
- final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
-
- for (String addr : brokerAddrs) {
- try {
- QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setKey(key);
- requestHeader.setMaxNum(maxNum);
- requestHeader.setBeginTimestamp(begin);
- requestHeader.setEndTimestamp(end);
-
- this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
- new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- try {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- QueryMessageResponseHeader responseHeader = null;
- try {
- responseHeader =
- (QueryMessageResponseHeader) response
- .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
- } catch (RemotingCommandException e) {
- log.error("decodeCommandCustomHeader exception", e);
- return;
- }
-
- List<MessageExt> wrappers =
- MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
- QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
- queryResultList.add(qr);
- break;
- }
- default:
- log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
- break;
- }
- } else {
- log.warn("getResponseCommand return null");
- }
- } finally {
- countDownLatch.countDown();
- }
- }
- }, isUniqKey);
- } catch (Exception e) {
- log.warn("queryMessage exception", e);
- }
-
- }
-
- boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
- if (!ok) {
- log.warn("queryMessage, maybe some broker failed");
- }
-
- long indexLastUpdateTimestamp = 0;
- List<MessageExt> messageList = new LinkedList<MessageExt>();
- for (QueryResult qr : queryResultList) {
- if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
- indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
- }
-
- for (MessageExt msgExt : qr.getMessageList()) {
- if (isUniqKey) {
- if (msgExt.getMsgId().equals(key)) {
-
- if (messageList.size() > 0) {
-
- if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {
-
- messageList.clear();
- messageList.add(msgExt);
- }
-
- } else {
-
- messageList.add(msgExt);
- }
- } else {
- log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
- }
- } else {
- String keys = msgExt.getKeys();
- if (keys != null) {
- boolean matched = false;
- String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
- if (keyArray != null) {
- for (String k : keyArray) {
- if (key.equals(k)) {
- matched = true;
- break;
- }
- }
- }
-
- if (matched) {
- messageList.add(msgExt);
- } else {
- log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
- }
- }
- }
- }
- }
-
- if (!messageList.isEmpty()) {
- return new QueryResult(indexLastUpdateTimestamp, messageList);
- } else {
- throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
- }
- }
- }
-
- throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
- }
-}