You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2017/11/27 06:07:38 UTC

[GitHub] fuyou001 closed pull request #193: [ROCKETMQ-314] msg send back must sync change process queue msg size

fuyou001 closed pull request #193:  [ROCKETMQ-314] msg send back must sync change process queue msg size 
URL: https://github.com/apache/rocketmq/pull/193
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index e0a3b699..94ebe4f1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -54,7 +54,9 @@ public static void main(String[] args) {
 
     public static BrokerController start(BrokerController controller) {
         try {
+
             controller.start();
+
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
@@ -242,7 +244,7 @@ private static void properties2SystemEnv(Properties properties) {
         System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
     }
 
-    public static Options buildCommandlineOptions(final Options options) {
+    private static Options buildCommandlineOptions(final Options options) {
         Option opt = new Option("c", "configFile", true, "Broker config properties file");
         opt.setRequired(false);
         options.addOption(opt);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
index 2f94de20..64c28ece 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
@@ -70,7 +70,7 @@ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit
         // by tags code.
         if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
 
-            if (tagsCode == null || tagsCode < 0L) {
+            if (tagsCode == null) {
                 return true;
             }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
index a5ad3ac3..c8da08d2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
@@ -25,6 +25,8 @@
 
 public class BrokerStartupTest {
 
+    private String storePathRootDir = ".";
+
     @Test
     public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException,
         IllegalAccessException {
@@ -36,5 +38,4 @@ public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationT
         method.invoke(null, properties);
         Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
     }
-
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index 7978942e..e544d90a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -24,12 +24,14 @@
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -77,24 +79,17 @@
         try {
             StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
         } catch (UnknownHostException e) {
-            e.printStackTrace();
         }
         try {
             BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
         } catch (UnknownHostException e) {
-            e.printStackTrace();
         }
     }
 
     @Before
-    public void init() {
+    public void init() throws Exception {
         filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
-        try {
-            master = gen(filterManager);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+        master = gen(filterManager);
     }
 
     @After
@@ -107,7 +102,7 @@ public void destroy() {
     public MessageExtBrokerInner buildMessage() {
         MessageExtBrokerInner msg = new MessageExtBrokerInner();
         msg.setTopic(topic);
-        msg.setTags("TAG1");
+        msg.setTags(System.currentTimeMillis() + "TAG");
         msg.setKeys("Hello");
         msg.setBody(msgBody);
         msg.setKeys(String.valueOf(System.currentTimeMillis()));
@@ -125,7 +120,7 @@ public MessageExtBrokerInner buildMessage() {
     }
 
     public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
-        boolean enableCqExt, int cqExtFileSize) {
+                                               boolean enableCqExt, int cqExtFileSize) {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
         messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
@@ -155,9 +150,7 @@ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Ex
             new MessageArrivingListener() {
                 @Override
                 public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
-                    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
-//                    System.out.println(String.format("Msg coming: %s, %d, %d, %d",
-//                        topic, queueId, logicOffset, tagsCode));
+                                     long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
                 }
             }
             , brokerConfig);
@@ -166,8 +159,6 @@ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
             @Override
             public void dispatch(DispatchRequest request) {
                 try {
-//                    System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
-//                        BitsArray.create(request.getBitMap()).toString()));
                 } catch (Throwable e) {
                     e.printStackTrace();
                 }
@@ -183,7 +174,7 @@ public void dispatch(DispatchRequest request) {
     }
 
     protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
-        int msgCountPerTopic) throws Exception {
+                                                 int msgCountPerTopic) throws Exception {
         List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
         for (int i = 0; i < topicCount; i++) {
             String realTopic = topic + i;
@@ -229,22 +220,10 @@ public void dispatch(DispatchRequest request) {
     }
 
     @Test
-    public void testGetMessage_withFilterBitMapAndConsumerChanged() {
-        List<MessageExtBrokerInner> msgs = null;
-        try {
-            msgs = putMsg(master, topicCount, msgPerTopic);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+    public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
+        List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
 
-        // sleep to wait for consume queue has been constructed.
-        try {
-            Thread.sleep(200);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+        Thread.sleep(200);
 
         // reset consumer;
         String topic = "topic" + 0;
@@ -303,16 +282,10 @@ public void testGetMessage_withFilterBitMapAndConsumerChanged() {
     }
 
     @Test
-    public void testGetMessage_withFilterBitMap() {
-        List<MessageExtBrokerInner> msgs = null;
-        try {
-            msgs = putMsg(master, topicCount, msgPerTopic);
-            // sleep to wait for consume queue has been constructed.
-            Thread.sleep(200);
-        } catch (Exception e) {
-            e.printStackTrace();
-            assertThat(true).isFalse();
-        }
+    public void testGetMessage_withFilterBitMap() throws Exception {
+        List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
+
+        Thread.sleep(100);
 
         for (int i = 0; i < topicCount; i++) {
             String realTopic = topic + i;
@@ -369,4 +342,32 @@ public void testGetMessage_withFilterBitMap() {
             }
         }
     }
+
+    @Test
+    public void testGetMessage_withFilter_checkTagsCode() throws Exception {
+        putMsg(master, topicCount, msgPerTopic);
+
+        Thread.sleep(200);
+
+        for (int i = 0; i < topicCount; i++) {
+            String realTopic = topic + i;
+
+            GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000,
+                new MessageFilter() {
+                    @Override
+                    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+                        if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) {
+                            return false;
+                        }
+                        return true;
+                    }
+
+                    @Override
+                    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+                        return true;
+                    }
+                });
+            assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic);
+        }
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 38b80738..cd7d8902 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -101,7 +102,7 @@ public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
                     try {
                         if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                             try {
-                                msgTreeMap.remove(msgTreeMap.firstKey());
+                                removeMessage(Collections.singletonList(msg));
                             } catch (Exception e) {
                                 log.error("send expired msg exception", e);
                             }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 379162d7..4922e3d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -446,6 +446,13 @@ private boolean putMessagePositionInfo(final long offset, final int size, final
 
             if (cqOffset != 0) {
                 long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
+
+                if (expectLogicOffset < currentLogicOffset) {
+                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
+                    return true;
+                }
+
                 if (expectLogicOffset != currentLogicOffset) {
                     LOG_ERROR.warn(
                         "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
@@ -569,6 +576,6 @@ protected boolean isExtWriteEnable() {
      * Check {@code tagsCode} is address of extend file or tags code.
      */
     public boolean isExtAddr(long tagsCode) {
-        return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode);
+        return ConsumeQueueExt.isExtAddr(tagsCode);
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index a118cde7..aeb2803e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -95,7 +95,7 @@ public ConsumeQueueExt(final String topic,
      * Just test {@code address} is less than 0.
      * </p>
      */
-    public boolean isExtAddr(final long address) {
+    public static boolean isExtAddr(final long address) {
         return address <= MAX_ADDR;
     }
 
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 95a017ae..59ef4904 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -18,8 +18,10 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -104,6 +106,10 @@
 
     private final LinkedList<CommitLogDispatcher> dispatcherList;
 
+    private RandomAccessFile lockFile;
+
+    private FileLock lock;
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -138,6 +144,10 @@ public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final Br
         this.dispatcherList = new LinkedList<>();
         this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
         this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
+
+        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
+        MappedFile.ensureDirOK(file.getParent());
+        lockFile = new RandomAccessFile(file, "rw");
     }
 
     public void truncateDirtyLogicFiles(long phyOffset) {
@@ -196,6 +206,15 @@ public boolean load() {
      * @throws Exception
      */
     public void start() throws Exception {
+
+        lock = lockFile.getChannel().tryLock(0, 1, false);
+        if (lock == null || lock.isShared() || !lock.isValid()) {
+            throw new RuntimeException("Lock failed,MQ already started");
+        }
+
+        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
+        lockFile.getChannel().force(true);
+
         this.flushConsumeQueueService.start();
         this.commitLog.start();
         this.storeStatsService.start();
@@ -254,6 +273,14 @@ public void shutdown() {
         }
 
         this.transientStorePool.destroy();
+
+        if (lockFile != null && lock != null) {
+            try {
+                lock.release();
+                lockFile.close();
+            } catch (IOException e) {
+            }
+        }
     }
 
     public void destroy() {
@@ -487,7 +514,7 @@ public GetMessageResult getMessage(final String group, final String topic, final
                                 break;
                             }
 
-                            boolean extRet = false;
+                            boolean extRet = false, isTagsCodeLegal = true;
                             if (consumeQueue.isExtAddr(tagsCode)) {
                                 extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
                                 if (extRet) {
@@ -496,11 +523,12 @@ public GetMessageResult getMessage(final String group, final String topic, final
                                     // can't find ext content.Client will filter messages by tag also.
                                     log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
                                         tagsCode, offsetPy, sizePy, topic, group);
+                                    isTagsCodeLegal = false;
                                 }
                             }
 
                             if (messageFilter != null
-                                && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) {
+                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                 if (getResult.getBufferTotalSize() == 0) {
                                     status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index ef1d670a..ccd76c4f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -40,6 +40,10 @@ public static String getAbortFile(final String rootDir) {
         return rootDir + File.separator + "abort";
     }
 
+    public static String getLockFile(final String rootDir) {
+        return rootDir + File.separator + "lock";
+    }
+
     public static String getDelayOffsetStorePath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
     }
diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b03f2fce..b7d38f8c 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -17,22 +17,21 @@
 
 package org.apache.rocketmq.store;
 
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.junit.Test;
-
 import java.io.File;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Map;
-
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
 
 public class ConsumeQueueTest {
 
@@ -131,6 +130,65 @@ protected void putMsg(DefaultMessageStore master) throws Exception {
         }
     }
 
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    @Test
+    public void testPutMessagePositionInfo_buildCQRepeatedly() throws Exception {
+        DefaultMessageStore messageStore = null;
+        try {
+
+            messageStore = gen();
+
+            int totalMessages = 10;
+
+            for (int i = 0; i < totalMessages; i++) {
+                putMsg(messageStore);
+            }
+            Thread.sleep(5);
+
+            ConsumeQueue cq = messageStore.getConsumeQueueTable().get(topic).get(queueId);
+            Method method = cq.getClass().getDeclaredMethod("putMessagePositionInfo", long.class, int.class, long.class, long.class);
+
+            assertThat(method).isNotNull();
+
+            method.setAccessible(true);
+
+            SelectMappedBufferResult result = messageStore.getCommitLog().getData(0);
+            assertThat(result != null).isTrue();
+
+            DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+
+            assertThat(cq).isNotNull();
+
+            Object dispatchResult = method.invoke(cq, dispatchRequest.getCommitLogOffset(),
+                dispatchRequest.getMsgSize(), dispatchRequest.getTagsCode(), dispatchRequest.getConsumeQueueOffset());
+
+            assertThat(Boolean.parseBoolean(dispatchResult.toString())).isTrue();
+
+        } finally {
+            if (messageStore != null) {
+                messageStore.shutdown();
+                messageStore.destroy();
+            }
+            deleteDirectory(storePath);
+        }
+
+    }
+
     @Test
     public void testConsumeQueueWithExtendData() {
         DefaultMessageStore master = null;
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 6e37b705..9269cdfa 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -21,6 +21,7 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.channels.OverlappingFileLockException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -56,6 +57,31 @@ public void init() throws Exception {
         messageStore.start();
     }
 
+    @Test(expected = OverlappingFileLockException.class)
+    public void test_repate_restart() throws Exception {
+        long totalMsgs = 100;
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
+
+        boolean load = master.load();
+        assertTrue(load);
+
+        try {
+            master.start();
+            master.start();
+        } finally {
+            master.shutdown();
+            master.destroy();
+        }
+    }
+
     @After
     public void destory() {
         messageStore.shutdown();
@@ -164,7 +190,7 @@ public void testPullSize() throws Exception {
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-            byte[] filterBitMap, Map<String, String> properties) {
+                             byte[] filterBitMap, Map<String, String> properties) {
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services