You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/05 03:30:39 UTC

incubator-rocketmq git commit: [ROCKETMQ-281] Add check policy for preventing repeat start mq

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 254d43249 -> 6a97d2884


[ROCKETMQ-281] Add check policy for preventing repeat start mq

Author: 傅冲 <yu...@alibaba-inc.com>
Author: fuyou001 <fu...@gmail.com>

Closes #158 from fuyou001/ROCKETMQ-281.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6a97d288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6a97d288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6a97d288

Branch: refs/heads/develop
Commit: 6a97d2884fe8ea23de8f231c8647b99ea5be1811
Parents: 254d432
Author: 傅冲 <yu...@alibaba-inc.com>
Authored: Tue Sep 5 11:30:11 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Sep 5 11:30:11 2017 +0800

----------------------------------------------------------------------
 .../apache/rocketmq/broker/BrokerStartup.java   |  4 +-
 .../rocketmq/broker/BrokerStartupTest.java      |  3 +-
 .../rocketmq/store/DefaultMessageStore.java     | 62 ++++++++++++++------
 .../store/config/StorePathConfigHelper.java     |  4 ++
 .../rocketmq/store/DefaultMessageStoreTest.java | 28 ++++++++-
 5 files changed, 80 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index e0a3b69..94ebe4f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -54,7 +54,9 @@ public class BrokerStartup {
 
     public static BrokerController start(BrokerController controller) {
         try {
+
             controller.start();
+
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                 + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
@@ -242,7 +244,7 @@ public class BrokerStartup {
         System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
     }
 
-    public static Options buildCommandlineOptions(final Options options) {
+    private static Options buildCommandlineOptions(final Options options) {
         Option opt = new Option("c", "configFile", true, "Broker config properties file");
         opt.setRequired(false);
         options.addOption(opt);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
index a5ad3ac..c8da08d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
@@ -25,6 +25,8 @@ import org.junit.Test;
 
 public class BrokerStartupTest {
 
+    private String storePathRootDir = ".";
+
     @Test
     public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException,
         IllegalAccessException {
@@ -36,5 +38,4 @@ public class BrokerStartupTest {
         method.invoke(null, properties);
         Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ffa8dbc..59ef490 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,6 +16,25 @@
  */
 package org.apache.rocketmq.store;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceThread;
@@ -39,24 +58,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
 
 public class DefaultMessageStore implements MessageStore {
@@ -105,6 +106,10 @@ public class DefaultMessageStore implements MessageStore {
 
     private final LinkedList<CommitLogDispatcher> dispatcherList;
 
+    private RandomAccessFile lockFile;
+
+    private FileLock lock;
+
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
         this.messageArrivingListener = messageArrivingListener;
@@ -139,6 +144,10 @@ public class DefaultMessageStore implements MessageStore {
         this.dispatcherList = new LinkedList<>();
         this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
         this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
+
+        File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
+        MappedFile.ensureDirOK(file.getParent());
+        lockFile = new RandomAccessFile(file, "rw");
     }
 
     public void truncateDirtyLogicFiles(long phyOffset) {
@@ -197,6 +206,15 @@ public class DefaultMessageStore implements MessageStore {
      * @throws Exception
      */
     public void start() throws Exception {
+
+        lock = lockFile.getChannel().tryLock(0, 1, false);
+        if (lock == null || lock.isShared() || !lock.isValid()) {
+            throw new RuntimeException("Lock failed,MQ already started");
+        }
+
+        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
+        lockFile.getChannel().force(true);
+
         this.flushConsumeQueueService.start();
         this.commitLog.start();
         this.storeStatsService.start();
@@ -255,6 +273,14 @@ public class DefaultMessageStore implements MessageStore {
         }
 
         this.transientStorePool.destroy();
+
+        if (lockFile != null && lock != null) {
+            try {
+                lock.release();
+                lockFile.close();
+            } catch (IOException e) {
+            }
+        }
     }
 
     public void destroy() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index ef1d670..ccd76c4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -40,6 +40,10 @@ public class StorePathConfigHelper {
         return rootDir + File.separator + "abort";
     }
 
+    public static String getLockFile(final String rootDir) {
+        return rootDir + File.separator + "lock";
+    }
+
     public static String getDelayOffsetStorePath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 6e37b70..9269cdf 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.nio.channels.OverlappingFileLockException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -56,6 +57,31 @@ public class DefaultMessageStoreTest {
         messageStore.start();
     }
 
+    @Test(expected = OverlappingFileLockException.class)
+    public void test_repate_restart() throws Exception {
+        long totalMsgs = 100;
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
+
+        boolean load = master.load();
+        assertTrue(load);
+
+        try {
+            master.start();
+            master.start();
+        } finally {
+            master.shutdown();
+            master.destroy();
+        }
+    }
+
     @After
     public void destory() {
         messageStore.shutdown();
@@ -164,7 +190,7 @@ public class DefaultMessageStoreTest {
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-            byte[] filterBitMap, Map<String, String> properties) {
+                             byte[] filterBitMap, Map<String, String> properties) {
         }
     }
 }