You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/09/05 03:30:39 UTC
incubator-rocketmq git commit: [ROCKETMQ-281] Add check policy for
preventing repeat start mq
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 254d43249 -> 6a97d2884
[ROCKETMQ-281] Add check policy for preventing repeat start mq
Author: 傅冲 <yu...@alibaba-inc.com>
Author: fuyou001 <fu...@gmail.com>
Closes #158 from fuyou001/ROCKETMQ-281.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6a97d288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6a97d288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6a97d288
Branch: refs/heads/develop
Commit: 6a97d2884fe8ea23de8f231c8647b99ea5be1811
Parents: 254d432
Author: 傅冲 <yu...@alibaba-inc.com>
Authored: Tue Sep 5 11:30:11 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Sep 5 11:30:11 2017 +0800
----------------------------------------------------------------------
.../apache/rocketmq/broker/BrokerStartup.java | 4 +-
.../rocketmq/broker/BrokerStartupTest.java | 3 +-
.../rocketmq/store/DefaultMessageStore.java | 62 ++++++++++++++------
.../store/config/StorePathConfigHelper.java | 4 ++
.../rocketmq/store/DefaultMessageStoreTest.java | 28 ++++++++-
5 files changed, 80 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index e0a3b69..94ebe4f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -54,7 +54,9 @@ public class BrokerStartup {
public static BrokerController start(BrokerController controller) {
try {
+
controller.start();
+
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
@@ -242,7 +244,7 @@ public class BrokerStartup {
System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
}
- public static Options buildCommandlineOptions(final Options options) {
+ private static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("c", "configFile", true, "Broker config properties file");
opt.setRequired(false);
options.addOption(opt);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
index a5ad3ac..c8da08d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerStartupTest.java
@@ -25,6 +25,8 @@ import org.junit.Test;
public class BrokerStartupTest {
+ private String storePathRootDir = ".";
+
@Test
public void testProperties2SystemEnv() throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
@@ -36,5 +38,4 @@ public class BrokerStartupTest {
method.invoke(null, properties);
Assert.assertEquals("value", System.getProperty("rocketmq.namesrv.domain"));
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ffa8dbc..59ef490 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -16,6 +16,25 @@
*/
package org.apache.rocketmq.store;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread;
@@ -39,24 +58,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore {
@@ -105,6 +106,10 @@ public class DefaultMessageStore implements MessageStore {
private final LinkedList<CommitLogDispatcher> dispatcherList;
+ private RandomAccessFile lockFile;
+
+ private FileLock lock;
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -139,6 +144,10 @@ public class DefaultMessageStore implements MessageStore {
this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());
+
+ File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));
+ MappedFile.ensureDirOK(file.getParent());
+ lockFile = new RandomAccessFile(file, "rw");
}
public void truncateDirtyLogicFiles(long phyOffset) {
@@ -197,6 +206,15 @@ public class DefaultMessageStore implements MessageStore {
* @throws Exception
*/
public void start() throws Exception {
+
+ lock = lockFile.getChannel().tryLock(0, 1, false);
+ if (lock == null || lock.isShared() || !lock.isValid()) {
+ throw new RuntimeException("Lock failed,MQ already started");
+ }
+
+ lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
+ lockFile.getChannel().force(true);
+
this.flushConsumeQueueService.start();
this.commitLog.start();
this.storeStatsService.start();
@@ -255,6 +273,14 @@ public class DefaultMessageStore implements MessageStore {
}
this.transientStorePool.destroy();
+
+ if (lockFile != null && lock != null) {
+ try {
+ lock.release();
+ lockFile.close();
+ } catch (IOException e) {
+ }
+ }
}
public void destroy() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
index ef1d670..ccd76c4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
@@ -40,6 +40,10 @@ public class StorePathConfigHelper {
return rootDir + File.separator + "abort";
}
+ public static String getLockFile(final String rootDir) {
+ return rootDir + File.separator + "lock";
+ }
+
public static String getDelayOffsetStorePath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6a97d288/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 6e37b70..9269cdf 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,6 +57,31 @@ public class DefaultMessageStoreTest {
messageStore.start();
}
+ @Test(expected = OverlappingFileLockException.class)
+ public void test_repate_restart() throws Exception {
+ long totalMsgs = 100;
+ QUEUE_TOTAL = 1;
+ MessageBody = StoreMessage.getBytes();
+
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+ messageStoreConfig.setMaxHashSlotNum(100);
+ messageStoreConfig.setMaxIndexNum(100 * 10);
+ MessageStore master = new DefaultMessageStore(messageStoreConfig, null, new MyMessageArrivingListener(), new BrokerConfig());
+
+ boolean load = master.load();
+ assertTrue(load);
+
+ try {
+ master.start();
+ master.start();
+ } finally {
+ master.shutdown();
+ master.destroy();
+ }
+ }
+
@After
public void destory() {
messageStore.shutdown();
@@ -164,7 +190,7 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
+ byte[] filterBitMap, Map<String, String> properties) {
}
}
}