You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:44 UTC
[48/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
deleted file mode 100644
index 269918a..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.offset;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
-import com.alibaba.rocketmq.common.ConfigManager;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumerOffsetManager extends ConfigManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final String TOPIC_GROUP_SEPARATOR = "@";
-
- private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
- new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
-
- private transient BrokerController brokerController;
-
-
- public ConsumerOffsetManager() {
- }
-
-
- public ConsumerOffsetManager(BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- public void scanUnsubscribedTopic() {
- Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
- String topicAtGroup = next.getKey();
- String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
- String topic = arrays[0];
- String group = arrays[1];
-
- if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
- && this.offsetBehindMuchThanData(topic, next.getValue())) {
- it.remove();
- log.warn("remove topic offset, {}", topicAtGroup);
- }
- }
- }
- }
-
-
- private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
- Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
- boolean result = !table.isEmpty();
-
- while (it.hasNext() && result) {
- Entry<Integer, Long> next = it.next();
- long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
- long offsetInPersist = next.getValue();
- if (offsetInPersist > minOffsetInStore) {
- result = false;
- } else {
- result = true;
- }
- }
-
- return result;
- }
-
-
- public Set<String> whichTopicByConsumer(final String group) {
- Set<String> topics = new HashSet<String>();
-
- Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
- String topicAtGroup = next.getKey();
- String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
- if (group.equals(arrays[1])) {
- topics.add(arrays[0]);
- }
- }
- }
-
- return topics;
- }
-
-
- public Set<String> whichGroupByTopic(final String topic) {
- Set<String> groups = new HashSet<String>();
-
- Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
- String topicAtGroup = next.getKey();
- String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
- if (topic.equals(arrays[0])) {
- groups.add(arrays[1]);
- }
- }
- }
-
- return groups;
- }
-
-
- public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
- // topic@group
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
- this.commitOffset(clientHost, key, queueId, offset);
- }
-
- private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
- ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
- if (null == map) {
- map = new ConcurrentHashMap<Integer, Long>(32);
- map.put(queueId, offset);
- this.offsetTable.put(key, map);
- } else {
- Long storeOffset = map.put(queueId, offset);
- if (storeOffset != null && offset < storeOffset) {
- log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
- }
- }
- }
-
- public long queryOffset(final String group, final String topic, final int queueId) {
- // topic@group
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
- ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
- if (null != map) {
- Long offset = map.get(queueId);
- if (offset != null)
- return offset;
- }
-
- return -1;
- }
-
- public String encode() {
- return this.encode(false);
- }
-
- @Override
- public String configFilePath() {
- return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
- }
-
- @Override
- public void decode(String jsonString) {
- if (jsonString != null) {
- ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
- if (obj != null) {
- this.offsetTable = obj.offsetTable;
- }
- }
- }
-
- public String encode(final boolean prettyFormat) {
- return RemotingSerializable.toJson(this, prettyFormat);
- }
-
- public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
- return offsetTable;
- }
-
-
- public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
- this.offsetTable = offsetTable;
- }
-
-
- public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
-
- Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
- Set<String> topicGroups = this.offsetTable.keySet();
- if (!UtilAll.isBlank(filterGroups)) {
- for (String group : filterGroups.split(",")) {
- Iterator<String> it = topicGroups.iterator();
- while (it.hasNext()) {
- if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
- it.remove();
- }
- }
- }
- }
-
- for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
- String topicGroup = offSetEntry.getKey();
- String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
- if (topic.equals(topicGroupArr[0])) {
- for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
- long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
- if (entry.getValue() >= minOffset) {
- Long offset = queueMinOffset.get(entry.getKey());
- if (offset == null) {
- queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
- } else {
- queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
- }
- }
- }
- }
-
- }
- return queueMinOffset;
- }
-
-
- public Map<Integer, Long> queryOffset(final String group, final String topic) {
- // topic@group
- String key = topic + TOPIC_GROUP_SEPARATOR + group;
- return this.offsetTable.get(key);
- }
-
-
- public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
- ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
- if (offsets != null) {
- this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
deleted file mode 100644
index f051d29..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.out;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
-import com.alibaba.rocketmq.common.namesrv.TopAddressing;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.*;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.RemotingClient;
-import com.alibaba.rocketmq.remoting.exception.*;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- * @author manhong.yqd
- */
-public class BrokerOuterAPI {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final RemotingClient remotingClient;
- private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
- private String nameSrvAddr = null;
-
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
- this(nettyClientConfig, null);
- }
-
- public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
- this.remotingClient = new NettyRemotingClient(nettyClientConfig);
- this.remotingClient.registerRPCHook(rpcHook);
- }
-
- public void start() {
- this.remotingClient.start();
- }
-
- public void shutdown() {
- this.remotingClient.shutdown();
- }
-
- public String fetchNameServerAddr() {
- try {
- String addrs = this.topAddressing.fetchNSAddr();
- if (addrs != null) {
- if (!addrs.equals(this.nameSrvAddr)) {
- log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);
- this.updateNameServerAddressList(addrs);
- this.nameSrvAddr = addrs;
- return nameSrvAddr;
- }
- }
- } catch (Exception e) {
- log.error("fetchNameServerAddr Exception", e);
- }
- return nameSrvAddr;
- }
-
- public void updateNameServerAddressList(final String addrs) {
- List<String> lst = new ArrayList<String>();
- String[] addrArray = addrs.split(";");
- if (addrArray != null) {
- for (String addr : addrArray) {
- lst.add(addr);
- }
-
- this.remotingClient.updateNameServerAddressList(lst);
- }
- }
-
- public RegisterBrokerResult registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills) {
- RegisterBrokerResult registerBrokerResult = null;
-
- List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
- if (nameServerAddressList != null) {
- for (String namesrvAddr : nameServerAddressList) {
- try {
- RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
- haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
- if (result != null) {
- registerBrokerResult = result;
- }
-
- log.info("register broker to name server {} OK", namesrvAddr);
- } catch (Exception e) {
- log.warn("registerBroker Exception, " + namesrvAddr, e);
- }
- }
- }
-
- return registerBrokerResult;
- }
-
- private RegisterBrokerResult registerBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final boolean oneway,
- final int timeoutMills
- ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
- InterruptedException {
- RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
- requestHeader.setBrokerId(brokerId);
- requestHeader.setBrokerName(brokerName);
- requestHeader.setClusterName(clusterName);
- requestHeader.setHaServerAddr(haServerAddr);
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
- RegisterBrokerBody requestBody = new RegisterBrokerBody();
- requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
- requestBody.setFilterServerList(filterServerList);
- request.setBody(requestBody.encode());
-
- if (oneway) {
- try {
- this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
- } catch (RemotingTooMuchRequestException e) {
- }
- return null;
- }
-
- RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- RegisterBrokerResponseHeader responseHeader =
- (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
- RegisterBrokerResult result = new RegisterBrokerResult();
- result.setMasterAddr(responseHeader.getMasterAddr());
- result.setHaServerAddr(responseHeader.getHaServerAddr());
- result.setHaServerAddr(responseHeader.getHaServerAddr());
- if (response.getBody() != null) {
- result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
- }
- return result;
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public void unregisterBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
- ) {
- List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
- if (nameServerAddressList != null) {
- for (String namesrvAddr : nameServerAddressList) {
- try {
- this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
- log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
- } catch (Exception e) {
- log.warn("unregisterBroker Exception, " + namesrvAddr, e);
- }
- }
- }
- }
-
- public void unregisterBroker(
- final String namesrvAddr,
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId
- ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
- UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
- requestHeader.setBrokerId(brokerId);
- requestHeader.setBrokerName(brokerName);
- requestHeader.setClusterName(clusterName);
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
-
- RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return;
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
- RemotingTimeoutException, InterruptedException, MQBrokerException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
-
- RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
- RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
- }
- default:
- break;
- }
-
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
-
- public void registerRPCHook(RPCHook rpcHook) {
- remotingClient.registerRPCHook(rpcHook);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
deleted file mode 100644
index 8050bc1..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.GetMessageResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
- private final ByteBuffer byteBufferHeader;
- private final GetMessageResult getMessageResult;
- private long transfered; // the bytes which was transfered already
-
-
- public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
- this.byteBufferHeader = byteBufferHeader;
- this.getMessageResult = getMessageResult;
- }
-
-
- @Override
- public long position() {
- int pos = byteBufferHeader.position();
- List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
- for (ByteBuffer bb : messageBufferList) {
- pos += bb.position();
- }
- return pos;
- }
-
- @Override
- public long transfered() {
- return transfered;
- }
-
- @Override
- public long count() {
- return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
- }
-
- @Override
- public long transferTo(WritableByteChannel target, long position) throws IOException {
- if (this.byteBufferHeader.hasRemaining()) {
- transfered += target.write(this.byteBufferHeader);
- return transfered;
- } else {
- List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
- for (ByteBuffer bb : messageBufferList) {
- if (bb.hasRemaining()) {
- transfered += target.write(bb);
- return transfered;
- }
- }
- }
-
- return 0;
- }
-
- public void close() {
- this.deallocate();
- }
-
- @Override
- protected void deallocate() {
- this.getMessageResult.release();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
deleted file mode 100644
index df742c5..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-
-/**
- * @author shijia.wxr
- */
-public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
- private final ByteBuffer byteBufferHeader;
- private final SelectMappedBufferResult selectMappedBufferResult;
- private long transfered; // the bytes which was transfered already
-
-
- public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
- this.byteBufferHeader = byteBufferHeader;
- this.selectMappedBufferResult = selectMappedBufferResult;
- }
-
-
- @Override
- public long position() {
- return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();
- }
-
- @Override
- public long transfered() {
- return transfered;
- }
-
- @Override
- public long count() {
- return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
- }
-
- @Override
- public long transferTo(WritableByteChannel target, long position) throws IOException {
- if (this.byteBufferHeader.hasRemaining()) {
- transfered += target.write(this.byteBufferHeader);
- return transfered;
- } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
- transfered += target.write(this.selectMappedBufferResult.getByteBuffer());
- return transfered;
- }
-
- return 0;
- }
-
- public void close() {
- this.deallocate();
- }
-
- @Override
- protected void deallocate() {
- this.selectMappedBufferResult.release();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
deleted file mode 100644
index cbcbc74..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.QueryMessageResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
- private final ByteBuffer byteBufferHeader;
- private final QueryMessageResult queryMessageResult;
- private long transfered; // the bytes which was transfered already
-
-
- public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
- this.byteBufferHeader = byteBufferHeader;
- this.queryMessageResult = queryMessageResult;
- }
-
-
- @Override
- public long position() {
- int pos = byteBufferHeader.position();
- List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
- for (ByteBuffer bb : messageBufferList) {
- pos += bb.position();
- }
- return pos;
- }
-
- @Override
- public long transfered() {
- return transfered;
- }
-
- @Override
- public long count() {
- return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
- }
-
- @Override
- public long transferTo(WritableByteChannel target, long position) throws IOException {
- if (this.byteBufferHeader.hasRemaining()) {
- transfered += target.write(this.byteBufferHeader);
- return transfered;
- } else {
- List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
- for (ByteBuffer bb : messageBufferList) {
- if (bb.hasRemaining()) {
- transfered += target.write(bb);
- return transfered;
- }
- }
- }
-
- return 0;
- }
-
- public void close() {
- this.deallocate();
- }
-
- @Override
- protected void deallocate() {
- this.queryMessageResult.release();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
deleted file mode 100644
index 141ba69..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.store.*;
-
-import java.util.HashMap;
-import java.util.Set;
-
-public abstract class AbstractPluginMessageStore implements MessageStore {
- protected MessageStore next = null;
- protected MessageStorePluginContext context;
-
- public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
- this.next = next;
- this.context = context;
- }
-
- @Override
- public long getEarliestMessageTime() {
- return next.getEarliestMessageTime();
- }
-
- @Override
- public long lockTimeMills() {
- return next.lockTimeMills();
- }
-
- @Override
- public boolean isOSPageCacheBusy() {
- return next.isOSPageCacheBusy();
- }
-
- @Override
- public boolean isTransientStorePoolDeficient() {
- return next.isTransientStorePoolDeficient();
- }
-
- @Override
- public boolean load() {
- return next.load();
- }
-
- @Override
- public void start() throws Exception {
- next.start();
- }
-
- @Override
- public void shutdown() {
- next.shutdown();
- }
-
- @Override
- public void destroy() {
- next.destroy();
- }
-
- @Override
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- return next.putMessage(msg);
- }
-
- @Override
- public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
- int maxMsgNums, SubscriptionData subscriptionData) {
- return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
- }
-
- @Override
- public long getMaxOffsetInQuque(String topic, int queueId) {
- return next.getMaxOffsetInQuque(topic, queueId);
- }
-
- @Override
- public long getMinOffsetInQuque(String topic, int queueId) {
- return next.getMinOffsetInQuque(topic, queueId);
- }
-
- @Override
- public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
- return next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
- }
-
- @Override
- public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
- return next.getOffsetInQueueByTime(topic, queueId, timestamp);
- }
-
- @Override
- public MessageExt lookMessageByOffset(long commitLogOffset) {
- return next.lookMessageByOffset(commitLogOffset);
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
- return next.selectOneMessageByOffset(commitLogOffset);
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
- return next.selectOneMessageByOffset(commitLogOffset, msgSize);
- }
-
- @Override
- public String getRunningDataInfo() {
- return next.getRunningDataInfo();
- }
-
- @Override
- public HashMap<String, String> getRuntimeInfo() {
- return next.getRuntimeInfo();
- }
-
- @Override
- public long getMaxPhyOffset() {
- return next.getMaxPhyOffset();
- }
-
- @Override
- public long getMinPhyOffset() {
- return next.getMinPhyOffset();
- }
-
- @Override
- public long getEarliestMessageTime(String topic, int queueId) {
- return next.getEarliestMessageTime(topic, queueId);
- }
-
- @Override
- public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
- return next.getMessageStoreTimeStamp(topic, queueId, offset);
- }
-
- @Override
- public long getMessageTotalInQueue(String topic, int queueId) {
- return next.getMessageTotalInQueue(topic, queueId);
- }
-
- @Override
- public SelectMappedBufferResult getCommitLogData(long offset) {
- return next.getCommitLogData(offset);
- }
-
- @Override
- public boolean appendToCommitLog(long startOffset, byte[] data) {
- return next.appendToCommitLog(startOffset, data);
- }
-
- @Override
- public void excuteDeleteFilesManualy() {
- next.excuteDeleteFilesManualy();
- }
-
- @Override
- public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
- long end) {
- return next.queryMessage(topic, key, maxNum, begin, end);
- }
-
- @Override
- public void updateHaMasterAddress(String newAddr) {
- next.updateHaMasterAddress(newAddr);
- }
-
- @Override
- public long slaveFallBehindMuch() {
- return next.slaveFallBehindMuch();
- }
-
- @Override
- public long now() {
- return next.now();
- }
-
- @Override
- public int cleanUnusedTopic(Set<String> topics) {
- return next.cleanUnusedTopic(topics);
- }
-
- @Override
- public void cleanExpiredConsumerQueue() {
- next.cleanExpiredConsumerQueue();
- }
-
- @Override
- public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
- return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
- }
-
- @Override
- public long dispatchBehindBytes() {
- return next.dispatchBehindBytes();
- }
-
- @Override
- public long flush() {
- return next.flush();
- }
-
- @Override
- public boolean resetWriteOffset(long phyOffset) {
- return next.resetWriteOffset(phyOffset);
- }
-
- @Override
- public long getConfirmOffset() {
- return next.getConfirmOffset();
- }
-
- @Override
- public void setConfirmOffset(long phyOffset) {
- next.setConfirmOffset(phyOffset);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
deleted file mode 100644
index 84f5be7..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.store.MessageStore;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-public final class MessageStoreFactory {
- public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
- throws IOException {
- String plugin = context.getBrokerConfig().getMessageStorePlugIn();
- if (plugin != null && plugin.trim().length() != 0) {
- String[] pluginClasses = plugin.split(",");
- for (int i = pluginClasses.length - 1; i >= 0; --i) {
- String pluginClass = pluginClasses[i];
- try {
- @SuppressWarnings("unchecked")
- Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
- Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
- AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore);
- messageStore = pluginMessageStore;
- } catch (Throwable e) {
- throw new RuntimeException(String.format(
- "Initialize plugin's class %s not found!", pluginClass), e);
- }
- }
- }
- return messageStore;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
deleted file mode 100644
index 15e8b07..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.store.MessageArrivingListener;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-public class MessageStorePluginContext {
- private MessageStoreConfig messageStoreConfig;
- private BrokerStatsManager brokerStatsManager;
- private MessageArrivingListener messageArrivingListener;
- private BrokerConfig brokerConfig;
-
- public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
- BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
- BrokerConfig brokerConfig) {
- super();
- this.messageStoreConfig = messageStoreConfig;
- this.brokerStatsManager = brokerStatsManager;
- this.messageArrivingListener = messageArrivingListener;
- this.brokerConfig = brokerConfig;
- }
-
- public MessageStoreConfig getMessageStoreConfig() {
- return messageStoreConfig;
- }
-
- public BrokerStatsManager getBrokerStatsManager() {
- return brokerStatsManager;
- }
-
- public MessageArrivingListener getMessageArrivingListener() {
- return messageArrivingListener;
- }
-
- public BrokerConfig getBrokerConfig() {
- return brokerConfig;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
deleted file mode 100644
index 95db52d..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.DBMsgConstants;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
-import com.alibaba.rocketmq.common.utils.ChannelUtil;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-
-/**
- * @author shijia.wxr
- */
-public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
- protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- protected final static int DLQ_NUMS_PER_GROUP = 1;
- protected final BrokerController brokerController;
- protected final Random random = new Random(System.currentTimeMillis());
- protected final SocketAddress storeHost;
- private List<SendMessageHook> sendMessageHookList;
-
-
- public AbstractSendMessageProcessor(final BrokerController brokerController) {
- this.brokerController = brokerController;
- this.storeHost =
- new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
- .getNettyServerConfig().getListenPort());
- }
-
- protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
- SendMessageRequestHeader requestHeader) {
- if (!this.hasSendMessageHook()) {
- return null;
- }
- SendMessageContext mqtraceContext;
- mqtraceContext = new SendMessageContext();
- mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
- mqtraceContext.setTopic(requestHeader.getTopic());
- mqtraceContext.setMsgProps(requestHeader.getProperties());
- mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- mqtraceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
- mqtraceContext.setBrokerRegionId(this.brokerController.getBrokerConfig().getRegionId());
- mqtraceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
-
- Map<String, String> properties = MessageDecoder.string2messageProperties(requestHeader.getProperties());
- String uniqueKey = properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
- properties.put(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
- properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
- requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
-
-
- if (uniqueKey == null) {
- uniqueKey = "";
- }
- mqtraceContext.setMsgUniqueKey(uniqueKey);
- return mqtraceContext;
- }
-
- public boolean hasSendMessageHook() {
- return sendMessageHookList != null && !this.sendMessageHookList.isEmpty();
- }
-
- protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
- int queueIdInt = requestHeader.getQueueId();
- if (queueIdInt < 0) {
- queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
- }
- int sysFlag = requestHeader.getSysFlag();
-
- if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
- sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
- }
-
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(requestHeader.getTopic());
- msgInner.setBody(body);
- msgInner.setFlag(requestHeader.getFlag());
- MessageAccessor.setProperties(msgInner,
- MessageDecoder.string2messageProperties(requestHeader.getProperties()));
- msgInner.setPropertiesString(requestHeader.getProperties());
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
- msgInner.getTags()));
-
- msgInner.setQueueId(queueIdInt);
- msgInner.setSysFlag(sysFlag);
- msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
- msgInner.setBornHost(ctx.channel().remoteAddress());
- msgInner.setStoreHost(this.getStoreHost());
- msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
- .getReconsumeTimes());
- return msgInner;
- }
-
- public SocketAddress getStoreHost() {
- return storeHost;
- }
-
- protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, RemotingCommand request,
- final RemotingCommand response) {
- if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
- log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- return response;
- }
- if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
- log.warn("putMessage message properties length too long "
- + requestHeader.getProperties().length());
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- return response;
- }
- if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
- log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(),
- request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
- response.setRemark("msg body must be less 64KB");
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- return response;
- }
- return response;
- }
-
- protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
- if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
- && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
- + "] sending message is forbidden");
- return response;
- }
- if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
- String errorMsg =
- "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
- log.warn(errorMsg);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(errorMsg);
- return response;
- }
-
- TopicConfig topicConfig =
- this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
- if (null == topicConfig) {
- int topicSysFlag = 0;
- if (requestHeader.isUnitMode()) {
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
- } else {
- topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
- }
- }
-
- log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
- + ctx.channel().remoteAddress());
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
- requestHeader.getTopic(), //
- requestHeader.getDefaultTopic(), //
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
- requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
-
- if (null == topicConfig) {
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- topicConfig =
- this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
- requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
- topicSysFlag);
- }
- }
-
- if (null == topicConfig) {
- response.setCode(ResponseCode.TOPIC_NOT_EXIST);
- response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
- + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
- return response;
- }
- }
-
- int queueIdInt = requestHeader.getQueueId();
- int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
- if (queueIdInt >= idValid) {
- String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
- queueIdInt,
- topicConfig.toString(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
- log.warn(errorInfo);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(errorInfo);
-
- return response;
- }
- return response;
- }
-
- public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
- this.sendMessageHookList = sendMessageHookList;
- }
-
- protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
- final RemotingCommand response) {
- if (!request.isOnewayRPC()) {
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("SendMessageProcessor process request over, but response failed", e);
- log.error(request.toString());
- log.error(response.toString());
- }
- }
- }
-
- public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
- SendMessageContext context) {
- if (hasSendMessageHook()) {
- for (SendMessageHook hook : this.sendMessageHookList) {
- try {
- final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
-
- if (null != requestHeader) {
- context.setProducerGroup(requestHeader.getProducerGroup());
- context.setTopic(requestHeader.getTopic());
- context.setBodyLength(request.getBody().length);
- context.setMsgProps(requestHeader.getProperties());
- context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
- context.setBrokerAddr(this.brokerController.getBrokerAddr());
- context.setQueueId(requestHeader.getQueueId());
- }
-
- hook.sendMessageBefore(context);
- requestHeader.setProperties(context.getMsgProps());
- } catch (Throwable e) {
- }
- }
- }
- }
-
- protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
- throws RemotingCommandException {
-
- SendMessageRequestHeaderV2 requestHeaderV2 = null;
- SendMessageRequestHeader requestHeader = null;
- switch (request.getCode()) {
- case RequestCode.SEND_MESSAGE_V2:
- requestHeaderV2 =
- (SendMessageRequestHeaderV2) request
- .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- case RequestCode.SEND_MESSAGE:
- if (null == requestHeaderV2) {
- requestHeader =
- (SendMessageRequestHeader) request
- .decodeCommandCustomHeader(SendMessageRequestHeader.class);
- } else {
- requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
- }
- default:
- break;
- }
- return requestHeader;
- }
-
- public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
- if (hasSendMessageHook()) {
- for (SendMessageHook hook : this.sendMessageHookList) {
- try {
- if (response != null) {
- final SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.readCustomHeader();
- context.setMsgId(responseHeader.getMsgId());
- context.setQueueId(responseHeader.getQueueId());
- context.setQueueOffset(responseHeader.getQueueOffset());
- context.setCode(response.getCode());
- context.setErrorMsg(response.getRemark());
- }
- hook.sendMessageAfter(context);
- } catch (Throwable e) {
-
- }
- }
- }
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-}