You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/07/22 02:15:51 UTC

[incubator-iotdb] branch cluster_new updated: Make heartbeat thread commit log task asynchronous

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new 50d1f3a  Make heartbeat thread commit log task asynchronous
     new dc88faa  Merge pull request #1533 from neuyilan/apache_cluster_new_0721_async_commit_log
50d1f3a is described below

commit 50d1f3aea891b69a11b61669153ece99d90d6886
Author: HouliangQi <ne...@163.com>
AuthorDate: Wed Jul 22 09:10:45 2020 +0800

    Make heartbeat thread commit log task asynchronous
---
 .../apache/iotdb/cluster/log/CommitLogTask.java    | 68 +++++++++++++++++++++
 .../iotdb/cluster/server/MetaClusterServer.java    |  1 +
 .../cluster/server/member/MetaGroupMember.java     |  6 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 71 ++++++++++++++++++----
 .../org/apache/iotdb/db/utils/CommonUtils.java     |  9 +++
 5 files changed, 142 insertions(+), 13 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java
new file mode 100644
index 0000000..001169a
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/CommitLogTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import org.apache.iotdb.cluster.log.manage.RaftLogManager;
+import org.apache.iotdb.cluster.server.member.RaftMember.OnCommitLogEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CommitLogTask implements Runnable {
+
+  private static final Logger logger = LoggerFactory.getLogger(CommitLogTask.class);
+  private RaftLogManager logManager;
+  private long leaderCommit;
+  private long term;
+
+  public CommitLogTask(RaftLogManager logManager, long leaderCommit, long term) {
+    this.logManager = logManager;
+    this.leaderCommit = leaderCommit;
+    this.term = term;
+  }
+
+  /**
+   * Listener notified when the commit succeeds; doCommitLog() refuses to run without one.
+   */
+  private OnCommitLogEventListener mListener;
+
+  /**
+   * @param mListener listener whose onSuccess() is called after a successful commit
+   */
+  public void registerOnGeekEventListener(OnCommitLogEventListener mListener) {
+    this.mListener = mListener;
+  }
+
+  public void doCommitLog() {
+    if (mListener == null) { // without a listener the commit is not even attempted
+      logger.error("event listener is not registered");
+      return;
+    }
+    boolean success = logManager.maybeCommit(leaderCommit, term);
+
+    if (success) { // NOTE(review): onError is never invoked from this task
+      mListener.onSuccess();
+    }
+  }
+
+  @Override
+  public void run() { // Runnable entry point; simply delegates to doCommitLog()
+    doCommitLog();
+  }
+}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 5ec004c..5718c5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -110,6 +110,7 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
     if (ioTDB == null) {
       return;
     }
