You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/05 12:28:39 UTC
incubator-rocketmq git commit: [ROCKETMQ-265] fix consume queue's
data maybe repeat bug
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 6a97d2884 -> 368e7c86a
[ROCKETMQ-265] fix consume queue's data maybe repeat bug
Author: 傅冲 <yu...@alibaba-inc.com>
Author: fuyou001 <fu...@gmail.com>
Closes #146 from fuyou001/ROCKETMQ-265.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/368e7c86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/368e7c86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/368e7c86
Branch: refs/heads/develop
Commit: 368e7c86a0b06099f336c81672112dcb5143cf9e
Parents: 6a97d28
Author: 傅冲 <yu...@alibaba-inc.com>
Authored: Tue Sep 5 20:28:23 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Sep 5 20:28:23 2017 +0800
----------------------------------------------------------------------
.../org/apache/rocketmq/store/ConsumeQueue.java | 7 ++
.../apache/rocketmq/store/ConsumeQueueTest.java | 74 +++++++++++++++++---
2 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0bf0aa9..4922e3d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -446,6 +446,13 @@ public class ConsumeQueue {
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+
+ if (expectLogicOffset < currentLogicOffset) {
+ log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+ expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
+ return true;
+ }
+
if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b03f2fc..b7d38f8 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -17,22 +17,21 @@
package org.apache.rocketmq.store;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Test;
-
import java.io.File;
+import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Map;
-
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
public class ConsumeQueueTest {
@@ -131,6 +130,65 @@ public class ConsumeQueueTest {
}
}
+ protected void deleteDirectory(String rootPath) {
+ File file = new File(rootPath);
+ deleteFile(file);
+ }
+
+ protected void deleteFile(File file) {
+ File[] subFiles = file.listFiles();
+ if (subFiles != null) {
+ for (File sub : subFiles) {
+ deleteFile(sub);
+ }
+ }
+
+ file.delete();
+ }
+
+ @Test
+ public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
+ DefaultMessageStore messageStore = null;
+ try {
+
+ messageStore = gen();
+
+ int totalMessages = 10;
+
+ for (int i = 0; i < totalMessages; i++) {
+ putMsg(messageStore);
+ }
+ Thread.sleep(5);
+
+ ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+ Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);
+
+ assertThat(method).isNotNull();
+
+ method.setAccessible(true);
+
+ SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
+ assertThat(result != null).isTrue();
+
+ DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+ assertThat(cq).isNotNull();
+
+ Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
+ dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
+
+ assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
+
+ } finally {
+ if (messageStore != null) {
+ messageStore.shutdown();
+ messageStore.destroy();
+ }
+ deleteDirectory(storePath);
+ }
+
+ }
+
@Test
public void testConsumeQueueWithExtendData() {
DefaultMessageStore master = null;