You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:08 UTC
[17/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
deleted file mode 100644
index fc06d6e..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageConst.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.message;
-
-import java.util.HashSet;
-
-
-public class MessageConst {
- public static final String PROPERTY_KEYS = "KEYS";
- public static final String PROPERTY_TAGS = "TAGS";
- public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
- public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
- public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
- public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
- public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
- public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
- public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
- public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
- public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
- public static final String PROPERTY_BUYER_ID = "BUYER_ID";
- public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
- public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
- public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
- public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
- public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
- public static final String PROPERTY_MSG_REGION = "MSG_REGION";
- public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
- public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
- public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
- public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
-
- public static final String KEY_SEPARATOR = " ";
-
- public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();
-
-
- static {
- STRING_HASH_SET.add(PROPERTY_TRACE_SWITCH);
- STRING_HASH_SET.add(PROPERTY_MSG_REGION);
- STRING_HASH_SET.add(PROPERTY_KEYS);
- STRING_HASH_SET.add(PROPERTY_TAGS);
- STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK);
- STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL);
- STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC);
- STRING_HASH_SET.add(PROPERTY_REAL_TOPIC);
- STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID);
- STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED);
- STRING_HASH_SET.add(PROPERTY_PRODUCER_GROUP);
- STRING_HASH_SET.add(PROPERTY_MIN_OFFSET);
- STRING_HASH_SET.add(PROPERTY_MAX_OFFSET);
- STRING_HASH_SET.add(PROPERTY_BUYER_ID);
- STRING_HASH_SET.add(PROPERTY_ORIGIN_MESSAGE_ID);
- STRING_HASH_SET.add(PROPERTY_TRANSFER_FLAG);
- STRING_HASH_SET.add(PROPERTY_CORRECTION_FLAG);
- STRING_HASH_SET.add(PROPERTY_MQ2_FLAG);
- STRING_HASH_SET.add(PROPERTY_RECONSUME_TIME);
- STRING_HASH_SET.add(PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
- STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
- STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
deleted file mode 100644
index e21c1ca..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageDecoder.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.message;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @author shijia.wxr
- */
-public class MessageDecoder {
- public final static int MSG_ID_LENGTH = 8 + 8;
-
- public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
- public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
- public final static int MESSAGE_FLAG_POSTION = 16;
- public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
- public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
- public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
-
-
- public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
- input.flip();
- input.limit(MessageDecoder.MSG_ID_LENGTH);
-
- input.put(addr);
- input.putLong(offset);
-
- return UtilAll.bytes2string(input.array());
- }
-
-
- public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
- InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
- byteBuffer.put(inetSocketAddress.getAddress().getAddress());
- byteBuffer.putInt(inetSocketAddress.getPort());
- byteBuffer.putLong(transactionIdhashCode);
- byteBuffer.flip();
- return UtilAll.bytes2string(byteBuffer.array());
- }
-
-
- public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
- SocketAddress address;
- long offset;
-
-
- byte[] ip = UtilAll.string2bytes(msgId.substring(0, 8));
- byte[] port = UtilAll.string2bytes(msgId.substring(8, 16));
- ByteBuffer bb = ByteBuffer.wrap(port);
- int portInt = bb.getInt(0);
- address = new InetSocketAddress(InetAddress.getByAddress(ip), portInt);
-
- // offset
- byte[] data = UtilAll.string2bytes(msgId.substring(16, 32));
- bb = ByteBuffer.wrap(data);
- offset = bb.getLong(0);
-
- return new MessageId(address, offset);
- }
-
-
- public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
- return decode(byteBuffer, true, true, false);
- }
-
- public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
- return decode(byteBuffer, readBody, true, true);
- }
-
- public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
- return decode(byteBuffer, readBody, true, false);
- }
-
-
- public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
- byte[] body = messageExt.getBody();
- byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
- byte topicLen = (byte) topics.length;
- String properties = messageProperties2String(messageExt.getProperties());
- byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
- short propertiesLength = (short) propertiesBytes.length;
- int sysFlag = messageExt.getSysFlag();
- byte[] newBody = messageExt.getBody();
- if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
- newBody = UtilAll.compress(body, 5);
- }
- int bodyLength = newBody.length;
- int storeSize = messageExt.getStoreSize();
- ByteBuffer byteBuffer;
- if (storeSize > 0) {
- byteBuffer = ByteBuffer.allocate(storeSize);
- } else {
- storeSize = 4 // 1 TOTALSIZE
- + 4 // 2 MAGICCODE
- + 4 // 3 BODYCRC
- + 4 // 4 QUEUEID
- + 4 // 5 FLAG
- + 8 // 6 QUEUEOFFSET
- + 8 // 7 PHYSICALOFFSET
- + 4 // 8 SYSFLAG
- + 8 // 9 BORNTIMESTAMP
- + 8 // 10 BORNHOST
- + 8 // 11 STORETIMESTAMP
- + 8 // 12 STOREHOSTADDRESS
- + 4 // 13 RECONSUMETIMES
- + 8 // 14 Prepared Transaction Offset
- + 4 + bodyLength // 14 BODY
- + 1 + topicLen // 15 TOPIC
- + 2 + propertiesLength // 16 propertiesLength
- + 0;
- byteBuffer = ByteBuffer.allocate(storeSize);
- }
- // 1 TOTALSIZE
- byteBuffer.putInt(storeSize);
-
- // 2 MAGICCODE
- byteBuffer.putInt(MESSAGE_MAGIC_CODE);
-
- // 3 BODYCRC
- int bodyCRC = messageExt.getBodyCRC();
- byteBuffer.putInt(bodyCRC);
-
- // 4 QUEUEID
- int queueId = messageExt.getQueueId();
- byteBuffer.putInt(queueId);
-
- // 5 FLAG
- int flag = messageExt.getFlag();
- byteBuffer.putInt(flag);
-
- // 6 QUEUEOFFSET
- long queueOffset = messageExt.getQueueOffset();
- byteBuffer.putLong(queueOffset);
-
- // 7 PHYSICALOFFSET
- long physicOffset = messageExt.getCommitLogOffset();
- byteBuffer.putLong(physicOffset);
-
- // 8 SYSFLAG
- byteBuffer.putInt(sysFlag);
-
- // 9 BORNTIMESTAMP
- long bornTimeStamp = messageExt.getBornTimestamp();
- byteBuffer.putLong(bornTimeStamp);
-
- // 10 BORNHOST
- InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
- byteBuffer.put(bornHost.getAddress().getAddress());
- byteBuffer.putInt(bornHost.getPort());
-
- // 11 STORETIMESTAMP
- long storeTimestamp = messageExt.getStoreTimestamp();
- byteBuffer.putLong(storeTimestamp);
-
- // 12 STOREHOST
- InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
- byteBuffer.put(serverHost.getAddress().getAddress());
- byteBuffer.putInt(serverHost.getPort());
-
- // 13 RECONSUMETIMES
- int reconsumeTimes = messageExt.getReconsumeTimes();
- byteBuffer.putInt(reconsumeTimes);
-
- // 14 Prepared Transaction Offset
- long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
- byteBuffer.putLong(preparedTransactionOffset);
-
- // 15 BODY
- byteBuffer.putInt(bodyLength);
- byteBuffer.put(newBody);
-
- // 16 TOPIC
- byteBuffer.put(topicLen);
- byteBuffer.put(topics);
-
- // 17 properties
- byteBuffer.putShort(propertiesLength);
- byteBuffer.put(propertiesBytes);
-
- return byteBuffer.array();
- }
-
- public static MessageExt decode(
- java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
- return decode(byteBuffer, readBody, deCompressBody, false);
- }
-
- public static MessageExt decode(
- java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
- try {
-
- MessageExt msgExt;
- if (isClient) {
- msgExt = new MessageClientExt();
- } else {
- msgExt = new MessageExt();
- }
-
- // 1 TOTALSIZE
- int storeSize = byteBuffer.getInt();
- msgExt.setStoreSize(storeSize);
-
- // 2 MAGICCODE
- byteBuffer.getInt();
-
- // 3 BODYCRC
- int bodyCRC = byteBuffer.getInt();
- msgExt.setBodyCRC(bodyCRC);
-
- // 4 QUEUEID
- int queueId = byteBuffer.getInt();
- msgExt.setQueueId(queueId);
-
- // 5 FLAG
- int flag = byteBuffer.getInt();
- msgExt.setFlag(flag);
-
- // 6 QUEUEOFFSET
- long queueOffset = byteBuffer.getLong();
- msgExt.setQueueOffset(queueOffset);
-
- // 7 PHYSICALOFFSET
- long physicOffset = byteBuffer.getLong();
- msgExt.setCommitLogOffset(physicOffset);
-
- // 8 SYSFLAG
- int sysFlag = byteBuffer.getInt();
- msgExt.setSysFlag(sysFlag);
-
- // 9 BORNTIMESTAMP
- long bornTimeStamp = byteBuffer.getLong();
- msgExt.setBornTimestamp(bornTimeStamp);
-
- // 10 BORNHOST
- byte[] bornHost = new byte[4];
- byteBuffer.get(bornHost, 0, 4);
- int port = byteBuffer.getInt();
- msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));
-
- // 11 STORETIMESTAMP
- long storeTimestamp = byteBuffer.getLong();
- msgExt.setStoreTimestamp(storeTimestamp);
-
- // 12 STOREHOST
- byte[] storeHost = new byte[4];
- byteBuffer.get(storeHost, 0, 4);
- port = byteBuffer.getInt();
- msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));
-
- // 13 RECONSUMETIMES
- int reconsumeTimes = byteBuffer.getInt();
- msgExt.setReconsumeTimes(reconsumeTimes);
-
- // 14 Prepared Transaction Offset
- long preparedTransactionOffset = byteBuffer.getLong();
- msgExt.setPreparedTransactionOffset(preparedTransactionOffset);
-
- // 15 BODY
- int bodyLen = byteBuffer.getInt();
- if (bodyLen > 0) {
- if (readBody) {
- byte[] body = new byte[bodyLen];
- byteBuffer.get(body);
-
- // uncompress body
- if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
- body = UtilAll.uncompress(body);
- }
-
- msgExt.setBody(body);
- } else {
- byteBuffer.position(byteBuffer.position() + bodyLen);
- }
- }
-
- // 16 TOPIC
- byte topicLen = byteBuffer.get();
- byte[] topic = new byte[(int) topicLen];
- byteBuffer.get(topic);
- msgExt.setTopic(new String(topic, CHARSET_UTF8));
-
- // 17 properties
- short propertiesLength = byteBuffer.getShort();
- if (propertiesLength > 0) {
- byte[] properties = new byte[propertiesLength];
- byteBuffer.get(properties);
- String propertiesString = new String(properties, CHARSET_UTF8);
- Map<String, String> map = string2messageProperties(propertiesString);
- msgExt.setProperties(map);
- }
-
- ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
- String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
- msgExt.setMsgId(msgId);
-
- if (isClient) {
- ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
- }
-
- return msgExt;
- } catch (UnknownHostException e) {
- byteBuffer.position(byteBuffer.limit());
- } catch (BufferUnderflowException e) {
- byteBuffer.position(byteBuffer.limit());
- } catch (Exception e) {
- byteBuffer.position(byteBuffer.limit());
- }
-
- return null;
- }
-
-
- public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) {
- return decodes(byteBuffer, true);
- }
-
- public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
- List<MessageExt> msgExts = new ArrayList<MessageExt>();
- while (byteBuffer.hasRemaining()) {
- MessageExt msgExt = clientDecode(byteBuffer, readBody);
- if (null != msgExt) {
- msgExts.add(msgExt);
- } else {
- break;
- }
- }
- return msgExts;
- }
-
- public static final char NAME_VALUE_SEPARATOR = 1;
- public static final char PROPERTY_SEPARATOR = 2;
-
-
- public static String messageProperties2String(Map<String, String> properties) {
- StringBuilder sb = new StringBuilder();
- if (properties != null) {
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- final String name = entry.getKey();
- final String value = entry.getValue();
-
- sb.append(name);
- sb.append(NAME_VALUE_SEPARATOR);
- sb.append(value);
- sb.append(PROPERTY_SEPARATOR);
- }
- }
- return sb.toString();
- }
-
- public static Map<String, String> string2messageProperties(final String properties) {
- Map<String, String> map = new HashMap<String, String>();
- if (properties != null) {
- String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR));
- if (items != null) {
- for (String i : items) {
- String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR));
- if (nv != null && 2 == nv.length) {
- map.put(nv[0], nv[1]);
- }
- }
- }
- }
-
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
deleted file mode 100644
index 627935d..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageExt.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.message;
-
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-
-
-/**
- * @author shijia.wxr
- */
-public class MessageExt extends Message {
- private static final long serialVersionUID = 5720810158625748049L;
-
- private int queueId;
-
- private int storeSize;
-
- private long queueOffset;
- private int sysFlag;
- private long bornTimestamp;
- private SocketAddress bornHost;
-
- private long storeTimestamp;
- private SocketAddress storeHost;
- private String msgId;
- private long commitLogOffset;
- private int bodyCRC;
- private int reconsumeTimes;
-
- private long preparedTransactionOffset;
-
-
- public MessageExt() {
- }
-
-
- public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp,
- SocketAddress storeHost, String msgId) {
- this.queueId = queueId;
- this.bornTimestamp = bornTimestamp;
- this.bornHost = bornHost;
- this.storeTimestamp = storeTimestamp;
- this.storeHost = storeHost;
- this.msgId = msgId;
- }
-
- public static TopicFilterType parseTopicFilterType(final int sysFlag) {
- if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {
- return TopicFilterType.MULTI_TAG;
- }
-
- return TopicFilterType.SINGLE_TAG;
- }
-
- public ByteBuffer getBornHostBytes() {
- return socketAddress2ByteBuffer(this.bornHost);
- }
-
- public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) {
- return socketAddress2ByteBuffer(this.bornHost, byteBuffer);
- }
-
- private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
- InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
- byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
- byteBuffer.putInt(inetSocketAddress.getPort());
- byteBuffer.flip();
- return byteBuffer;
- }
-
- public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(8);
- return socketAddress2ByteBuffer(socketAddress, byteBuffer);
- }
-
- public ByteBuffer getStoreHostBytes() {
- return socketAddress2ByteBuffer(this.storeHost);
- }
-
- public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
- return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
- }
-
- public int getQueueId() {
- return queueId;
- }
-
- public void setQueueId(int queueId) {
- this.queueId = queueId;
- }
-
- public long getBornTimestamp() {
- return bornTimestamp;
- }
-
- public void setBornTimestamp(long bornTimestamp) {
- this.bornTimestamp = bornTimestamp;
- }
-
- public SocketAddress getBornHost() {
- return bornHost;
- }
-
- public void setBornHost(SocketAddress bornHost) {
- this.bornHost = bornHost;
- }
-
- public String getBornHostString() {
- if (this.bornHost != null) {
- InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
- return inetSocketAddress.getAddress().getHostAddress();
- }
-
- return null;
- }
-
- public String getBornHostNameString() {
- if (this.bornHost != null) {
- InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
- return inetSocketAddress.getAddress().getHostName();
- }
-
- return null;
- }
-
- public long getStoreTimestamp() {
- return storeTimestamp;
- }
-
- public void setStoreTimestamp(long storeTimestamp) {
- this.storeTimestamp = storeTimestamp;
- }
-
- public SocketAddress getStoreHost() {
- return storeHost;
- }
-
- public void setStoreHost(SocketAddress storeHost) {
- this.storeHost = storeHost;
- }
-
- public String getMsgId() {
- return msgId;
- }
-
- public void setMsgId(String msgId) {
- this.msgId = msgId;
- }
-
- public int getSysFlag() {
- return sysFlag;
- }
-
- public void setSysFlag(int sysFlag) {
- this.sysFlag = sysFlag;
- }
-
- public int getBodyCRC() {
- return bodyCRC;
- }
-
- public void setBodyCRC(int bodyCRC) {
- this.bodyCRC = bodyCRC;
- }
-
- public long getQueueOffset() {
- return queueOffset;
- }
-
- public void setQueueOffset(long queueOffset) {
- this.queueOffset = queueOffset;
- }
-
- public long getCommitLogOffset() {
- return commitLogOffset;
- }
-
- public void setCommitLogOffset(long physicOffset) {
- this.commitLogOffset = physicOffset;
- }
-
- public int getStoreSize() {
- return storeSize;
- }
-
- public void setStoreSize(int storeSize) {
- this.storeSize = storeSize;
- }
-
- public int getReconsumeTimes() {
- return reconsumeTimes;
- }
-
-
- public void setReconsumeTimes(int reconsumeTimes) {
- this.reconsumeTimes = reconsumeTimes;
- }
-
-
- public long getPreparedTransactionOffset() {
- return preparedTransactionOffset;
- }
-
-
- public void setPreparedTransactionOffset(long preparedTransactionOffset) {
- this.preparedTransactionOffset = preparedTransactionOffset;
- }
-
-
- @Override
- public String toString() {
- return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
- + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
- + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
- + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
- + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
- + ", toString()=" + super.toString() + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
deleted file mode 100644
index d08be86..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageId.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.message;
-
-import java.net.SocketAddress;
-
-
-/**
- * @author shijia.wxr
- */
-public class MessageId {
- private SocketAddress address;
- private long offset;
-
-
- public MessageId(SocketAddress address, long offset) {
- this.address = address;
- this.offset = offset;
- }
-
-
- public SocketAddress getAddress() {
- return address;
- }
-
-
- public void setAddress(SocketAddress address) {
- this.address = address;
- }
-
-
- public long getOffset() {
- return offset;
- }
-
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
deleted file mode 100644
index 35d2827..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.message;
-
-import java.io.Serializable;
-
-
-/**
- * @author shijia.wxr
- */
-public class MessageQueue implements Comparable<MessageQueue>, Serializable {
- private static final long serialVersionUID = 6191200464116433425L;
- private String topic;
- private String brokerName;
- private int queueId;
-
-
- public MessageQueue() {
-
- }
-
-
- public MessageQueue(String topic, String brokerName, int queueId) {
- this.topic = topic;
- this.brokerName = brokerName;
- this.queueId = queueId;
- }
-
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public String getBrokerName() {
- return brokerName;
- }
-
-
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-
-
- public int getQueueId() {
- return queueId;
- }
-
-
- public void setQueueId(int queueId) {
- this.queueId = queueId;
- }
-
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
- result = prime * result + queueId;
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
- return result;
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- MessageQueue other = (MessageQueue) obj;
- if (brokerName == null) {
- if (other.brokerName != null)
- return false;
- } else if (!brokerName.equals(other.brokerName))
- return false;
- if (queueId != other.queueId)
- return false;
- if (topic == null) {
- if (other.topic != null)
- return false;
- } else if (!topic.equals(other.topic))
- return false;
- return true;
- }
-
-
- @Override
- public String toString() {
- return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]";
- }
-
-
- @Override
- public int compareTo(MessageQueue o) {
- {
- int result = this.topic.compareTo(o.topic);
- if (result != 0) {
- return result;
- }
- }
-
- {
- int result = this.brokerName.compareTo(o.brokerName);
- if (result != 0) {
- return result;
- }
- }
-
- return this.queueId - o.queueId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
deleted file mode 100644
index a905af6..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageQueueForC.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.message;
-
-import java.io.Serializable;
-
-
-/**
- * @author lansheng.zj
- */
-public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable {
-
- private static final long serialVersionUID = 5320967846569962104L;
- private String topic;
- private String brokerName;
- private int queueId;
- private long offset;
-
-
- public MessageQueueForC(String topic, String brokerName, int queueId, long offset) {
- this.topic = topic;
- this.brokerName = brokerName;
- this.queueId = queueId;
- this.offset = offset;
- }
-
-
- @Override
- public int compareTo(MessageQueueForC o) {
- int result = this.topic.compareTo(o.topic);
- if (result != 0) {
- return result;
- }
- result = this.brokerName.compareTo(o.brokerName);
- if (result != 0) {
- return result;
- }
- result = this.queueId - o.queueId;
- if (result != 0) {
- return result;
- }
- if ((this.offset - o.offset) > 0) {
- return 1;
- } else if ((this.offset - o.offset) == 0) {
- return 0;
- } else {
- return -1;
- }
- }
-
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
- result = prime * result + queueId;
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
- return result;
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- MessageQueueForC other = (MessageQueueForC) obj;
- if (brokerName == null) {
- if (other.brokerName != null)
- return false;
- } else if (!brokerName.equals(other.brokerName))
- return false;
- if (queueId != other.queueId)
- return false;
- if (topic == null) {
- if (other.topic != null)
- return false;
- } else if (!topic.equals(other.topic))
- return false;
-
- if (offset != other.offset) {
- return false;
- }
- return true;
- }
-
-
- @Override
- public String toString() {
- return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId
- + ", offset=" + offset + "]";
- }
-
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public String getBrokerName() {
- return brokerName;
- }
-
-
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-
-
- public int getQueueId() {
- return queueId;
- }
-
-
- public void setQueueId(int queueId) {
- this.queueId = queueId;
- }
-
-
- public long getOffset() {
- return offset;
- }
-
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java b/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
deleted file mode 100644
index 164eb87..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/message/MessageType.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.message;
-
-public enum MessageType {
- Normal_Msg,
- Trans_Msg_Half,
- Trans_msg_Commit,
- Delay_Msg,
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
deleted file mode 100644
index 08db357..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $
- */
-package com.alibaba.rocketmq.common.namesrv;
-
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-
-/**
- *
- * @author shijia.wxr
- * @author lansheng.zj
- */
-public class NamesrvConfig {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
- private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
-
- private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
- private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
- private String productEnvName = "center";
- private boolean clusterTest = false;
- private boolean orderMessageEnable = false;
-
- public boolean isOrderMessageEnable() {
- return orderMessageEnable;
- }
-
- public void setOrderMessageEnable(boolean orderMessageEnable) {
- this.orderMessageEnable = orderMessageEnable;
- }
-
- public String getRocketmqHome() {
- return rocketmqHome;
- }
-
-
- public void setRocketmqHome(String rocketmqHome) {
- this.rocketmqHome = rocketmqHome;
- }
-
-
- public String getKvConfigPath() {
- return kvConfigPath;
- }
-
-
- public void setKvConfigPath(String kvConfigPath) {
- this.kvConfigPath = kvConfigPath;
- }
-
-
- public String getProductEnvName() {
- return productEnvName;
- }
-
-
- public void setProductEnvName(String productEnvName) {
- this.productEnvName = productEnvName;
- }
-
-
- public boolean isClusterTest() {
- return clusterTest;
- }
-
-
- public void setClusterTest(boolean clusterTest) {
- this.clusterTest = clusterTest;
- }
-
- public String getConfigStorePath() {
- return configStorePath;
- }
-
- public void setConfigStorePath(final String configStorePath) {
- this.configStorePath = configStorePath;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
deleted file mode 100644
index fcc32d9..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/NamesrvUtil.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.namesrv;
-
-/**
- * @author shijia.wxr
- */
-public class NamesrvUtil {
- public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
deleted file mode 100644
index 68bf44a..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/RegisterBrokerResult.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.namesrv;
-
-import com.alibaba.rocketmq.common.protocol.body.KVTable;
-
-
-/**
- * @author shijia.wxr
- */
-public class RegisterBrokerResult {
- private String haServerAddr;
- private String masterAddr;
- private KVTable kvTable;
-
-
- public String getHaServerAddr() {
- return haServerAddr;
- }
-
-
- public void setHaServerAddr(String haServerAddr) {
- this.haServerAddr = haServerAddr;
- }
-
-
- public String getMasterAddr() {
- return masterAddr;
- }
-
-
- public void setMasterAddr(String masterAddr) {
- this.masterAddr = masterAddr;
- }
-
-
- public KVTable getKvTable() {
- return kvTable;
- }
-
-
- public void setKvTable(KVTable kvTable) {
- this.kvTable = kvTable;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
deleted file mode 100644
index 2e4ad87..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/namesrv/TopAddressing.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.common.namesrv;
-
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.utils.HttpTinyClient;
-import com.alibaba.rocketmq.common.utils.HttpTinyClient.HttpResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-
-/**
- * @author shijia.wxr
- * @author manhong.yqd
- */
-public class TopAddressing {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
- private String nsAddr;
- private String wsAddr;
- private String unitName;
-
-
- public TopAddressing(final String wsAddr) {
- this(wsAddr, null);
- }
-
-
- public TopAddressing(final String wsAddr, final String unitName) {
- this.wsAddr = wsAddr;
- this.unitName = unitName;
- }
-
- public final String fetchNSAddr() {
- return fetchNSAddr(true, 3000);
- }
-
- public final String fetchNSAddr(boolean verbose, long timeoutMills) {
- String url = this.wsAddr;
- try {
- if (!UtilAll.isBlank(this.unitName)) {
- url = url + "-" + this.unitName + "?nofix=1";
- }
- HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
- if (200 == result.code) {
- String responseStr = result.content;
- if (responseStr != null) {
- return clearNewLine(responseStr);
- } else {
- log.error("fetch nameserver address is null");
- }
- } else {
- log.error("fetch nameserver address failed. statusCode={}", result.code);
- }
- } catch (IOException e) {
- if (verbose) {
- log.error("fetch name server address exception", e);
- }
- }
-
- if (verbose) {
- String errorMsg =
- "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts";
- errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
-
- log.warn(errorMsg);
- }
- return null;
- }
-
- private static String clearNewLine(final String str) {
- String newString = str.trim();
- int index = newString.indexOf("\r");
- if (index != -1) {
- return newString.substring(0, index);
- }
-
- index = newString.indexOf("\n");
- if (index != -1) {
- return newString.substring(0, index);
- }
-
- return newString;
- }
-
- public String getNsAddr() {
- return nsAddr;
- }
-
-
- public void setNsAddr(String nsAddr) {
- this.nsAddr = nsAddr;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
deleted file mode 100644
index aaaa51d..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/MQProtosHelper.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
-import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
-import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQProtosHelper {
- public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
- final long timeoutMillis) {
- RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
- requestHeader.setBrokerAddr(brokerAddr);
-
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
- try {
- RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
- if (response != null) {
- return ResponseCode.SUCCESS == response.getCode();
- }
- } catch (RemotingConnectException e) {
- e.printStackTrace();
- } catch (RemotingSendRequestException e) {
- e.printStackTrace();
- } catch (RemotingTimeoutException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
deleted file mode 100644
index a8b8698..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/RequestCode.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-public class RequestCode {
-
- public static final int SEND_MESSAGE = 10;
-
- public static final int PULL_MESSAGE = 11;
-
- public static final int QUERY_MESSAGE = 12;
- public static final int QUERY_BROKER_OFFSET = 13;
- public static final int QUERY_CONSUMER_OFFSET = 14;
- public static final int UPDATE_CONSUMER_OFFSET = 15;
- public static final int UPDATE_AND_CREATE_TOPIC = 17;
- public static final int GET_ALL_TOPIC_CONFIG = 21;
- public static final int GET_TOPIC_CONFIG_LIST = 22;
-
- public static final int GET_TOPIC_NAME_LIST = 23;
-
- public static final int UPDATE_BROKER_CONFIG = 25;
-
- public static final int GET_BROKER_CONFIG = 26;
-
- public static final int TRIGGER_DELETE_FILES = 27;
-
- public static final int GET_BROKER_RUNTIME_INFO = 28;
- public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
- public static final int GET_MAX_OFFSET = 30;
- public static final int GET_MIN_OFFSET = 31;
-
- public static final int GET_EARLIEST_MSG_STORETIME = 32;
-
- public static final int VIEW_MESSAGE_BY_ID = 33;
-
- public static final int HEART_BEAT = 34;
-
- public static final int UNREGISTER_CLIENT = 35;
-
- public static final int CONSUMER_SEND_MSG_BACK = 36;
-
- public static final int END_TRANSACTION = 37;
- public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
-
- public static final int CHECK_TRANSACTION_STATE = 39;
-
- public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
-
- public static final int LOCK_BATCH_MQ = 41;
-
- public static final int UNLOCK_BATCH_MQ = 42;
- public static final int GET_ALL_CONSUMER_OFFSET = 43;
-
- public static final int GET_ALL_DELAY_OFFSET = 45;
-
- public static final int PUT_KV_CONFIG = 100;
-
- public static final int GET_KV_CONFIG = 101;
-
- public static final int DELETE_KV_CONFIG = 102;
-
- public static final int REGISTER_BROKER = 103;
-
- public static final int UNREGISTER_BROKER = 104;
- public static final int GET_ROUTEINTO_BY_TOPIC = 105;
-
- public static final int GET_BROKER_CLUSTER_INFO = 106;
- public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
- public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
- public static final int GET_TOPIC_STATS_INFO = 202;
- public static final int GET_CONSUMER_CONNECTION_LIST = 203;
- public static final int GET_PRODUCER_CONNECTION_LIST = 204;
- public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
-
-
- public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
-
- public static final int DELETE_SUBSCRIPTIONGROUP = 207;
- public static final int GET_CONSUME_STATS = 208;
-
- public static final int SUSPEND_CONSUMER = 209;
-
- public static final int RESUME_CONSUMER = 210;
- public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
- public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
-
- public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
-
- public static final int WHO_CONSUME_THE_MESSAGE = 214;
-
-
- public static final int DELETE_TOPIC_IN_BROKER = 215;
-
- public static final int DELETE_TOPIC_IN_NAMESRV = 216;
- public static final int GET_KVLIST_BY_NAMESPACE = 219;
-
-
- public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
-
- public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
-
- public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
-
- public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
-
-
- public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
-
- public static final int GET_TOPICS_BY_CLUSTER = 224;
-
- public static final int REGISTER_FILTER_SERVER = 301;
- public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
-
- public static final int QUERY_CONSUME_TIME_SPAN = 303;
-
- public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
- public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
-
- public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
-
- public static final int GET_CONSUMER_RUNNING_INFO = 307;
-
- public static final int QUERY_CORRECTION_OFFSET = 308;
- public static final int CONSUME_MESSAGE_DIRECTLY = 309;
-
- public static final int SEND_MESSAGE_V2 = 310;
-
- public static final int GET_UNIT_TOPIC_LIST = 311;
-
- public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
-
- public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
-
- public static final int CLONE_GROUP_OFFSET = 314;
-
- public static final int VIEW_BROKER_STATS_DATA = 315;
-
- public static final int CLEAN_UNUSED_TOPIC = 316;
-
- public static final int GET_BROKER_CONSUME_STATS = 317;
-
- /**
- * update the config of name server
- */
- public static final int UPDATE_NAMESRV_CONFIG = 318;
-
- /**
- * get config from name server
- */
- public static final int GET_NAMESRV_CONFIG = 319;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
deleted file mode 100644
index 3c01fad..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/ResponseCode.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
-
-
-public class ResponseCode extends RemotingSysResponseCode {
-
- public static final int FLUSH_DISK_TIMEOUT = 10;
-
- public static final int SLAVE_NOT_AVAILABLE = 11;
-
- public static final int FLUSH_SLAVE_TIMEOUT = 12;
-
- public static final int MESSAGE_ILLEGAL = 13;
-
- public static final int SERVICE_NOT_AVAILABLE = 14;
-
- public static final int VERSION_NOT_SUPPORTED = 15;
-
- public static final int NO_PERMISSION = 16;
-
- public static final int TOPIC_NOT_EXIST = 17;
- public static final int TOPIC_EXIST_ALREADY = 18;
- public static final int PULL_NOT_FOUND = 19;
-
- public static final int PULL_RETRY_IMMEDIATELY = 20;
-
- public static final int PULL_OFFSET_MOVED = 21;
-
- public static final int QUERY_NOT_FOUND = 22;
-
- public static final int SUBSCRIPTION_PARSE_FAILED = 23;
-
- public static final int SUBSCRIPTION_NOT_EXIST = 24;
-
- public static final int SUBSCRIPTION_NOT_LATEST = 25;
-
- public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
-
- public static final int TRANSACTION_SHOULD_COMMIT = 200;
-
- public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
-
- public static final int TRANSACTION_STATE_UNKNOW = 202;
-
- public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
- public static final int NO_BUYER_ID = 204;
-
-
- public static final int NOT_IN_CURRENT_UNIT = 205;
-
-
- public static final int CONSUMER_NOT_ONLINE = 206;
-
-
- public static final int CONSUME_MSG_TIMEOUT = 207;
-
-
- public static final int NO_MESSAGE = 208;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
deleted file mode 100644
index 6f51b06..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsData.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-
-public class BrokerStatsData extends RemotingSerializable {
-
- private BrokerStatsItem statsMinute;
-
- private BrokerStatsItem statsHour;
-
- private BrokerStatsItem statsDay;
-
-
- public BrokerStatsItem getStatsMinute() {
- return statsMinute;
- }
-
-
- public void setStatsMinute(BrokerStatsItem statsMinute) {
- this.statsMinute = statsMinute;
- }
-
-
- public BrokerStatsItem getStatsHour() {
- return statsHour;
- }
-
-
- public void setStatsHour(BrokerStatsItem statsHour) {
- this.statsHour = statsHour;
- }
-
-
- public BrokerStatsItem getStatsDay() {
- return statsDay;
- }
-
-
- public void setStatsDay(BrokerStatsItem statsDay) {
- this.statsDay = statsDay;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
deleted file mode 100644
index 1cf6c3d..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/BrokerStatsItem.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-public class BrokerStatsItem {
- private long sum;
- private double tps;
- private double avgpt;
-
-
- public long getSum() {
- return sum;
- }
-
-
- public void setSum(long sum) {
- this.sum = sum;
- }
-
-
- public double getTps() {
- return tps;
- }
-
-
- public void setTps(double tps) {
- this.tps = tps;
- }
-
-
- public double getAvgpt() {
- return avgpt;
- }
-
-
- public void setAvgpt(double avgpt) {
- this.avgpt = avgpt;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
deleted file mode 100644
index 873b548..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/CMResult.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-public enum CMResult {
- CR_SUCCESS,
- CR_LATER,
- CR_ROLLBACK,
- CR_COMMIT,
- CR_THROW_EXCEPTION,
- CR_RETURN_NULL,
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
deleted file mode 100644
index 81d6447..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ClusterInfo.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.common.protocol.route.BrokerData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClusterInfo extends RemotingSerializable {
- private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
- private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
-
-
- public HashMap<String, BrokerData> getBrokerAddrTable() {
- return brokerAddrTable;
- }
-
-
- public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) {
- this.brokerAddrTable = brokerAddrTable;
- }
-
-
- public HashMap<String, Set<String>> getClusterAddrTable() {
- return clusterAddrTable;
- }
-
-
- public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) {
- this.clusterAddrTable = clusterAddrTable;
- }
-
-
- public String[] retrieveAllAddrByCluster(String cluster) {
- List<String> addrs = new ArrayList<String>();
- if (clusterAddrTable.containsKey(cluster)) {
- Set<String> brokerNames = clusterAddrTable.get(cluster);
- for (String brokerName : brokerNames) {
- BrokerData brokerData = brokerAddrTable.get(brokerName);
- if (null != brokerData) {
- addrs.addAll(brokerData.getBrokerAddrs().values());
- }
- }
- }
-
- return addrs.toArray(new String[]{});
- }
-
-
- public String[] retrieveAllClusterNames() {
- return clusterAddrTable.keySet().toArray(new String[]{});
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
deleted file mode 100644
index 72cf601..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/Connection.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
-
-
-/**
- * @author shijia.wxr
- */
-public class Connection {
- private String clientId;
- private String clientAddr;
- private LanguageCode language;
- private int version;
-
-
- public String getClientId() {
- return clientId;
- }
-
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
-
- public String getClientAddr() {
- return clientAddr;
- }
-
-
- public void setClientAddr(String clientAddr) {
- this.clientAddr = clientAddr;
- }
-
-
- public LanguageCode getLanguage() {
- return language;
- }
-
-
- public void setLanguage(LanguageCode language) {
- this.language = language;
- }
-
-
- public int getVersion() {
- return version;
- }
-
-
- public void setVersion(int version) {
- this.version = version;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
deleted file mode 100644
index 8a69352..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeByWho.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.HashSet;
-
-
-/**
- * @author shijia.wxr
- *
- */
-public class ConsumeByWho extends RemotingSerializable {
- private HashSet<String> consumedGroup = new HashSet<String>();
- private HashSet<String> notConsumedGroup = new HashSet<String>();
- private String topic;
- private int queueId;
- private long offset;
-
-
- public HashSet<String> getConsumedGroup() {
- return consumedGroup;
- }
-
-
- public void setConsumedGroup(HashSet<String> consumedGroup) {
- this.consumedGroup = consumedGroup;
- }
-
-
- public HashSet<String> getNotConsumedGroup() {
- return notConsumedGroup;
- }
-
-
- public void setNotConsumedGroup(HashSet<String> notConsumedGroup) {
- this.notConsumedGroup = notConsumedGroup;
- }
-
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public int getQueueId() {
- return queueId;
- }
-
-
- public void setQueueId(int queueId) {
- this.queueId = queueId;
- }
-
-
- public long getOffset() {
- return offset;
- }
-
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
deleted file mode 100644
index c895fe2..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-
-public class ConsumeMessageDirectlyResult extends RemotingSerializable {
- private boolean order = false;
- private boolean autoCommit = true;
- private CMResult consumeResult;
- private String remark;
- private long spentTimeMills;
-
-
- public boolean isOrder() {
- return order;
- }
-
-
- public void setOrder(boolean order) {
- this.order = order;
- }
-
-
- public boolean isAutoCommit() {
- return autoCommit;
- }
-
-
- public void setAutoCommit(boolean autoCommit) {
- this.autoCommit = autoCommit;
- }
-
-
- public String getRemark() {
- return remark;
- }
-
-
- public void setRemark(String remark) {
- this.remark = remark;
- }
-
-
- public CMResult getConsumeResult() {
- return consumeResult;
- }
-
-
- public void setConsumeResult(CMResult consumeResult) {
- this.consumeResult = consumeResult;
- }
-
-
- public long getSpentTimeMills() {
- return spentTimeMills;
- }
-
-
- public void setSpentTimeMills(long spentTimeMills) {
- this.spentTimeMills = spentTimeMills;
- }
-
-
- @Override
- public String toString() {
- return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit
- + ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills="
- + spentTimeMills + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
deleted file mode 100644
index a1c608d..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatsList.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.common.admin.ConsumeStats;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumeStatsList extends RemotingSerializable {
- private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
- private String brokerAddr;
- private long totalDiff;
-
- public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() {
- return consumeStatsList;
- }
-
- public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) {
- this.consumeStatsList = consumeStatsList;
- }
-
- public String getBrokerAddr() {
- return brokerAddr;
- }
-
- public void setBrokerAddr(String brokerAddr) {
- this.brokerAddr = brokerAddr;
- }
-
- public long getTotalDiff() {
- return totalDiff;
- }
-
- public void setTotalDiff(long totalDiff) {
- this.totalDiff = totalDiff;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
deleted file mode 100644
index dcb6281..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumeStatus.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-public class ConsumeStatus {
- private double pullRT;
- private double pullTPS;
- private double consumeRT;
- private double consumeOKTPS;
- private double consumeFailedTPS;
-
- private long consumeFailedMsgs;
-
-
- public double getPullRT() {
- return pullRT;
- }
-
-
- public void setPullRT(double pullRT) {
- this.pullRT = pullRT;
- }
-
-
- public double getPullTPS() {
- return pullTPS;
- }
-
-
- public void setPullTPS(double pullTPS) {
- this.pullTPS = pullTPS;
- }
-
-
- public double getConsumeRT() {
- return consumeRT;
- }
-
-
- public void setConsumeRT(double consumeRT) {
- this.consumeRT = consumeRT;
- }
-
-
- public double getConsumeOKTPS() {
- return consumeOKTPS;
- }
-
-
- public void setConsumeOKTPS(double consumeOKTPS) {
- this.consumeOKTPS = consumeOKTPS;
- }
-
-
- public double getConsumeFailedTPS() {
- return consumeFailedTPS;
- }
-
-
- public void setConsumeFailedTPS(double consumeFailedTPS) {
- this.consumeFailedTPS = consumeFailedTPS;
- }
-
-
- public long getConsumeFailedMsgs() {
- return consumeFailedMsgs;
- }
-
-
- public void setConsumeFailedMsgs(long consumeFailedMsgs) {
- this.consumeFailedMsgs = consumeFailedMsgs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
deleted file mode 100644
index f74c6fc..0000000
--- a/common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol.body;
-
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumerConnection extends RemotingSerializable {
- private HashSet<Connection> connectionSet = new HashSet<Connection>();
- private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
- new ConcurrentHashMap<String, SubscriptionData>();
- private ConsumeType consumeType;
- private MessageModel messageModel;
- private ConsumeFromWhere consumeFromWhere;
-
-
- public int computeMinVersion() {
- int minVersion = Integer.MAX_VALUE;
- for (Connection c : this.connectionSet) {
- if (c.getVersion() < minVersion) {
- minVersion = c.getVersion();
- }
- }
-
- return minVersion;
- }
-
-
- public HashSet<Connection> getConnectionSet() {
- return connectionSet;
- }
-
-
- public void setConnectionSet(HashSet<Connection> connectionSet) {
- this.connectionSet = connectionSet;
- }
-
-
- public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
- return subscriptionTable;
- }
-
-
- public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
- this.subscriptionTable = subscriptionTable;
- }
-
-
- public ConsumeType getConsumeType() {
- return consumeType;
- }
-
-
- public void setConsumeType(ConsumeType consumeType) {
- this.consumeType = consumeType;
- }
-
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
-
-
- public ConsumeFromWhere getConsumeFromWhere() {
- return consumeFromWhere;
- }
-
-
- public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
- this.consumeFromWhere = consumeFromWhere;
- }
-}