You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2017/12/13 08:28:21 UTC

[GitHub] vongosling closed pull request #146: [ROCKETMQ-265] fix consume queue?s data maybe repeat bug

vongosling closed pull request #146: [ROCKETMQ-265] fix consume queue?s data maybe repeat bug
URL: https://github.com/apache/rocketmq/pull/146
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0bf0aa9a..4922e3d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
 
             if (cqOffset != 0) {
                 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+
+                if (expectLogicOffset < currentLogicOffset) {
+                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
+                    return true;
+                }
+
                 if (expectLogicOffset != currentLogicOffset) {
                     LOG_ERROR.warn(
                         "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b03f2fce..b7d38f8c 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -17,22 +17,21 @@
 
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Test;
-
 import java.io.File;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
 
 public class ConsumeQueueTest {
 
@@ -131,6 +130,65 @@ protected void putMsg(DefaultMessageStore master) throws Exception {
         }
     }
 
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    @Test
+    public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
+        DefaultMessageStore messageStore = null;
+        try {
+
+            messageStore = gen();
+
+            int totalMessages = 10;
+
+            for (int i = 0; i < totalMessages; i++) {
+                putMsg(messageStore);
+            }
+            Thread.sleep(5);
+
+            ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+            Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);
+
+            assertThat(method).isNotNull();
+
+            method.setAccessible(true);
+
+            SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
+            assertThat(result != null).isTrue();
+
+            DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+            assertThat(cq).isNotNull();
+
+            Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
+                dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
+
+            assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
+
+        } finally {
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            deleteDirectory(storePath);
+        }
+
+    }
+
     @Test
     public void testConsumeQueueWithExtendData() {
         DefaultMessageStore master = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services