You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:26:43 UTC

[18/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61

[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b1fcf1b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b1fcf1b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b1fcf1b8

Branch: refs/heads/master
Commit: b1fcf1b83b659bd03bcebf651d9e88c294a89e07
Parents: 0adad6f
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:21:09 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:21:09 2017 +0800

----------------------------------------------------------------------
 .../apache/rocketmq/common/BrokerConfig.java    |  3 ++
 .../org/apache/rocketmq/store/CommitLog.java    | 40 ++++--------------
 .../apache/rocketmq/store/PutMessageLock.java   | 25 ++++++++++++
 .../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++
 .../rocketmq/store/PutMessageSpinLock.java      | 43 ++++++++++++++++++++
 .../store/config/MessageStoreConfig.java        |  4 ++
 6 files changed, 119 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f0a73bd..5bce013 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -47,6 +47,9 @@ public class BrokerConfig {
     private boolean autoCreateSubscriptionGroup = true;
     private String messageStorePlugIn = "";
 
+    /**
+     * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
+     */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int adminBrokerThreadPoolNums = 16;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7841feb..7b29263 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,12 +61,7 @@ public class CommitLog {
     private volatile long confirmOffset = -1L;
 
     private volatile long beginTimeInLock = 0;
-
-    //true: Can lock, false : in lock.
-    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
-    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
+    private final PutMessageLock putMessageLock;
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
         this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
             defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -88,6 +81,8 @@ public class CommitLog {
                 return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
+        this.putMessageLock =  defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
     }
 
     public boolean load() {
@@ -577,7 +572,7 @@ public class CommitLog {
         MappedFile unlockMappedFile = null;
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
-        lockForPutMessage(); //spin...
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         try {
             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
@@ -626,7 +621,7 @@ public class CommitLog {
             eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
             beginTimeInLock = 0;
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
 
         if (eclipseTimeInLock > 500) {
@@ -861,7 +856,7 @@ public class CommitLog {
     }
 
     public boolean appendData(long startOffset, byte[] data) {
-        lockForPutMessage(); //spin...
+        putMessageLock.lock();
         try {
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
             if (null == mappedFile) {
@@ -871,7 +866,7 @@ public class CommitLog {
 
             return mappedFile.appendMessage(data);
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
     }
 
@@ -906,28 +901,7 @@ public class CommitLog {
         return diff;
     }
 
-    /**
-     * Spin util acquired the lock.
-     */
-    private void lockForPutMessage() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.lock();
-        } else {
-            boolean flag;
-            do {
-                flag = this.putMessageSpinLock.compareAndSet(true, false);
-            }
-            while (!flag);
-        }
-    }
 
-    private void releasePutMessageLock() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.unlock();
-        } else {
-            this.putMessageSpinLock.compareAndSet(false, true);
-        }
-    }
 
     public static class GroupCommitRequest {
         private final long nextOffset;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
new file mode 100644
index 0000000..a03e41a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Used when trying to put message
+ */
+public interface PutMessageLock {
+    void lock();
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
new file mode 100644
index 0000000..9198f1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Exclusive lock implementation to put message
+ */
+public class PutMessageReentrantLock implements PutMessageLock {
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+    @Override
+    public void lock() {
+        putMessageNormalLock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        putMessageNormalLock.unlock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
new file mode 100644
index 0000000..baa809d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Spin lock Implementation to put message, suggest using this witb low race conditions
+ *
+ */
+public class PutMessageSpinLock implements PutMessageLock {
+    //true: Can lock, false : in lock.
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    @Override
+    public void lock() {
+        boolean flag;
+        do {
+            flag = this.putMessageSpinLock.compareAndSet(true, false);
+        }
+        while (!flag);
+    }
+
+    @Override
+    public void unlock() {
+        this.putMessageSpinLock.compareAndSet(false, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 29f800c..19ed211 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -52,6 +52,10 @@ public class MessageStoreConfig {
     @ImportantField
     private int commitIntervalCommitLog = 200;
 
+    /**
+     * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
+     * By default it is set to false indicating using spin lock when putting message.
+     */
     private boolean useReentrantLockWhenPutMessage = false;
 
     // Whether schedule flush,default is real-time