+    metaHeartbeatServer.stop();
     super.stop();
     ioTDB.stop();
     ioTDB = null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 8a081a6..46e413e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -360,6 +360,9 @@ public class MetaGroupMember extends RaftMember {
     if (getDataClusterServer() != null) {
       getDataClusterServer().stop();
     }
+    if (getDataHeartbeatServer() != null) {
+      getDataHeartbeatServer().stop();
+    }
     if (clientServer != null) {
       clientServer.stop();
     }
@@ -577,7 +580,8 @@ public class MetaGroupMember extends RaftMember {
     } else if (resp.getRespNum() == Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT) {
       CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse();
       StringBuilder parameters = new StringBuilder();
-      parameters.append(checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval");
+      parameters
+          .append(checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval");
       parameters.append(checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt");
       parameters.append(checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number");
       parameters.append(checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 706e87f..b97fbc1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.CommitLogTask;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -93,6 +94,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -165,6 +167,11 @@ public abstract class RaftMember {
   // a thread pool that is used to convert serial operations into paralleled ones
   private ExecutorService asyncThreadPool;
 
+  /**
+   * a thread pool that runs the heartbeat thread's commit log tasks asynchronously
+   */
+  private ExecutorService commitLogPool;
+
   public RaftMember() {
   }
 
@@ -208,6 +215,12 @@ public abstract class RaftMember {
     asyncThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100,
         0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<>());
+
+    commitLogPool = new ThreadPoolExecutor(CommonUtils.getCpuCores(),
+        CommonUtils.getMaxExecutorPoolSize(),
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>());
+
     logger.info("{} started", name);
   }
 
@@ -248,6 +261,16 @@ public abstract class RaftMember {
         logger.error("Unexpected interruption when waiting for asyncThreadPool to end", e);
       }
     }
+
+    if (commitLogPool != null) {
+      commitLogPool.shutdownNow();
+      try {
+        commitLogPool.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
+      }
+    }
     catchUpService = null;
     heartBeatService = null;
     appendLogThreadPool = null;
@@ -295,18 +318,13 @@ public abstract class RaftMember {
           response.setLastLogIndex(logManager.getLastLogIndex());
           response.setLastLogTerm(logManager.getLastLogTerm());
 
-          // The term of the last log needs to be the same with leader's term in order to preserve
-          // safety, otherwise it may come from an invalid leader and is not committed
-          if (logManager.maybeCommit(request.getCommitLogIndex(), request.getCommitLogTerm())) {
-            logger.debug("{}: Committing to {}-{}, localCommit: {}-{}, localLast: {}-{}", name,
-                request.getCommitLogIndex(),
-                request.getCommitLogTerm(), logManager.getCommitLogIndex(),
-                logManager.getCommitLogTerm(), logManager.getLastLogIndex(),
-                logManager.getLastLogTerm());
-            synchronized (syncLock) {
-              syncLock.notifyAll();
-            }
-          } else if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
+          CommitLogTask commitLogTask = new CommitLogTask(logManager, request.getCommitLogIndex(),
+              request.getCommitLogTerm());
+          OnCommitLogEventListener mListener = new AsyncCommitLogEvent();
+          commitLogTask.registerOnGeekEventListener(mListener);
+          commitLogPool.submit(commitLogTask);
+
+          if (logManager.getCommitLogIndex() < request.getCommitLogIndex()) {
             logger
                 .info("{}: Inconsistent log found, leader: {}-{}, local: {}-{}, last: {}-{}", name,
                     request.getCommitLogIndex(), request.getCommitLogTerm(),
@@ -1564,4 +1582,33 @@ public abstract class RaftMember {
   public ExecutorService getAsyncThreadPool() {
     return asyncThreadPool;
   }
+
+  public interface OnCommitLogEventListener {
+
+    /**
+     * Called when the asynchronous log commit succeeds.
+     */
+    void onSuccess();
+
+    /**
+     * @param e the exception raised when the asynchronous log commit fails
+     */
+    void onError(Exception e);
+  }
+
+  public class AsyncCommitLogEvent implements OnCommitLogEventListener {
+
+    @Override
+    public void onSuccess() {
+      synchronized (syncLock) { // the monitor must be held to call notifyAll()
+        syncLock.notifyAll(); // wake every thread waiting on syncLock
+      }
+    }
+
+    @Override
+    public void onError(Exception e) {
+      logger.error("async commit log failed, {}", e.toString()); // message only, no stack trace
+    }
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index c036414..f5ce443 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -33,6 +33,11 @@ public class CommonUtils {
 
   private static final int CPUS = Runtime.getRuntime().availableProcessors();
 
+  /**
+   * Default executor pool maximum size.
+   */
+  public static final int MAX_EXECUTOR_POOL_SIZE = Math.max(100, getCpuCores() * 5);
+
   private CommonUtils() {
   }
 
@@ -142,4 +147,8 @@ public class CommonUtils {
   public static int getCpuCores() {
     return CPUS;
   }
+
+  public static int getMaxExecutorPoolSize() {
+    return MAX_EXECUTOR_POOL_SIZE; // max(100, 5 * CPU cores)
+  }
 }