You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:23 UTC
[06/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
new file mode 100644
index 0000000..60b3fff
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.stats;
+
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.stats.MomentStatsItemSet;
+import com.alibaba.rocketmq.common.stats.StatsItem;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+public class BrokerStatsManager {
+
+ public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
+ public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
+ public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
+ public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE";
+ public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS";
+ public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS";
+ public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS";
+ public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS";
+ public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE";
+ public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS";
+ public static final String BROKER_GET_FROM_DISK_SIZE = "BROKER_GET_FROM_DISK_SIZE";
+ // For commercial
+ public static final String COMMERCIAL_SEND_TIMES = "COMMERCIAL_SEND_TIMES";
+ public static final String COMMERCIAL_SNDBCK_TIMES = "COMMERCIAL_SNDBCK_TIMES";
+ public static final String COMMERCIAL_RCV_TIMES = "COMMERCIAL_RCV_TIMES";
+ public static final String COMMERCIAL_RCV_EPOLLS = "COMMERCIAL_RCV_EPOLLS";
+ public static final String COMMERCIAL_SEND_SIZE = "COMMERCIAL_SEND_SIZE";
+ public static final String COMMERCIAL_RCV_SIZE = "COMMERCIAL_RCV_SIZE";
+ public static final String COMMERCIAL_PERM_FAILURES = "COMMERCIAL_PERM_FAILURES";
+ public static final String COMMERCIAL_OWNER = "Owner";
+ // Message Size limit for one api-calling count.
+ public static final double SIZE_PER_COUNT = 64 * 1024;
+
+ public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
+ public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
+ // Pull Message Latency
+ public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
+
+ /**
+ * read disk follow stats
+ */
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
+ private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "BrokerStatsThread"));
+ private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "CommercialStatsThread"));
+ private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
+ private final String clusterName;
+ private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
+ private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log);
+
+ public BrokerStatsManager(String clusterName) {
+ this.clusterName = clusterName;
+
+ this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
+ this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(GROUP_GET_SIZE, new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, log));
+ this.statsTable.put(GROUP_GET_LATENCY, new StatsItemSet(GROUP_GET_LATENCY, this.scheduledExecutorService, log));
+ this.statsTable.put(SNDBCK_PUT_NUMS, new StatsItemSet(SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(BROKER_PUT_NUMS, new StatsItemSet(BROKER_PUT_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(BROKER_GET_NUMS, new StatsItemSet(BROKER_GET_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(GROUP_GET_FROM_DISK_NUMS, new StatsItemSet(GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(GROUP_GET_FROM_DISK_SIZE, new StatsItemSet(GROUP_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
+ this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
+
+
+ this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_RCV_SIZE, new StatsItemSet(COMMERCIAL_RCV_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_RCV_EPOLLS, new StatsItemSet(COMMERCIAL_RCV_EPOLLS, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_SNDBCK_TIMES, new StatsItemSet(COMMERCIAL_SNDBCK_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
+ this.statsTable.put(COMMERCIAL_PERM_FAILURES, new StatsItemSet(COMMERCIAL_PERM_FAILURES, this.commercialExecutor, COMMERCIAL_LOG));
+ }
+
+ public MomentStatsItemSet getMomentStatsItemSetFallSize() {
+ return momentStatsItemSetFallSize;
+ }
+
+ public MomentStatsItemSet getMomentStatsItemSetFallTime() {
+ return momentStatsItemSetFallTime;
+ }
+
+ public void start() {
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+
+ public StatsItem getStatsItem(final String statsName, final String statsKey) {
+ try {
+ return this.statsTable.get(statsName).getStatsItem(statsKey);
+ } catch (Exception e) {
+ }
+
+ return null;
+ }
+
+ public void incTopicPutNums(final String topic) {
+ this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
+ }
+
+ public void incTopicPutSize(final String topic, final int size) {
+ this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
+ }
+
+ public void incGroupGetNums(final String group, final String topic, final int incValue) {
+ final String statsKey = buildStatsKey(topic, group);
+ this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
+ }
+
+ public String buildStatsKey(String topic, String group) {
+ StringBuffer strBuilder = new StringBuffer();
+ strBuilder.append(topic);
+ strBuilder.append("@");
+ strBuilder.append(group);
+ return strBuilder.toString();
+ }
+
+ public void incGroupGetSize(final String group, final String topic, final int incValue) {
+ final String statsKey = buildStatsKey(topic, group);
+ this.statsTable.get(GROUP_GET_SIZE).addValue(statsKey, incValue, 1);
+ }
+
+ public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) {
+ final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
+ this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1);
+ }
+
+
+ public void incBrokerPutNums() {
+ this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
+ }
+
+
+ public void incBrokerGetNums(final int incValue) {
+ this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+ }
+
+
+ public void incSendBackNums(final String group, final String topic) {
+ final String statsKey = buildStatsKey(topic, group);
+ this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
+ }
+
+
+ public double tpsGroupGetNums(final String group, final String topic) {
+ final String statsKey = buildStatsKey(topic, group);
+ return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
+ }
+
+
+ public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) {
+ final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
+ this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+ }
+
+
+ public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) {
+ final String statsKey = String.format("%d@%s@%s", queueId, topic, group);
+ this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+ }
+
+ public void incCommercialValue(final String key, final String owner, final String group,
+ final String topic, final String type, final int incValue) {
+ final String statsKey = buildCommercialStatsKey(owner, topic, group, type);
+ this.statsTable.get(key).addValue(statsKey, incValue, 1);
+ }
+
+ public String buildCommercialStatsKey(String owner, String topic, String group, String type) {
+ StringBuffer strBuilder = new StringBuffer();
+ strBuilder.append(owner);
+ strBuilder.append("@");
+ strBuilder.append(topic);
+ strBuilder.append("@");
+ strBuilder.append(group);
+ strBuilder.append("@");
+ strBuilder.append(type);
+ return strBuilder.toString();
+ }
+
+
+ public enum StatsType {
+ SEND_SUCCESS,
+ SEND_FAILURE,
+ SEND_BACK,
+ SEND_TIMER,
+ SEND_TRANSACTION,
+ RCV_SUCCESS,
+ RCV_EPOLLS,
+ PERM_FAILURE
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
new file mode 100644
index 0000000..7a3b37c
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.util;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Platform;
+import com.sun.jna.Pointer;
+
+
+public interface LibC extends Library { // JNA mapping of a handful of C library memory functions
+ LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class); // binds to "msvcrt" on Windows, otherwise to "c"
+
+ int MADV_WILLNEED = 3; // presumably advice values for madvise() as defined on Linux - TODO confirm per platform
+ int MADV_DONTNEED = 4;
+
+ int MCL_CURRENT = 1; // presumably flag bits for mlockall() - TODO confirm per platform
+ int MCL_FUTURE = 2;
+ int MCL_ONFAULT = 4;
+
+ /* sync memory asynchronously */
+ int MS_ASYNC = 0x0001; // presumably flag bits for msync() - TODO confirm per platform
+ /* invalidate mappings & caches */
+ int MS_INVALIDATE = 0x0002;
+ /* synchronous memory sync */
+ int MS_SYNC = 0x0004;
+
+ int mlock(Pointer var1, NativeLong var2); // var1: start address, var2: length (per the C prototype this presumably mirrors)
+
+ int munlock(Pointer var1, NativeLong var2);
+
+ int madvise(Pointer var1, NativeLong var2, int var3); // var3: one of the MADV_* values above
+
+ Pointer memset(Pointer p, int v, long len); // NOTE(review): len is a Java long while the other size arguments use NativeLong - confirm for 32-bit targets
+
+ int mlockall(int flags);
+
+ int msync(Pointer p, NativeLong length, int flags);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
new file mode 100644
index 0000000..ac577c7
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMessageStoreTest {
+ private static final String StoreMessage = "Once, there was a chance for me!";
+
+ private static int QUEUE_TOTAL = 100;
+
+ private static AtomicInteger QueueId = new AtomicInteger(0);
+
+ private static SocketAddress BornHost;
+
+ private static SocketAddress StoreHost;
+
+ private static byte[] MessageBody;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * Puts 100 messages into a store configured with small mapped files, then issues 100 reads; a read that throws fails the test.
+ */
+ @Test
+ public void test_write_read() throws Exception {
+ System.out.println("================================================================");
+ long totalMsgs = 100;
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+ messageStoreConfig.setMaxHashSlotNum(100);
+ messageStoreConfig.setMaxIndexNum(100 * 10);
+ MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
+
+ boolean load = master.load();
+ assertTrue(load);
+
+ master.start();
+ for (long i = 0; i < totalMsgs; i++) {
+ PutMessageResult result = master.putMessage(buildMessage());
+ System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+ }
+
+ for (long i = 0; i < totalMsgs; i++) {
+ // NOTE(review): reads topic "TOPIC_A" although buildMessage() writes topic "AAA" - confirm this is intended
+ GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
+ if (result == null) {
+ System.out.println("result == null " + i);
+ }
+ assertTrue(result != null);
+ result.release();
+ System.out.println("read " + i + " OK");
+ }
+ master.shutdown();
+ master.destroy();
+ System.out.println("================================================================");
+ }
+
+ /**
+ * Builds a message for topic "AAA" carrying the shared body; queue ids cycle through QUEUE_TOTAL.
+ */
+ public MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic("AAA");
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(MessageBody);
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+ msg.setSysFlag(4);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(StoreHost);
+ msg.setBornHost(BornHost);
+ return msg;
+ }
+
+ /**
+ * Same put/read round trip with FlushDiskType.SYNC_FLUSH configured; a read that throws fails the test.
+ */
+ @Test
+ public void test_group_commit() throws Exception {
+ System.out.println("================================================================");
+ long totalMsgs = 100;
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+ messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+ MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
+ boolean load = master.load();
+ assertTrue(load);
+
+ master.start();
+ for (long i = 0; i < totalMsgs; i++) {
+ PutMessageResult result = master.putMessage(buildMessage());
+ System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+ }
+
+ for (long i = 0; i < totalMsgs; i++) {
+ GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null); // same topic mismatch as in test_write_read
+ if (result == null) {
+ System.out.println("result == null " + i);
+ }
+ assertTrue(result != null);
+ result.release();
+ System.out.println("read " + i + " OK");
+ }
+ master.shutdown();
+ master.destroy();
+ System.out.println("================================================================");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
new file mode 100644
index 0000000..ce7666f
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.*;
+
+import static org.junit.Assert.*;
+
+
+public class MappedFileQueueTest {
+
+ // private static final String StoreMessage =
+ // "Once, there was a chance for me! but I did not treasure it. if";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void test_getLastMapedFile() { // every one of 1024 appends of a 16-character message must succeed
+ final String fixedMsg = "0123456789abcdef";
+ System.out.println("================================================================");
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/a/", 1024, null); // second argument is presumably the mapped file size - TODO confirm
+
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+ assertTrue(mappedFile != null);
+ boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
+ if (!result) {
+ System.out.println("appendMessage " + i);
+ }
+ assertTrue(result);
+ }
+
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ System.out.println("MappedFileQueue.getLastMappedFile() OK");
+ }
+
+
+ @Test
+ public void test_findMapedFileByOffset() { // offsets inside written data resolve to a file; offsets 4096 and above resolve to null
+ final String fixedMsg = "abcd";
+ System.out.println("================================================================");
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/b/", 1024, null);
+
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+ assertTrue(mappedFile != null);
+ boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
+ // System.out.println("appendMessage " + bytes);
+ assertTrue(result);
+ }
+
+ MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
+ assertTrue(mappedFile != null);
+ assertEquals(0, mappedFile.getFileFromOffset()); // JUnit order is (expected, actual)
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(100);
+ assertTrue(mappedFile != null);
+ assertEquals(0, mappedFile.getFileFromOffset());
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024);
+ assertTrue(mappedFile != null);
+ assertEquals(1024, mappedFile.getFileFromOffset());
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100);
+ assertTrue(mappedFile != null);
+ assertEquals(1024, mappedFile.getFileFromOffset());
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2);
+ assertTrue(mappedFile != null);
+ assertEquals(1024 * 2, mappedFile.getFileFromOffset());
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
+ assertTrue(mappedFile != null);
+ assertEquals(1024 * 2, mappedFile.getFileFromOffset());
+ System.out.println(mappedFile.getFileFromOffset());
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
+ assertTrue(mappedFile == null);
+
+ mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4 + 100);
+ assertTrue(mappedFile == null);
+
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ System.out.println("MappedFileQueue.findMappedFileByOffset() OK");
+ }
+
+ @Test
+ public void test_commit() { // each flush(0) is expected to return false and advance getFlushedWhere() by 1024
+ final String fixedMsg = "0123456789abcdef";
+ System.out.println("================================================================");
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/c/", 1024, null);
+
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+ assertTrue(mappedFile != null);
+ boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
+ assertTrue(result);
+ }
+
+ boolean result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere());
+ System.out.println("1 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere());
+ System.out.println("2 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere());
+ System.out.println("3 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere());
+ System.out.println("4 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere());
+ System.out.println("5 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ result = mappedFileQueue.flush(0);
+ assertFalse(result);
+ assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere());
+ System.out.println("6 " + result + " " + mappedFileQueue.getFlushedWhere());
+
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ System.out.println("MappedFileQueue.flush() OK");
+ }
+
+ @Test
+ public void test_getMapedMemorySize() { // mapped memory size must equal the total number of characters appended
+ final String fixedMsg = "abcd";
+ System.out.println("================================================================");
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/d/", 1024, null);
+
+ for (int i = 0; i < 1024; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+ assertTrue(mappedFile != null);
+ boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
+ assertTrue(result);
+ }
+
+ assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ System.out.println("MappedFileQueue.getMappedMemorySize() OK");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
new file mode 100644
index 0000000..94fd5ee
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class MappedFileTest {
+
+ private static final String StoreMessage = "Once, there was a chance for me!";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ /**
+ * Appends one message, reads it back through a selected buffer, then checks the
+ * shutdown / release / destroy sequence. An IOException fails the test.
+ */
+ @Test
+ public void test_write_read() throws IOException {
+ MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
+ boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
+ assertTrue(result);
+ System.out.println("write OK");
+
+ SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
+ byte[] data = new byte[StoreMessage.length()];
+ selectMappedBufferResult.getByteBuffer().get(data);
+ String readString = new String(data);
+
+ System.out.println("Read: " + readString);
+ assertTrue(readString.equals(StoreMessage));
+
+ mappedFile.shutdown(1000);
+ assertTrue(!mappedFile.isAvailable());
+ selectMappedBufferResult.release();
+ assertTrue(mappedFile.isCleanupOver());
+ assertTrue(mappedFile.destroy(1000));
+ }
+
+ /**
+ * Reads the selected buffer after it was released and the file was shut down.
+ * Kept disabled; presumably this access can crash the JVM (see the method name) - TODO confirm.
+ */
+ @Ignore
+ @Test
+ public void test_jvm_crashed() throws IOException {
+ MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64);
+ boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
+ assertTrue(result);
+ System.out.println("write OK");
+
+ SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
+ selectMappedBufferResult.release();
+ mappedFile.shutdown(1000);
+
+ byte[] data = new byte[StoreMessage.length()];
+ selectMappedBufferResult.getByteBuffer().get(data);
+ String readString = new String(data);
+ System.out.println(readString);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
new file mode 100644
index 0000000..ea83375
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: RecoverTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class RecoverTest {
+ private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa";
+
+ private static int QUEUE_TOTAL = 10;
+
+ private static AtomicInteger QueueId = new AtomicInteger(0);
+
+ private static SocketAddress BornHost;
+
+ private static SocketAddress StoreHost;
+
+ private static byte[] MessageBody;
+ private MessageStore storeWrite1;
+ private MessageStore storeWrite2;
+ private MessageStore storeRead;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Test
+ public void test_recover_normally() throws Exception {
+ this.writeMessage(true, true);
+ Thread.sleep(1000 * 3);
+ this.readMessage(1000);
+ this.destroy();
+ }
+
+ public void writeMessage(boolean normal, boolean first) throws Exception {
+ System.out.println("================================================================");
+ long totalMsgs = 100;
+ QUEUE_TOTAL = 3;
+
+ MessageBody = StoreMessage.getBytes();
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
+ messageStoreConfig.setMessageIndexEnable(false);
+
+ MessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null);
+ if (first) {
+ this.storeWrite1 = messageStore;
+ } else {
+ this.storeWrite2 = messageStore;
+ }
+
+ boolean loadResult = messageStore.load();
+ assertTrue(loadResult);
+ messageStore.start();
+ for (long i = 0; i < totalMsgs; i++) {
+ PutMessageResult result = messageStore.putMessage(buildMessage());
+ System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+ }
+
+ if (normal) {
+ messageStore.shutdown();
+ }
+ System.out.println("========================writeMessage OK========================================");
+ }
+
+ public void readMessage(final long msgCnt) throws Exception {
+ System.out.println("================================================================");
+ QUEUE_TOTAL = 3;
+ MessageBody = StoreMessage.getBytes();
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
+ messageStoreConfig.setMessageIndexEnable(false);
+ storeRead = new DefaultMessageStore(messageStoreConfig, null, null, null);
+ boolean loadResult = storeRead.load();
+ assertTrue(loadResult);
+ storeRead.start();
+
+ long readCnt = 0;
+ for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) {
+ for (long offset = 0; ; ) {
+ GetMessageResult result = storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, offset, 1024 * 1024, null);
+ if (result.getStatus() == GetMessageStatus.FOUND) {
+ System.out.println(queueId + "\t" + result.getMessageCount());
+ this.veryReadMessage(queueId, offset, result.getMessageBufferList());
+ offset += result.getMessageCount();
+ readCnt += result.getMessageCount();
+ result.release();
+ } else {
+ break;
+ }
+ }
+ }
+
+ System.out.println("readCnt = " + readCnt);
+ assertTrue(readCnt == msgCnt);
+ System.out.println("========================readMessage OK========================================");
+ }
+
+ private void destroy() {
+ if (storeWrite1 != null) {
+ storeWrite1.shutdown();
+ storeWrite1.destroy();
+ }
+
+ if (storeWrite2 != null) {
+ storeWrite2.shutdown();
+ storeWrite2.destroy();
+ }
+
+ if (storeRead != null) {
+ storeRead.shutdown();
+ storeRead.destroy();
+ }
+ }
+
+ public MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic("TOPIC_A");
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(MessageBody);
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+ msg.setSysFlag(4);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(StoreHost);
+ msg.setBornHost(BornHost);
+
+ return msg;
+ }
+
+ private void veryReadMessage(int queueId, long queueOffset, List<ByteBuffer> byteBuffers) {
+ for (ByteBuffer byteBuffer : byteBuffers) {
+ MessageExt msg = MessageDecoder.decode(byteBuffer);
+ System.out.println("request queueId " + queueId + ", request queueOffset " + queueOffset + " msg queue offset "
+ + msg.getQueueOffset());
+
+ assertTrue(msg.getQueueOffset() == queueOffset);
+
+ queueOffset++;
+ }
+ }
+
+ @Test
+ public void test_recover_normally_write() throws Exception {
+ this.writeMessage(true, true);
+ Thread.sleep(1000 * 3);
+ this.writeMessage(true, false);
+ Thread.sleep(1000 * 3);
+ this.readMessage(2000);
+ this.destroy();
+ }
+
+ @Test
+ public void test_recover_abnormally() throws Exception {
+ this.writeMessage(false, true);
+ Thread.sleep(1000 * 3);
+ this.readMessage(1000);
+ this.destroy();
+ }
+
+ @Test
+ public void test_recover_abnormally_write() throws Exception {
+ this.writeMessage(false, true);
+ Thread.sleep(1000 * 3);
+ this.writeMessage(false, false);
+ Thread.sleep(1000 * 3);
+ this.readMessage(2000);
+ this.destroy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
new file mode 100644
index 0000000..e0a550d
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Checks that timestamps written to a {@link StoreCheckpoint} survive a shutdown and reopen.
+ */
+public class StoreCheckpointTest {
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    /**
+     * Stores two timestamps, flushes, then reopens the same file and expects the same values.
+     * Failures propagate with their original cause instead of being turned into a bare
+     * {@code assertTrue(false)}, and both checkpoint instances are always shut down.
+     */
+    @Test
+    public void test_write_read() throws Exception {
+        final long physicMsgTimestamp = 0xAABB;
+        final long logicsMsgTimestamp = 0xCCDD;
+
+        StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
+        try {
+            storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
+            storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
+            storeCheckpoint.flush();
+
+            long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
+            assertTrue(diff == 3000);
+        } finally {
+            storeCheckpoint.shutdown();
+        }
+
+        storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
+        try {
+            assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp());
+            assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp());
+        } finally {
+            // The reopened checkpoint was previously never shut down.
+            storeCheckpoint.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
new file mode 100644
index 0000000..288b87e
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store.index;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Fills an {@link IndexFile} up to its configured capacity and checks put/select behaviour.
+ */
+public class IndexFileTest {
+    private static final int hashSlotNum = 100;
+    private static final int indexNum = 400;
+
+    /** Puts {@code indexNum - 1} keys successfully and expects the next put to be refused. */
+    @Test
+    public void test_put_index() throws Exception {
+        IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
+        try {
+            fillToCapacity(indexFile);
+        } finally {
+            // Remove the file even when an assertion fails.
+            indexFile.destroy(0);
+        }
+    }
+
+
+    /** Fills the file, then expects at least one physical offset for key "60". */
+    @Test
+    public void test_put_get_index() throws Exception {
+        IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+        try {
+            fillToCapacity(indexFile);
+
+            final List<Long> phyOffsets = new ArrayList<Long>();
+            indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
+            for (Long offset : phyOffsets) {
+                System.out.println(offset);
+            }
+            assertFalse(phyOffsets.isEmpty());
+        } finally {
+            indexFile.destroy(0);
+        }
+    }
+
+    /**
+     * Puts keys {@code "0"} .. {@code indexNum - 2}, asserting each put succeeds, then asserts
+     * that one further put (key {@code indexNum}) is rejected.
+     */
+    private static void fillToCapacity(IndexFile indexFile) throws Exception {
+        for (long i = 0; i < (indexNum - 1); i++) {
+            boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
+            assertTrue(putResult);
+        }
+
+        boolean putResult = indexFile.putKey(Long.toString(indexNum), indexNum, System.currentTimeMillis());
+        assertFalse(putResult);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
new file mode 100644
index 0000000..d7de738
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.store.*;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Puts messages with delay levels 0..3 into a {@link DefaultMessageStore}, waits, and then
+ * issues reads against the store. The whole class is disabled via {@code @Ignore}.
+ */
+@Ignore
+public class ScheduleMessageTest {
+    private static final String StoreMessage = "Once, there was a chance for me!";
+
+    private static int QUEUE_TOTAL = 100;
+
+    private static AtomicInteger QueueId = new AtomicInteger(0);
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    private static byte[] MessageBody;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    /**
+     * Writes 10000 messages with {@code delayTimeLevel = i % 4}, sleeps 20 seconds, then calls
+     * {@code getMessage} once per offset and asserts the result is not {@code null}.
+     * The store is shut down and destroyed even when the test fails part-way.
+     */
+    @Test
+    public void test_delay_message() throws Exception {
+        System.out.println("================================================================");
+        long totalMsgs = 10000;
+        QUEUE_TOTAL = 32;
+
+        MessageBody = StoreMessage.getBytes();
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(1000 * 10);
+
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
+
+        boolean load = master.load();
+        assertTrue(load);
+
+        master.start();
+        try {
+            for (int i = 0; i < totalMsgs; i++) {
+                MessageExtBrokerInner msg = buildMessage();
+                msg.setDelayTimeLevel(i % 4);
+
+                PutMessageResult result = master.putMessage(msg);
+                System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+            }
+
+            System.out.println("write message over, wait time up");
+            Thread.sleep(1000 * 20);
+
+            // NOTE(review): messages are built with topic "AAA" but read from "TOPIC_A";
+            // only non-null results are asserted here - confirm which topic is intended.
+            for (long i = 0; i < totalMsgs; i++) {
+                try {
+                    GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
+                    if (result == null) {
+                        System.out.println("result == null " + i);
+                    }
+                    assertTrue(result != null);
+                    result.release();
+                    System.out.println("read " + i + " OK");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            Thread.sleep(1000 * 15);
+        } finally {
+            // Release the store and its files regardless of the test outcome.
+            master.shutdown();
+            master.destroy();
+        }
+        System.out.println("================================================================");
+    }
+
+    /**
+     * Builds a message for topic "AAA" whose queue id cycles through {@code 0..QUEUE_TOTAL-1}.
+     *
+     * @return a new message ready to be put into the store
+     */
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("AAA");
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        // Mask the sign bit instead of Math.abs: Math.abs(Integer.MIN_VALUE) is still negative
+        // and would produce a negative queue id once the counter wraps around.
+        msg.setQueueId((QueueId.getAndIncrement() & Integer.MAX_VALUE) % QUEUE_TOTAL);
+        msg.setSysFlag(4);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+
+        return msg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-tools/pom.xml b/rocketmq-tools/pom.xml
new file mode 100644
index 0000000..5070a68
--- /dev/null
+++ b/rocketmq-tools/pom.xml
@@ -0,0 +1,66 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-tools</artifactId>
+ <name>rocketmq-tools ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-store</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-srvutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
new file mode 100644
index 0000000..4576886
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.RollbackStats;
+import com.alibaba.rocketmq.common.admin.TopicStatsTable;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
+ private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private String adminExtGroup = "admin_ext_group";
+ private String createTopicKey = MixAll.DEFAULT_TOPIC;
+ private long timeoutMillis = 5000;
+
+ /**
+  * Creates an admin client without an RPC hook; the implementation receives the current
+  * value of this object's {@code timeoutMillis} field.
+  */
+ public DefaultMQAdminExt() {
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, this.timeoutMillis);
+ }
+
+ /**
+  * Creates an admin client without an RPC hook and with a caller-supplied timeout.
+  *
+  * @param timeoutMillis timeout passed to the implementation
+  */
+ public DefaultMQAdminExt(long timeoutMillis) {
+     // The parameter shadows the field; record it so the field does not keep the default
+     // while the implementation works with the caller's value.
+     this.timeoutMillis = timeoutMillis;
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis);
+ }
+
+ /**
+  * Creates an admin client with the given RPC hook; the implementation receives the current
+  * value of this object's {@code timeoutMillis} field.
+  *
+  * @param rpcHook hook handed to the implementation
+  */
+ public DefaultMQAdminExt(RPCHook rpcHook) {
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, this.timeoutMillis);
+ }
+
+ /**
+  * Creates an admin client with the given RPC hook and a caller-supplied timeout.
+  *
+  * @param rpcHook       hook handed to the implementation
+  * @param timeoutMillis timeout passed to the implementation
+  */
+ public DefaultMQAdminExt(RPCHook rpcHook, long timeoutMillis) {
+     // The parameter shadows the field; record it so the field does not keep the default
+     // while the implementation works with the caller's value.
+     this.timeoutMillis = timeoutMillis;
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
+ }
+
+ /**
+  * Creates an admin client for the given group name; the implementation receives the
+  * current value of this object's {@code timeoutMillis} field.
+  *
+  * @param adminExtGroup value stored in the {@code adminExtGroup} field
+  */
+ public DefaultMQAdminExt(final String adminExtGroup) {
+     // The group is assigned before the implementation is constructed with this object.
+     this.adminExtGroup = adminExtGroup;
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, this.timeoutMillis);
+ }
+
+ /**
+  * Creates an admin client for the given group name with a caller-supplied timeout.
+  *
+  * @param adminExtGroup value stored in the {@code adminExtGroup} field
+  * @param timeoutMillis timeout passed to the implementation
+  */
+ public DefaultMQAdminExt(final String adminExtGroup, long timeoutMillis) {
+     this.adminExtGroup = adminExtGroup;
+     // The parameter shadows the field; record it so the field does not keep the default
+     // while the implementation works with the caller's value.
+     this.timeoutMillis = timeoutMillis;
+     this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis);
+ }
+
+ /**
+  * Convenience overload: calls the four-argument {@code createTopic} with a
+  * {@code topicSysFlag} of 0.
+  */
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+     final int noSysFlag = 0;
+     this.createTopic(key, newTopic, queueNum, noSysFlag);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.createTopic} with the same arguments. */
+ @Override
+ public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+     this.defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.searchOffset} and returns its result. */
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+     return this.defaultMQAdminExtImpl.searchOffset(mq, timestamp);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.maxOffset} and returns its result. */
+ @Override
+ public long maxOffset(MessageQueue mq) throws MQClientException {
+     return this.defaultMQAdminExtImpl.maxOffset(mq);
+ }
+
+
+ /** Delegates to {@code defaultMQAdminExtImpl.minOffset} and returns its result. */
+ @Override
+ public long minOffset(MessageQueue mq) throws MQClientException {
+     return this.defaultMQAdminExtImpl.minOffset(mq);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.earliestMsgStoreTime} and returns its result. */
+ @Override
+ public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+     return this.defaultMQAdminExtImpl.earliestMsgStoreTime(mq);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.viewMessage} and returns its result. */
+ @Override
+ public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     return this.defaultMQAdminExtImpl.viewMessage(offsetMsgId);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.queryMessage} and returns its result. */
+ @Override
+ public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+     InterruptedException {
+     return this.defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
+ }
+
+ /** Starts the underlying {@code defaultMQAdminExtImpl}. */
+ @Override
+ public void start() throws MQClientException {
+     this.defaultMQAdminExtImpl.start();
+ }
+
+ /** Shuts down the underlying {@code defaultMQAdminExtImpl}. */
+ @Override
+ public void shutdown() {
+     this.defaultMQAdminExtImpl.shutdown();
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.updateBrokerConfig} with the same arguments. */
+ @Override
+ public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException,
+     RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+     this.defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.getBrokerConfig} and returns its result. */
+ @Override
+ public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
+     RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+     return this.defaultMQAdminExtImpl.getBrokerConfig(brokerAddr);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.createAndUpdateTopicConfig} with the same arguments. */
+ @Override
+ public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
+     InterruptedException, MQClientException {
+     this.defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
+ }
+
+ /**
+  * Delegates to {@code defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig} with the
+  * same arguments.
+  */
+ @Override
+ public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException,
+     MQBrokerException, InterruptedException, MQClientException {
+     this.defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineSubscriptionGroupConfig} and returns its result. */
+ @Override
+ public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+     return this.defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineTopicConfig} and returns its result. */
+ @Override
+ public TopicConfig examineTopicConfig(String addr, String topic) {
+     return this.defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineTopicStats} and returns its result. */
+ @Override
+ public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException,
+     MQBrokerException {
+     return this.defaultMQAdminExtImpl.examineTopicStats(topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.fetchAllTopicList} and returns its result. */
+ @Override
+ public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+     final TopicList allTopics = defaultMQAdminExtImpl.fetchAllTopicList();
+     return allTopics;
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.fetchTopicsByCLuster} and returns its result. */
+ @Override
+ public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException {
+     final TopicList clusterTopics = defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName);
+     return clusterTopics;
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.fetchBrokerRuntimeStats} and returns its result. */
+ @Override
+ public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+     RemotingTimeoutException, InterruptedException, MQBrokerException {
+     final KVTable runtimeStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
+     return runtimeStats;
+ }
+
+ /**
+  * Convenience overload: calls the two-argument {@code examineConsumeStats} with a
+  * {@code null} topic.
+  */
+ @Override
+ public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+     MQBrokerException {
+     return this.examineConsumeStats(consumerGroup, null);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineConsumeStats} and returns its result. */
+ @Override
+ public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException,
+     InterruptedException, MQBrokerException {
+     return this.defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineBrokerClusterInfo} and returns its result. */
+ @Override
+ public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+     RemotingSendRequestException, MQBrokerException {
+     return this.defaultMQAdminExtImpl.examineBrokerClusterInfo();
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineTopicRouteInfo} and returns its result. */
+ @Override
+ public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException {
+     return this.defaultMQAdminExtImpl.examineTopicRouteInfo(topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineConsumerConnectionInfo} and returns its result. */
+ @Override
+ public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException,
+     RemotingException, MQClientException {
+     return this.defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.examineProducerConnectionInfo} and returns its result. */
+ @Override
+ public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException,
+     MQClientException, InterruptedException, MQBrokerException {
+     return this.defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.getNameServerAddressList} and returns its result. */
+ @Override
+ public List<String> getNameServerAddressList() {
+     final List<String> nameServerAddresses = defaultMQAdminExtImpl.getNameServerAddressList();
+     return nameServerAddresses;
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.wipeWritePermOfBroker} and returns its result. */
+ @Override
+ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
+     RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+     return this.defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.putKVConfig} with the same arguments. */
+ @Override
+ public void putKVConfig(String namespace, String key, String value) {
+     this.defaultMQAdminExtImpl.putKVConfig(namespace, key, value);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.getKVConfig} and returns its result. */
+ @Override
+ public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException {
+     return this.defaultMQAdminExtImpl.getKVConfig(namespace, key);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.getKVListByNamespace} and returns its result. */
+ @Override
+ public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException {
+     return this.defaultMQAdminExtImpl.getKVListByNamespace(namespace);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.deleteTopicInBroker} with the same arguments. */
+ @Override
+ public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+     MQClientException {
+     this.defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.deleteTopicInNameServer} with the same arguments. */
+ @Override
+ public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+     MQClientException {
+     this.defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.deleteSubscriptionGroup} with the same arguments. */
+ @Override
+ public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException,
+     MQClientException {
+     this.defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.createAndUpdateKvConfig} with the same arguments. */
+ @Override
+ public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+     InterruptedException, MQClientException {
+     this.defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.deleteKvConfig} with the same arguments. */
+ @Override
+ public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+     MQClientException {
+     this.defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.resetOffsetByTimestampOld} and returns its result. */
+ public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     return this.defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+ }
+
+ /**
+  * Convenience overload: calls the five-argument {@code resetOffsetByTimestamp} with
+  * {@code isC} set to {@code false}.
+  */
+ @Override
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     return this.resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+ }
+
+ /** Delegates to {@code defaultMQAdminExtImpl.resetOffsetByTimestamp} and returns its result. */
+ public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
+     throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     return this.defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
+ }
+
+ @Override
+ public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp);
+ }
+
+ @Override
+ public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+ MQBrokerException, InterruptedException, MQClientException {
+ return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr);
+ }
+
+ @Override
+ public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+ InterruptedException, MQClientException {
+ defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster);
+ }
+
+ @Override
+ public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException,
+ MQClientException {
+ return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic);
+ }
+
+ @Override
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+ RemotingException, MQClientException {
+ return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group);
+ }
+
+ @Override
+ public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster);
+ }
+
+ @Override
+ public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
+ }
+
+ /**
+  * Delegates to {@code defaultMQAdminExtImpl.cleanUnusedTopic} for the given cluster.
+  *
+  * @param cluster cluster identifier handed through unchanged
+  * @return the boolean reported by the implementation
+  * @throws RemotingConnectException     propagated from the implementation
+  * @throws RemotingSendRequestException propagated from the implementation
+  * @throws RemotingTimeoutException     propagated from the implementation
+  * @throws MQClientException            propagated from the implementation
+  * @throws InterruptedException         propagated from the implementation
+  */
+ @Override
+ public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+         RemotingTimeoutException, MQClientException, InterruptedException {
+     // Must target the cluster-level operation: the argument is a cluster identifier, so routing
+     // it to cleanUnusedTopicByAddr (which expects an address) would be the wrong call. This
+     // mirrors how cleanExpiredConsumerQueue delegates to its same-named counterpart.
+     return defaultMQAdminExtImpl.cleanUnusedTopic(cluster);
+ }
+
+ @Override
+ public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(addr);
+ }
+
+ @Override
+ public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException,
+ MQClientException, InterruptedException {
+ return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack);
+ }
+
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
+ }
+
+ @Override
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+ }
+
+ @Override
+ public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+ MQBrokerException {
+ return this.defaultMQAdminExtImpl.messageTrackDetail(msg);
+ }
+
+ @Override
+ public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+ MQClientException, InterruptedException, MQBrokerException {
+ this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+ }
+
+ @Override
+ public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException,
+ RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+ return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey);
+ }
+
+ @Override
+ public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return this.defaultMQAdminExtImpl.getClusterList(topic);
+ }
+
+ @Override
+ public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
+ RemotingTimeoutException, MQClientException, InterruptedException {
+ return this.defaultMQAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+ }
+
+ @Override
+ public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+ return this.defaultMQAdminExtImpl.getTopicClusterList(topic);
+ }
+
+ @Override
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+ }
+
+ @Override
+ public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQBrokerException {
+ return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
+ }
+
+
+ /**
+  * Overrides the inherited {@code viewMessage(String, String)} declaration by delegating to
+  * {@code defaultMQAdminExtImpl.viewMessage(topic, msgId)}.
+  *
+  * @param topic topic name handed through unchanged
+  * @param msgId message id handed through unchanged
+  * @return the {@code MessageExt} produced by the implementation
+  */
+ @Override
+ public MessageExt viewMessage(String topic, String msgId)
+         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+     final MessageExt message = defaultMQAdminExtImpl.viewMessage(topic, msgId);
+     return message;
+ }
+
+ /**
+  * @return the current value of the {@code adminExtGroup} field
+  */
+ public String getAdminExtGroup() {
+     return this.adminExtGroup;
+ }
+
+ /**
+  * Stores the given value in the {@code adminExtGroup} field.
+  *
+  * @param adminExtGroup new field value; assigned as-is without validation
+  */
+ public void setAdminExtGroup(String adminExtGroup) {
+     this.adminExtGroup = adminExtGroup;
+ }
+
+ /**
+  * @return the current value of the {@code createTopicKey} field
+  */
+ public String getCreateTopicKey() {
+     return this.createTopicKey;
+ }
+
+ /**
+  * Stores the given value in the {@code createTopicKey} field.
+  *
+  * @param createTopicKey new field value; assigned as-is without validation
+  */
+ public void setCreateTopicKey(String createTopicKey) {
+     this.createTopicKey = createTopicKey;
+ }
+
+ @Override
+ public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException {
+ this.defaultMQAdminExtImpl.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+ }
+
+ /**
+  * Forwards the request to {@code defaultMQAdminExtImpl.updateNameServerConfig}.
+  *
+  * @param properties  properties handed through unchanged
+  * @param nameServers list of name server identifiers handed through unchanged
+  */
+ @Override
+ public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
+         throws InterruptedException, RemotingConnectException, UnsupportedEncodingException,
+         MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
+     defaultMQAdminExtImpl.updateNameServerConfig(properties, nameServers);
+ }
+
+ /**
+  * Delegates to {@code defaultMQAdminExtImpl.getNameServerConfig} and returns its result.
+  *
+  * @param nameServers list of name server identifiers handed through unchanged
+  * @return the map of {@code Properties} keyed by string produced by the implementation
+  */
+ @Override
+ public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
+         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+         RemotingConnectException, MQClientException, UnsupportedEncodingException {
+     final Map<String, Properties> configs = defaultMQAdminExtImpl.getNameServerConfig(nameServers);
+     return configs;
+ }
+}