You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/05 12:28:39 UTC

incubator-rocketmq git commit: [ROCKETMQ-265] fix consume queue's data maybe repeat bug

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 6a97d2884 -> 368e7c86a


[ROCKETMQ-265] fix consume queue's data maybe repeat bug

Author: 傅冲 <yu...@alibaba-inc.com>
Author: fuyou001 <fu...@gmail.com>

Closes #146 from fuyou001/ROCKETMQ-265.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/368e7c86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/368e7c86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/368e7c86

Branch: refs/heads/develop
Commit: 368e7c86a0b06099f336c81672112dcb5143cf9e
Parents: 6a97d28
Author: 傅冲 <yu...@alibaba-inc.com>
Authored: Tue Sep 5 20:28:23 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Sep 5 20:28:23 2017 +0800

----------------------------------------------------------------------
 .../org/apache/rocketmq/store/ConsumeQueue.java |  7 ++
 .../apache/rocketmq/store/ConsumeQueueTest.java | 74 +++++++++++++++++---
 2 files changed, 73 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0bf0aa9..4922e3d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -446,6 +446,13 @@ public class ConsumeQueue {
 
             if (cqOffset != 0) {
                 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+
+                if (expectLogicOffset < currentLogicOffset) {
+                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
+                    return true;
+                }
+
                 if (expectLogicOffset != currentLogicOffset) {
                     LOG_ERROR.warn(
                         "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/368e7c86/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b03f2fc..b7d38f8 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -17,22 +17,21 @@
 
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Test;
-
 import java.io.File;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
 
 public class ConsumeQueueTest {
 
@@ -131,6 +130,65 @@ public class ConsumeQueueTest {
         }
     }
 
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    @Test
+    public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
+        DefaultMessageStore messageStore = null;
+        try {
+
+            messageStore = gen();
+
+            int totalMessages = 10;
+
+            for (int i = 0; i < totalMessages; i++) {
+                putMsg(messageStore);
+            }
+            Thread.sleep(5);
+
+            ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+            Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);
+
+            assertThat(method).isNotNull();
+
+            method.setAccessible(true);
+
+            SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
+            assertThat(result != null).isTrue();
+
+            DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+            assertThat(cq).isNotNull();
+
+            Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
+                dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
+
+            assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
+
+        } finally {
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            deleteDirectory(storePath);
+        }
+
+    }
+
     @Test
     public void testConsumeQueueWithExtendData() {
         DefaultMessageStore master = null;