You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/12/21 07:39:29 UTC

[iotdb] branch expr updated: reuse client in LogDispatcher; add commitIndex in heartbeat to update peer index; remove timer threshold

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr by this push:
     new e6bf459  reuse client in LogDispatcher; add commitIndex in heartbeat to update peer index; remove timer threshold
e6bf459 is described below

commit e6bf459c74fe57256b6b5932e296bbad6a89f466
Author: jt <jt...@163.com>
AuthorDate: Tue Dec 21 15:36:00 2021 +0800

    reuse client in LogDispatcher
    add commitIndex in heartbeat to update peer index
    remove timer threshold
---
 .../java/org/apache/iotdb/cluster/log/Log.java     |  6 +++-
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 10 +++----
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |  8 +++--
 .../cluster/log/catchup/SnapshotCatchUpTask.java   | 35 +++++++++++++---------
 .../iotdb/cluster/log/logtypes/CloseFileLog.java   |  2 +-
 .../cluster/log/logtypes/EmptyContentLog.java      |  2 +-
 .../iotdb/cluster/log/logtypes/LargeTestLog.java   |  2 +-
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  2 +-
 .../server/handlers/caller/HeartbeatHandler.java   |  6 +---
 .../iotdb/cluster/server/member/RaftMember.java    | 11 ++++---
 .../apache/iotdb/cluster/server/monitor/Timer.java |  9 ------
 thrift-cluster/src/main/thrift/cluster.thrift      |  1 +
 12 files changed, 49 insertions(+), 45 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 4845bba..59dd534 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -33,7 +33,7 @@ public abstract class Log implements Comparable<Log> {
       Comparator.comparingLong(Log::getCurrLogIndex).thenComparing(Log::getCurrLogTerm);
 
   // make this configurable or adaptive
-  public static int DEFAULT_BUFFER_SIZE = 16 * 1024;
+  private static final int DEFAULT_BUFFER_SIZE = 16 * 1024;
   private long currLogIndex;
   private long currLogTerm;
 
@@ -48,6 +48,10 @@ public abstract class Log implements Comparable<Log> {
 
   private int byteSize = 0;
 
+  public static int getDefaultBufferSize() {
+    return DEFAULT_BUFFER_SIZE;
+  }
+
   public abstract ByteBuffer serialize();
 
   public abstract void deserialize(ByteBuffer buffer);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index ba8f55f..ffcdb8f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -320,10 +320,8 @@ public class LogDispatcher {
       }
       Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
 
-      Client client = member.getSyncClient(receiver);
       if (client == null) {
-        logger.error("No available client for {}", receiver);
-        return;
+        client = member.getSyncClient(receiver);
       }
       AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
       startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
@@ -333,10 +331,10 @@ public class LogDispatcher {
         handler.onComplete(result);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
-        handler.onError(e);
-        logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
-      } finally {
         ClientUtils.putBackSyncClient(client);
+        client = member.getSyncClient(receiver);
+        logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
+        handler.onError(e);
       }
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 0552e2e..061490f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -59,6 +59,8 @@ public class CatchUpTask implements Runnable {
   private String name;
   private int raftId;
 
+  private long startTime;
+
   public CatchUpTask(Node node, int raftId, Peer peer, RaftMember raftMember, long lastLogIdx) {
     this.node = node;
     this.raftId = raftId;
@@ -345,6 +347,7 @@ public class CatchUpTask implements Runnable {
 
   @Override
   public void run() {
+    startTime = System.currentTimeMillis();
     try {
       boolean findMatchedIndex = checkMatchIndex();
       if (abort) {
@@ -378,10 +381,11 @@ public class CatchUpTask implements Runnable {
         }
         if (logger.isInfoEnabled()) {
           logger.info(
-              "{}: Catch up {} finished, update it's matchIndex to {}",
+              "{}: Catch up {} finished, update it's matchIndex to {}, time consumption: {}ms",
               raftMember.getName(),
               node,
-              peer.getMatchIndex());
+              peer.getMatchIndex(),
+              System.currentTimeMillis() - startTime);
         }
         peer.resetInconsistentHeartbeatNum();
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
index b11f886..f32edad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java
@@ -36,6 +36,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -113,27 +114,33 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool
   }
 
   private boolean sendSnapshotSync(SendSnapshotRequest request) throws TException {
-    logger.info(
-        "{}: sending a snapshot request size={} to {}",
-        raftMember.getName(),
-        request.getSnapshotBytes().length,
-        node);
-    Client client = raftMember.getSyncClient(node);
-    if (client == null) {
-      return false;
-    }
-    try {
+    int retry = 5;
+    for (int i = 0; i < retry; i++) {
+      logger.info(
+          "{}: sending a snapshot request size={} to {}",
+          raftMember.getName(),
+          request.getSnapshotBytes().length,
+          node);
+
+      Client client = raftMember.getSyncClient(node);
+      if (client == null) {
+        return false;
+      }
+
       try {
         client.sendSnapshot(request);
         logger.info("{}: snapshot is sent to {}", raftMember.getName(), node);
         return true;
       } catch (TException e) {
-        client.getInputProtocol().getTransport().close();
-        throw e;
+        if (!(e.getCause() instanceof SocketTimeoutException)) {
+          client.getInputProtocol().getTransport().close();
+          throw e;
+        }
+      } finally {
+        ClientUtils.putBackSyncClient(client);
       }
-    } finally {
-      ClientUtils.putBackSyncClient(client);
     }
+    return false;
   }
 
   @Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
index 675b7fe..bade7a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/CloseFileLog.java
@@ -46,7 +46,7 @@ public class CloseFileLog extends Log {
 
   @Override
   public ByteBuffer serialize() {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) CLOSE_FILE.ordinal());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
index 1e785d5..cc9cec6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/EmptyContentLog.java
@@ -39,7 +39,7 @@ public class EmptyContentLog extends Log {
 
   @Override
   public ByteBuffer serialize() {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) EMPTY_CONTENT.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
index 32e4a1c..12a8e61 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/LargeTestLog.java
@@ -38,7 +38,7 @@ public class LargeTestLog extends Log {
 
   @Override
   public ByteBuffer serialize() {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(getDefaultBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) TEST_LARGE_CONTENT.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index 7ecdf5f..1f3882b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -48,7 +48,7 @@ public class PhysicalPlanLog extends Log {
 
   @Override
   public ByteBuffer serialize() {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS(DEFAULT_BUFFER_SIZE);
+    PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultBufferSize());
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) PHYSICAL_PLAN.ordinal());
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index bccb16f..8e885cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -108,11 +108,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
     if (!localMember.getLogManager().isLogUpToDate(lastLogTerm, lastLogIdx)
         || !localMember.getLogManager().matchTerm(lastLogTerm, lastLogIdx)) {
       // the follower is not up-to-date
-      if (lastLogIdx == -1) {
-        // maybe the follower has restarted, so we need to find its match index again, because
-        // some logs may be lost due to restart
-        peer.setMatchIndex(-1);
-      }
+      peer.setMatchIndex(Math.max(peer.getMatchIndex(), resp.getCommitIndex()));
 
       // only start a catch up when the follower's lastLogIndex remains stall and unchanged for 3
       // heartbeats
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 942379f..287330c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -445,6 +445,7 @@ public abstract class RaftMember implements RaftMemberMBean {
         // tell the leader the local log progress so it may decide whether to perform a catch up
         response.setLastLogIndex(logManager.getLastLogIndex());
         response.setLastLogTerm(logManager.getLastLogTerm());
+        response.setCommitIndex(logManager.getCommitLogIndex());
 
         if (logger.isDebugEnabled()) {
           logger.debug(
@@ -1117,8 +1118,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
     // if a single log exceeds the threshold
     // we need to return error code to the client as in server mode
-    if (log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+    if (config.isEnableRaftLogPersistence()
+        && (log.serialize().capacity() + Integer.BYTES
+            >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize())) {
       logger.error(
           "Log cannot fit into buffer, please increase raft_log_buffer_size;"
               + "or reduce the size of requests you send.");
@@ -1166,8 +1168,9 @@ public abstract class RaftMember implements RaftMemberMBean {
     }
 
     // just like processPlanLocally,we need to check the size of log
-    if (log.serialize().capacity() + Integer.BYTES
-        >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize()) {
+    if (config.isEnableRaftLogPersistence()
+        && (log.serialize().capacity() + Integer.BYTES
+            >= ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize())) {
       logger.error(
           "Log cannot fit into buffer, please increase raft_log_buffer_size;"
               + "or reduce the size of requests you send.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index c794211..5a55f62 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -289,7 +289,6 @@ public class Timer {
     int level;
     Statistic parent;
     List<Statistic> children = new ArrayList<>();
-    long warningThreshold = 30 * 1000 * 1000 * 1000L;
 
     Statistic(String className, String blockName, double scale, boolean valid, Statistic parent) {
       this.className = className;
@@ -329,14 +328,6 @@ public class Timer {
       if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE) {
         long consumed = System.nanoTime() - startTime;
         add(consumed);
-        if (consumed >= warningThreshold && logger.isWarnEnabled()) {
-          logger.warn(
-              "Operation {}.{}.{} costs more than warning threshold: {}",
-              className,
-              blockName,
-              name(),
-              consumed);
-        }
       }
     }
 
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 64914ba..df93e31 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -56,6 +56,7 @@ struct HeartBeatResponse {
   // because a data server may play many data groups members, this is used to identify which
   // member should process the request or response. Only used in data group communication.
   7: optional Node header
+  8: optional long commitIndex
 }
 
 struct RequestCommitIndexResponse {