You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/08 07:54:26 UTC
[05/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support
message filtering based on SQL92 closes apache/incubator-rocketmq#82
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index 33529da..3d33eaf 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.store;
+import java.util.Map;
+
public class DispatchRequest {
private final String topic;
private final int queueId;
@@ -30,6 +32,8 @@ public class DispatchRequest {
private final int sysFlag;
private final long preparedTransactionOffset;
+ private final Map<String, String> propertiesMap;
+ private byte[] bitMap;
public DispatchRequest(
final String topic,
@@ -42,7 +46,8 @@ public class DispatchRequest {
final String keys,
final String uniqKey,
final int sysFlag,
- final long preparedTransactionOffset
+ final long preparedTransactionOffset,
+ final Map<String, String> propertiesMap
) {
this.topic = topic;
this.queueId = queueId;
@@ -57,6 +62,7 @@ public class DispatchRequest {
this.sysFlag = sysFlag;
this.preparedTransactionOffset = preparedTransactionOffset;
this.success = true;
+ this.propertiesMap = propertiesMap;
}
public DispatchRequest(int size) {
@@ -81,6 +87,7 @@ public class DispatchRequest {
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
this.success = false;
+ this.propertiesMap = null;
}
public DispatchRequest(int size, boolean success) {
@@ -105,6 +112,7 @@ public class DispatchRequest {
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
this.success = success;
+ this.propertiesMap = null;
}
public String getTopic() {
@@ -155,4 +163,15 @@ public class DispatchRequest {
return uniqKey;
}
+ public Map<String, String> getPropertiesMap() { // properties map handed to the full constructor
+ return propertiesMap; // null when the request was built by one of the size-only constructors
+ }
+
+ public byte[] getBitMap() { // value last stored through setBitMap
+ return bitMap; // the stored array itself is returned, not a copy
+ }
+
+ public void setBitMap(byte[] bitMap) { // bitMap is declared non-final, so it can be assigned after construction
+ this.bitMap = bitMap; // keeps the caller's array reference; no defensive copy is made
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 550e578..a9a00a8 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -245,6 +245,31 @@ public class MappedFile extends ReferenceResource {
}
/**
+     * Appends {@code length} bytes of {@code data}, starting at {@code offset}, to this file at the
+     * current write position.
+     * <p>
+     * The write position is advanced only after the bytes have been handed to the file channel, so
+     * a failed write is reported as {@code false} instead of moving the position past bytes that
+     * were never written.
+     *
+     * @param data   source array
+     * @param offset index in {@code data} of the first byte to write
+     * @param length number of bytes to write
+     * @return {@code true} if the bytes were written and the write position was advanced;
+     *         {@code false} if {@code length} is negative, the remaining space is too small, or the
+     *         write failed
+     */
+    public boolean appendMessage(final byte[] data, final int offset, final int length) {
+        int currentPos = this.wrotePosition.get();
+
+        // Widen to long so that a very large length cannot overflow the capacity check.
+        if (length < 0 || ((long) currentPos + length) > this.fileSize) {
+            return false;
+        }
+
+        try {
+            this.fileChannel.position(currentPos);
+            this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
+        } catch (Throwable e) {
+            log.error("Error occurred when append message to mappedFile.", e);
+            // Do not advance the write position: nothing is known to have been written.
+            return false;
+        }
+
+        this.wrotePosition.addAndGet(length);
+        return true;
+    }
+
+ /**
* @param flushLeastPages
* @return The current flushed position
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 5c6c62c..a8fa364 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -121,7 +121,7 @@ public class MappedFileQueue {
this.deleteExpiredFile(willRemoveFiles);
}
- private void deleteExpiredFile(List<MappedFile> files) {
+ void deleteExpiredFile(List<MappedFile> files) {
if (!files.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
index 2523c1a..dee1bc7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageArrivingListener.java
@@ -17,6 +17,9 @@
package org.apache.rocketmq.store;
+import java.util.Map;
+
public interface MessageArrivingListener { // NOTE(review): presumably notified by the store when a message reaches a consume queue — confirm against the caller
- void arriving(String topic, int queueId, long logicOffset, long tagsCode);
+ void arriving(String topic, int queueId, long logicOffset, long tagsCode, // arguments carried over from the previous signature
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties); // arguments introduced by this change
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
index 859ce99..6b34758 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageFilter.java
@@ -16,8 +16,30 @@
*/
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import java.nio.ByteBuffer;
+import java.util.Map;
public interface MessageFilter {
- boolean isMessageMatched(final SubscriptionData subscriptionData, final Long tagsCode);
+ /**
+ * Matches a message using only data available from the consume queue: the tags code, or the
+ * filter bit map that was calculated when the message was received and stored in the
+ * consume queue ext.
+ *
+ * @param tagsCode tags code of the message
+ * @param cqExtUnit extend unit of the consume queue
+ * @return {@code true} if the message is considered matched
+ */
+ boolean isMatchedByConsumeQueue(final Long tagsCode,
+ final ConsumeQueueExt.CqExtUnit cqExtUnit);
+
+ /**
+ * Matches a message using its content as stored in the commit log.
+ * <br>{@code msgBuffer} and {@code properties} are never both null. When invoked in the store,
+ * {@code properties} is null; when invoked in {@code PullRequestHoldService}, {@code msgBuffer} is null.
+ *
+ * @param msgBuffer message buffer in the commit log; may be null if not invoked in the store.
+ * @param properties message properties; if null, the implementation should decode them from
+ * {@code msgBuffer} itself.
+ * @return {@code true} if the message is considered matched
+ */
+ boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
+ final Map<String, String> properties);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 65c546b..e841c08 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -17,10 +17,10 @@
package org.apache.rocketmq.store;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public interface MessageStore {
@@ -37,7 +37,7 @@ public interface MessageStore {
PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
GetMessageResult getMessage(final String group, final String topic, final int queueId,
- final long offset, final int maxMsgNums, final SubscriptionData subscriptionData);
+ final long offset, final int maxMsgNums, final MessageFilter messageFilter);
long getMaxOffsetInQuque(final String topic, final int queueId);
@@ -105,4 +105,8 @@ public interface MessageStore {
long lockTimeMills();
boolean isTransientStorePoolDeficient();
+
+ LinkedList<CommitLogDispatcher> getDispatcherList();
+
+ ConsumeQueue getConsumeQueue(String topic, int queueId);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 7ae2ab5..29f800c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -34,6 +34,13 @@ public class MessageStoreConfig {
private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
private int mapedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
+ // enable consume queue ext
+ private boolean enableConsumeQueueExt = false;
+ // ConsumeQueue extend file size, 48M
+ private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
+ // Bit count of the filter bit map.
+ // This value will be set by the pipe that calculates the filter bit map.
+ private int bitMapLengthConsumeQueueExt = 64;
// CommitLog flush interval
// flush data to disk
@@ -191,6 +198,30 @@ public class MessageStoreConfig {
this.mapedFileSizeConsumeQueue = mapedFileSizeConsumeQueue;
}
+ public boolean isEnableConsumeQueueExt() { // whether the consume queue ext is enabled; the field defaults to false
+ return enableConsumeQueueExt;
+ }
+
+ public void setEnableConsumeQueueExt(boolean enableConsumeQueueExt) { // replaces the default (false)
+ this.enableConsumeQueueExt = enableConsumeQueueExt;
+ }
+
+ public int getMappedFileSizeConsumeQueueExt() { // consume queue extend file size; the field defaults to 48 * 1024 * 1024
+ return mappedFileSizeConsumeQueueExt;
+ }
+
+ public void setMappedFileSizeConsumeQueueExt(int mappedFileSizeConsumeQueueExt) { // plain assignment; the given size is not validated here
+ this.mappedFileSizeConsumeQueueExt = mappedFileSizeConsumeQueueExt;
+ }
+
+ public int getBitMapLengthConsumeQueueExt() { // bit count of the filter bit map; the field defaults to 64
+ return bitMapLengthConsumeQueueExt;
+ }
+
+ public void setBitMapLengthConsumeQueueExt(int bitMapLengthConsumeQueueExt) { // plain assignment; the given bit count is not validated here
+ this.bitMapLengthConsumeQueueExt = bitMapLengthConsumeQueueExt;
+ }
+
public int getFlushIntervalCommitLog() {
return flushIntervalCommitLog;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index aebebaf..ef1d670 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -24,6 +24,10 @@ public class StorePathConfigHelper {
return rootDir + File.separator + "consumequeue";
}
+    /**
+     * Returns {@code rootDir} joined with the {@code consumequeue_ext} sub-directory name, using
+     * the platform file separator.
+     *
+     * @param rootDir store root directory
+     * @return path of the {@code consumequeue_ext} directory under {@code rootDir}
+     */
+    public static String getStorePathConsumeQueueExt(final String rootDir) {
+        final String dirName = "consumequeue_ext";
+        return rootDir + File.separator + dirName;
+    }
+
public static String getStorePathIndex(final String rootDir) {
return rootDir + File.separator + "index";
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index e08a6f5..d45b994 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
@@ -248,11 +249,24 @@ public class ScheduleMessageService extends ConfigManager {
try {
long nextOffset = offset;
int i = 0;
+ ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
+ if (cq.isExtAddr(tagsCode)) {
+ if (cq.getExt(tagsCode, cqExtUnit)) {
+ tagsCode = cqExtUnit.getTagsCode();
+ } else {
+ //can't find ext content.So re compute tags code.
+ log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+ tagsCode, offsetPy, sizePy);
+ long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+ tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+ }
+ }
+
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
new file mode 100644
index 0000000..5dbc584
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumeQueueExtTest {
+
+    private static final String topic = "abc";
+    private static final int queueId = 0;
+    private static final String storePath = "." + File.separator + "unit_test_store";
+    private static final int bitMapLength = 64;
+    private static final int unitSizeWithBitMap = ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + bitMapLength / Byte.SIZE;
+    private static final int cqExtFileSize = 10 * unitSizeWithBitMap;
+    private static final int unitCount = 20;
+
+    // One shared generator: re-seeding from the clock on every call yields identical
+    // sequences for calls that land in the same millisecond.
+    private static final Random random = new Random();
+
+    /** Creates a consume queue ext over the shared test store path. */
+    protected ConsumeQueueExt genExt() {
+        return new ConsumeQueueExt(
+            topic, queueId, storePath, cqExtFileSize, bitMapLength
+        );
+    }
+
+    /** Returns {@code bitMapLength / Byte.SIZE} random bytes. */
+    protected byte[] genBitMap(int bitMapLength) {
+        byte[] bytes = new byte[bitMapLength / Byte.SIZE];
+        random.nextBytes(bytes);
+        return bytes;
+    }
+
+    /**
+     * Builds a unit with a random non-negative tags code and the current time as store time.
+     *
+     * @param hasBitMap whether a random filter bit map is attached
+     */
+    protected ConsumeQueueExt.CqExtUnit genUnit(boolean hasBitMap) {
+        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+
+        // nextInt(bound) is never negative, unlike Math.abs(nextInt()), which stays negative
+        // when nextInt() returns Integer.MIN_VALUE.
+        cqExtUnit.setTagsCode(random.nextInt(Integer.MAX_VALUE));
+        cqExtUnit.setMsgStoreTime(System.currentTimeMillis());
+        if (hasBitMap) {
+            cqExtUnit.setFilterBitMap(genBitMap(bitMapLength));
+        }
+
+        return cqExtUnit;
+    }
+
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    /** Deletes {@code file}, recursing into it first when it is a directory. */
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    /**
+     * Puts {@code unitCount} units and checks that every returned address is negative.
+     *
+     * @param getAfterPut  read each unit back right after the put and compare it
+     * @param unitSameSize give every unit a bit map; otherwise only every other unit gets one
+     */
+    protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut,
+        boolean unitSameSize, int unitCount) {
+        for (int i = 0; i < unitCount; i++) {
+            ConsumeQueueExt.CqExtUnit putUnit =
+                unitSameSize ? genUnit(true) : genUnit(i % 2 == 0);
+
+            long addr = consumeQueueExt.put(putUnit);
+            assertThat(addr).isLessThan(0);
+
+            if (getAfterPut) {
+                ConsumeQueueExt.CqExtUnit getUnit = consumeQueueExt.get(addr);
+
+                assertThat(getUnit).isNotNull();
+                assertThat(putUnit).isEqualTo(getUnit);
+            }
+
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the test runner and fail with the cause attached.
+                Thread.currentThread().interrupt();
+                throw new AssertionError("interrupted while putting units", e);
+            }
+        }
+    }
+
+    /** Destroys the given instances (nulls are skipped) and always removes the store directory. */
+    private void destroyAndClean(ConsumeQueueExt... exts) {
+        try {
+            for (ConsumeQueueExt ext : exts) {
+                if (ext != null) {
+                    ext.destroy();
+                }
+            }
+        } finally {
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testPut() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            putSth(consumeQueueExt, true, false, unitCount);
+        } finally {
+            destroyAndClean(consumeQueueExt);
+        }
+    }
+
+    @Test
+    public void testGet() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            // Inside the try so a failed put still cleans up the store directory.
+            putSth(consumeQueueExt, false, false, unitCount);
+
+            // from start.
+            long addr = consumeQueueExt.decorate(0);
+
+            ConsumeQueueExt.CqExtUnit unit = new ConsumeQueueExt.CqExtUnit();
+            while (consumeQueueExt.get(addr, unit)) {
+                assertThat(unit.getSize()).isGreaterThanOrEqualTo(ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE);
+
+                addr += unit.getSize();
+            }
+        } finally {
+            destroyAndClean(consumeQueueExt);
+        }
+    }
+
+    @Test
+    public void testGet_invalidAddress() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            putSth(consumeQueueExt, false, true, unitCount);
+
+            ConsumeQueueExt.CqExtUnit unit = consumeQueueExt.get(0);
+
+            assertThat(unit).isNull();
+
+            long addr = (cqExtFileSize / unitSizeWithBitMap) * unitSizeWithBitMap;
+            addr += unitSizeWithBitMap;
+
+            unit = consumeQueueExt.get(addr);
+            assertThat(unit).isNull();
+        } finally {
+            destroyAndClean(consumeQueueExt);
+        }
+    }
+
+    @Test
+    public void testRecovery() {
+        ConsumeQueueExt putCqExt = genExt();
+        ConsumeQueueExt loadCqExt = null;
+
+        try {
+            putSth(putCqExt, false, true, unitCount);
+
+            // Created only after the put, as before, so it loads what was just written.
+            loadCqExt = genExt();
+            loadCqExt.load();
+            loadCqExt.recover();
+
+            assertThat(loadCqExt.getMinAddress()).isEqualTo(Long.MIN_VALUE);
+
+            // same unit size.
+            int countPerFile = (cqExtFileSize - ConsumeQueueExt.END_BLANK_DATA_LENGTH) / unitSizeWithBitMap;
+
+            int lastFileUnitCount = unitCount % countPerFile;
+
+            int fileCount = unitCount / countPerFile + 1;
+            if (lastFileUnitCount == 0) {
+                fileCount -= 1;
+            }
+
+            if (lastFileUnitCount == 0) {
+                assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress()) % cqExtFileSize).isEqualTo(0);
+            } else {
+                assertThat(loadCqExt.unDecorate(loadCqExt.getMaxAddress()))
+                    .isEqualTo(lastFileUnitCount * unitSizeWithBitMap + (fileCount - 1) * cqExtFileSize);
+            }
+        } finally {
+            destroyAndClean(putCqExt, loadCqExt);
+        }
+    }
+
+    @Test
+    public void testTruncateByMinOffset() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            putSth(consumeQueueExt, false, true, unitCount * 2);
+
+            // truncate first one file.
+            long address = consumeQueueExt.decorate((long) (cqExtFileSize * 1.5));
+
+            long expectMinAddress = consumeQueueExt.decorate(cqExtFileSize);
+
+            consumeQueueExt.truncateByMinAddress(address);
+
+            long minAddress = consumeQueueExt.getMinAddress();
+
+            assertThat(minAddress).isEqualTo(expectMinAddress);
+        } finally {
+            destroyAndClean(consumeQueueExt);
+        }
+    }
+
+    @Test
+    public void testTruncateByMaxOffset() {
+        ConsumeQueueExt consumeQueueExt = genExt();
+
+        try {
+            putSth(consumeQueueExt, false, true, unitCount * 2);
+
+            // truncate, only first 3 files exist.
+            long address = consumeQueueExt.decorate(cqExtFileSize * 2 + unitSizeWithBitMap);
+
+            long expectMaxAddress = address + unitSizeWithBitMap;
+
+            consumeQueueExt.truncateByMaxAddress(address);
+
+            long maxAddress = consumeQueueExt.getMaxAddress();
+
+            assertThat(maxAddress).isEqualTo(expectMaxAddress);
+        } finally {
+            destroyAndClean(consumeQueueExt);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
new file mode 100644
index 0000000..9c42fb9
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumeQueueTest {
+
+ private static final String msg = "Once, there was a chance for me!";
+ private static final byte[] msgBody = msg.getBytes();
+
+ private static final String topic = "abc";
+ private static final int queueId = 0;
+ private static final String storePath = "." + File.separator + "unit_test_store";
+ private static final int commitLogFileSize = 1024 * 8;
+ private static final int cqFileSize = 10 * 20;
+ private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
+
+ private static SocketAddress BornHost;
+
+ private static SocketAddress StoreHost;
+
+ static {
+ try {
+ StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ try {
+ BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic(topic);
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(msgBody);
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(queueId);
+ msg.setSysFlag(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(StoreHost);
+ msg.setBornHost(BornHost);
+ for (int i = 0; i < 1; i++) {
+ msg.putUserProperty(String.valueOf(i), "imagoodperson" + i);
+ }
+ msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ return msg;
+ }
+
+
+ public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
+ boolean enableCqExt, int cqExtFileSize) {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
+ messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+ messageStoreConfig.setMessageIndexEnable(false);
+ messageStoreConfig.setEnableConsumeQueueExt(enableCqExt);
+
+ messageStoreConfig.setStorePathRootDir(storePath);
+ messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
+
+ return messageStoreConfig;
+ }
+
+ protected DefaultMessageStore gen() throws Exception {
+ MessageStoreConfig messageStoreConfig = buildStoreConfig(
+ commitLogFileSize, cqFileSize, true, cqExtFileSize
+ );
+
+ BrokerConfig brokerConfig = new BrokerConfig();
+
+ DefaultMessageStore master = new DefaultMessageStore(
+ messageStoreConfig,
+ new BrokerStatsManager(brokerConfig.getBrokerClusterName()),
+ new MessageArrivingListener() {
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+ }
+ }
+ , brokerConfig);
+
+ assertThat(master.load()).isTrue();
+
+ master.start();
+
+ return master;
+ }
+
+ protected void putMsg(DefaultMessageStore master) throws Exception {
+ long totalMsgs = 200;
+
+ for (long i = 0; i < totalMsgs; i++) {
+ master.putMessage(buildMessage());
+ }
+ }
+
+ protected void deleteDirectory(String rootPath) {
+ File file = new File(rootPath);
+ deleteFile(file);
+ }
+
+ protected void deleteFile(File file) {
+ File[] subFiles = file.listFiles();
+ if (subFiles != null) {
+ for (File sub : subFiles) {
+ deleteFile(sub);
+ }
+ }
+
+ file.delete();
+ }
+
+ @Test
+ public void testConsumeQueueWithExtendData() {
+ DefaultMessageStore master = null;
+ try {
+ master = gen();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+
+ master.getDispatcherList().addFirst(new CommitLogDispatcher() {
+
+ @Override
+ public void dispatch(DispatchRequest request) {
+ runCount++;
+ }
+
+ private int runCount = 0;
+ });
+
+ try {
+ try {
+ putMsg(master);
+ // wait build consume queue
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(Boolean.FALSE).isTrue();
+ }
+
+ ConsumeQueue cq = master.getConsumeQueueTable().get(topic).get(queueId);
+
+ assertThat(cq).isNotNull();
+
+ long index = 0;
+
+ while (index < cq.getMaxOffsetInQueue()) {
+ SelectMappedBufferResult bufferResult = cq.getIndexBuffer(index);
+
+ assertThat(bufferResult).isNotNull();
+
+ ByteBuffer buffer = bufferResult.getByteBuffer();
+
+ assertThat(buffer).isNotNull();
+ try {
+ ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+ for (int i = 0; i < bufferResult.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ long phyOffset = buffer.getLong();
+ int size = buffer.getInt();
+ long tagsCode = buffer.getLong();
+
+ assertThat(phyOffset).isGreaterThanOrEqualTo(0);
+ assertThat(size).isGreaterThan(0);
+ assertThat(tagsCode).isLessThan(0);
+
+ boolean ret = cq.getExt(tagsCode, cqExtUnit);
+
+ assertThat(ret).isTrue();
+ assertThat(cqExtUnit).isNotNull();
+ assertThat(cqExtUnit.getSize()).isGreaterThan((short) 0);
+ assertThat(cqExtUnit.getMsgStoreTime()).isGreaterThan(0);
+ assertThat(cqExtUnit.getTagsCode()).isGreaterThan(0);
+ }
+
+ } finally {
+ bufferResult.release();
+ }
+
+ index += cqFileSize / ConsumeQueue.CQ_STORE_UNIT_SIZE;
+ }
+ } finally {
+ master.shutdown();
+ master.destroy();
+ deleteDirectory(storePath);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 5c9c46f..75f1de9 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.store;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -124,7 +125,8 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener { // listener stub whose arriving() has an empty body
@Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
+ byte[] filterBitMap, Map<String, String> properties) { // empty body: every argument is ignored
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index 19bff89..409ea33 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -469,4 +470,12 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
UnsupportedEncodingException {
return this.defaultMQAdminExtImpl.getNameServerConfig(nameServers);
}
+
+    /**
+     * Forwards the consume queue query to {@code defaultMQAdminExtImpl}, passing every argument
+     * through unchanged, and returns the response body it produces.
+     */
+    @Override
+    public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        final QueryConsumeQueueResponseBody responseBody =
+            this.defaultMQAdminExtImpl.queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup);
+        return responseBody;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index a31b69d..157ae21 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -955,4 +956,11 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().getNameServerConfig(nameServers, timeoutMillis);
}
+ /** Issues the consume queue query through the client API, passing {@code timeoutMillis} as the last argument. */
+ @Override
+ public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index,
+     int count, String consumerGroup) throws InterruptedException, RemotingTimeoutException,
+     RemotingSendRequestException, RemotingConnectException, MQClientException {
+     return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue(brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 493cf54..82add92 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
@@ -241,4 +242,25 @@ public interface MQAdminExt extends MQAdmin {
Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQClientException, UnsupportedEncodingException;
+
+ /**
+ * Query consume queue data from a broker.
+ *
+ * @param brokerAddr broker address, e.g. {@code ip:port}
+ * @param topic topic whose consume queue is queried
+ * @param queueId id of queue
+ * @param index start offset (queue index of the first entry to return)
+ * @param count how many entries to query
+ * @param consumerGroup consumer group; callers in this module may pass {@code null}
+ * @return response body with subscription data, filter data, min/max queue index and queue entries
+ * @throws InterruptedException presumably when the calling thread is interrupted during the request
+ * @throws RemotingTimeoutException presumably when the broker does not respond in time
+ * @throws RemotingSendRequestException presumably when the request cannot be sent to the broker
+ * @throws RemotingConnectException presumably when no connection to the broker can be made
+ * @throws MQClientException presumably on other client-side failures; confirm against the implementation
+ */
+ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr,
+ final String topic, final int queueId,
+ final long index, final int count, final String consumerGroup)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 9bd37e8..6398291 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand;
import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand;
import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand;
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
+import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
import org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand;
@@ -189,6 +190,8 @@ public class MQAdminStartup {
initCommand(new GetNamesrvConfigCommand());
initCommand(new UpdateNamesrvConfigCommand());
initCommand(new GetBrokerConfigCommand());
+
+ initCommand(new QueryConsumeQueueCommand());
}
private static void initLogback() throws JoranException {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/58f1574b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
new file mode 100644
index 0000000..611addd
--- /dev/null
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/queue/QueryConsumeQueueCommand.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.queue;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.SubCommand;
+
+public class QueryConsumeQueueCommand implements SubCommand {
+
+    /** Separator line printed after each section of the output. */
+    private static final String SEPARATOR = "======================================\n";
+
+    /**
+     * Ad-hoc entry point that runs the command with a fixed sample argument list;
+     * the {@code args} given on the command line are not used.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        QueryConsumeQueueCommand cmd = new QueryConsumeQueueCommand();
+
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[]{"-t TopicTest", "-q 0", "-i 6447", "-b 100.81.165.119:10911"};
+        final CommandLine commandLine =
+            ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
+                new PosixParser());
+        cmd.execute(commandLine, options, null);
+    }
+
+    /** @return the name under which this sub-command is invoked */
+    @Override
+    public String commandName() {
+        return "queryCq";
+    }
+
+    /** @return a short description of this sub-command */
+    @Override
+    public String commandDesc() {
+        return "Query cq command.";
+    }
+
+    /**
+     * Registers the options of this command: topic (-t), queue (-q) and start index (-i) are required;
+     * count (-c), broker address (-b) and consumer group (-g) are optional.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        addOption(options, "t", "topic", "topic name", true);
+        addOption(options, "q", "queue", "queue num, ie. 1", true);
+        addOption(options, "i", "index", "start queue index.", true);
+        addOption(options, "c", "count", "how many.", false);
+        addOption(options, "b", "broker", "broker addr.", false);
+        addOption(options, "g", "consumer", "consumer group.", false);
+        return options;
+    }
+
+    /** Adds one option that takes an argument to {@code options}. */
+    private static void addOption(Options options, String shortName, String longName, String desc, boolean required) {
+        Option opt = new Option(shortName, longName, true, desc);
+        opt.setRequired(required);
+        options.addOption(opt);
+    }
+
+    /**
+     * Queries consume queue entries from a broker and prints them to standard output.
+     * When no broker address is given with -b, the address is taken from the topic route.
+     * Any exception is reported via {@code printStackTrace()}; the admin client is always shut down.
+     *
+     * @param commandLine parsed command line holding the option values
+     * @param options not used by this command
+     * @param rpcHook hook handed to the admin client
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        try {
+            defaultMQAdminExt.start();
+
+            String topic = commandLine.getOptionValue("t").trim();
+            int queueId = Integer.parseInt(commandLine.getOptionValue("q").trim());
+            long index = Long.parseLong(commandLine.getOptionValue("i").trim());
+            int count = Integer.parseInt(commandLine.getOptionValue("c", "10").trim());
+            String broker = commandLine.hasOption("b") ? commandLine.getOptionValue("b").trim() : null;
+            String consumerGroup = commandLine.hasOption("g") ? commandLine.getOptionValue("g").trim() : null;
+
+            // Compare by content: a reference comparison against "" is not guaranteed
+            // to match an empty option value.
+            if (broker == null || broker.isEmpty()) {
+                broker = resolveBrokerAddr(defaultMQAdminExt, topic);
+            }
+
+            QueryConsumeQueueResponseBody body = defaultMQAdminExt.queryConsumeQueue(
+                broker, topic, queueId, index, count, consumerGroup);
+            printResult(body, index);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+    }
+
+    /**
+     * Looks up the route of {@code topic} and returns the address stored under broker id 0
+     * of the first broker entry.
+     *
+     * @throws Exception if there is no route data or no address is registered under broker id 0
+     */
+    private String resolveBrokerAddr(DefaultMQAdminExt defaultMQAdminExt, String topic) throws Exception {
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+
+        if (topicRouteData == null || topicRouteData.getBrokerDatas() == null
+            || topicRouteData.getBrokerDatas().isEmpty()) {
+            throw new Exception("No topic route data!");
+        }
+
+        String addr = topicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(0L);
+        if (addr == null) {
+            // Fail with a clear message instead of passing a null address on to the query.
+            throw new Exception("No broker address registered under broker id 0 for topic: " + topic);
+        }
+        return addr;
+    }
+
+    /** Prints subscription data, filter data, the min/max queue index and each returned queue entry. */
+    private void printResult(QueryConsumeQueueResponseBody body, long index) {
+        if (body.getSubscriptionData() != null) {
+            System.out.printf("Subscription data: \n%s\n", JSON.toJSONString(body.getSubscriptionData(), true));
+            System.out.print(SEPARATOR);
+        }
+
+        if (body.getFilterData() != null) {
+            System.out.printf("Filter data: \n%s\n", body.getFilterData());
+            System.out.print(SEPARATOR);
+        }
+
+        System.out.printf("Queue data: \nmax: %d, min: %d\n", body.getMaxQueueIndex(), body.getMinQueueIndex());
+        System.out.print(SEPARATOR);
+
+        if (body.getQueueData() != null) {
+            long i = index;
+            for (ConsumeQueueData queueData : body.getQueueData()) {
+                System.out.print("idx: " + i + "\n" + queueData.toString() + "\n" + SEPARATOR);
+                i++;
+            }
+        }
+    }
+}