You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:17 UTC
[19/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk
of unable to release putMessage Lock forever closes
apache/incubator-rocketmq#61
[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/031347db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/031347db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/031347db
Branch: refs/heads/develop
Commit: 031347db7314b511ea7356ac892001ac1349489e
Parents: 16c8d43
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:21:09 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
.../apache/rocketmq/common/BrokerConfig.java | 3 ++
.../org/apache/rocketmq/store/CommitLog.java | 40 ++++--------------
.../apache/rocketmq/store/PutMessageLock.java | 25 ++++++++++++
.../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++
.../rocketmq/store/PutMessageSpinLock.java | 43 ++++++++++++++++++++
.../store/config/MessageStoreConfig.java | 4 ++
6 files changed, 119 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f0a73bd..5bce013 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -47,6 +47,9 @@ public class BrokerConfig {
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
+ /**
+ * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
+ */
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int adminBrokerThreadPoolNums = 16;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7841feb..7b29263 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,12 +61,7 @@ public class CommitLog {
private volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
-
- //true: Can lock, false : in lock.
- private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
- private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
+ private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -88,6 +81,8 @@ public class CommitLog {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
+ this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
}
public boolean load() {
@@ -577,7 +572,7 @@ public class CommitLog {
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- lockForPutMessage(); //spin...
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
@@ -626,7 +621,7 @@ public class CommitLog {
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
- releasePutMessageLock();
+ putMessageLock.unlock();
}
if (eclipseTimeInLock > 500) {
@@ -861,7 +856,7 @@ public class CommitLog {
}
public boolean appendData(long startOffset, byte[] data) {
- lockForPutMessage(); //spin...
+ putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
@@ -871,7 +866,7 @@ public class CommitLog {
return mappedFile.appendMessage(data);
} finally {
- releasePutMessageLock();
+ putMessageLock.unlock();
}
}
@@ -906,28 +901,7 @@ public class CommitLog {
return diff;
}
- /**
- * Spin util acquired the lock.
- */
- private void lockForPutMessage() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.lock();
- } else {
- boolean flag;
- do {
- flag = this.putMessageSpinLock.compareAndSet(true, false);
- }
- while (!flag);
- }
- }
- private void releasePutMessageLock() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.unlock();
- } else {
- this.putMessageSpinLock.compareAndSet(false, true);
- }
- }
public static class GroupCommitRequest {
private final long nextOffset;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
new file mode 100644
index 0000000..a03e41a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Used when trying to put message
+ */
+public interface PutMessageLock {
+ void lock();
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
new file mode 100644
index 0000000..9198f1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Exclusive lock implementation to put message
+ */
+public class PutMessageReentrantLock implements PutMessageLock {
+ private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+ @Override
+ public void lock() {
+ putMessageNormalLock.lock();
+ }
+
+ @Override
+ public void unlock() {
+ putMessageNormalLock.unlock();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
new file mode 100644
index 0000000..baa809d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Spin lock Implementation to put message, suggest using this witb low race conditions
+ *
+ */
+public class PutMessageSpinLock implements PutMessageLock {
+ //true: Can lock, false : in lock.
+ private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+ @Override
+ public void lock() {
+ boolean flag;
+ do {
+ flag = this.putMessageSpinLock.compareAndSet(true, false);
+ }
+ while (!flag);
+ }
+
+ @Override
+ public void unlock() {
+ this.putMessageSpinLock.compareAndSet(false, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/031347db/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 29f800c..19ed211 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -52,6 +52,10 @@ public class MessageStoreConfig {
@ImportantField
private int commitIntervalCommitLog = 200;
+ /**
+ * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
+ * By default it is set to false indicating using spin lock when putting message.
+ */
private boolean useReentrantLockWhenPutMessage = false;
// Whether schedule flush,default is real-time