You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 11:50:06 UTC

[iotdb] 01/01: add allowWeakWriteOrder fix timer when not using dispatcher

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_extract_application
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9940bfe5e76ad310a3b662781068fe39fccb53f0
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 19:48:46 2020 +0800

    add allowWeakWriteOrder
    fix timer when not using dispatcher
---
 .../resources/conf/iotdb-cluster.properties        |  8 +++-
 .../apache/iotdb/cluster/config/ClusterConfig.java | 16 +++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  4 ++
 .../java/org/apache/iotdb/cluster/log/Log.java     | 22 ++++++++++
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  4 +-
 .../cluster/log/logtypes/PhysicalPlanLog.java      |  8 ++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 28 +++++++++++-
 .../org/apache/iotdb/cluster/server/Timer.java     |  6 ++-
 .../cluster/server/member/DataGroupMember.java     |  1 +
 .../cluster/server/member/MetaGroupMember.java     |  1 +
 .../iotdb/cluster/server/member/RaftMember.java    | 50 ++++++++++++++++------
 .../org/apache/iotdb/db/metadata/MManager.java     |  1 +
 12 files changed, 130 insertions(+), 19 deletions(-)

diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 640cfba..d7dfb4b 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -148,4 +148,10 @@ enable_use_persist_log_on_disk_to_catch_up=true
 
 # The number of logs read on the disk at one time, which is mainly used to control the memory usage.
 # This value multiplied by the log size is about the amount of memory used to read logs from the disk at one time.
-max_number_of_logs_per_fetch_on_disk=1000
\ No newline at end of file
+max_number_of_logs_per_fetch_on_disk=1000
+
+# allow_weak_write_ordering: if true, concurrent modifications of the same data may be executed
+# in different orders on different nodes, but the operation order of the same client is still
+# guaranteed. This increases throughput at the expense of weaker consistency. It is enabled by
+# default, so set it to false if your application requires strict write ordering.
+# allow_weak_write_ordering=true
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 4d5d05b..e690dc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -162,6 +162,22 @@ public class ClusterConfig {
    */
   private boolean waitForSlowNode = true;
 
+  /**
+   * Whether weak write ordering is allowed: concurrent modifications of the same data may then be
+   * executed in different orders on different nodes, but the operation order of the same client
+   * is guaranteed. This increases throughput at the expense of weaker consistency. Enabled by
+   * default; can be overridden by the "allow_weak_write_ordering" property.
+   */
+  private boolean allowWeakWriteOrdering = true;
+
+  public boolean isAllowWeakWriteOrdering() {
+    return allowWeakWriteOrdering; // true (the default) allows weak write ordering
+  }
+
+  public void setAllowWeakWriteOrdering(boolean allowWeakWriteOrdering) {
+    this.allowWeakWriteOrdering = allowWeakWriteOrdering; // from allow_weak_write_ordering
+  }
+
   public int getSelectorNumOfClientPool() {
     return selectorNumOfClientPool;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..6ce5f15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -292,6 +292,10 @@ public class ClusterDescriptor {
         Boolean.parseBoolean(properties.getProperty("enable_use_persist_log_on_disk_to_catch_up",
             String.valueOf(config.isEnableUsePersistLogOnDiskToCatchUp()))));
 
+    config.setAllowWeakWriteOrdering(
+        Boolean.parseBoolean(properties.getProperty("allow_weak_write_ordering",
+            String.valueOf(config.isAllowWeakWriteOrdering()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 0c236b2..7df50f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -44,6 +44,10 @@ public abstract class Log implements Comparable<Log> {
   private long createTime;
   private long enqueueTime;
 
+  // set when the log is created on the leader; if allowWeakWriteOrdering is on and the log is not
+  // blocked, it is then applied in the client thread, outside the log manager, for parallelism
+  private boolean onLeaderSide;
+
   public abstract ByteBuffer serialize();
 
   public abstract void deserialize(ByteBuffer buffer);
@@ -127,4 +131,22 @@ public abstract class Log implements Comparable<Log> {
   public void setEnqueueTime(long enqueueTime) {
     this.enqueueTime = enqueueTime;
   }
+
+  public boolean isOnLeaderSide() {
+    return onLeaderSide; // true if the log was created by this node as the leader
+  }
+
+  public void setOnLeaderSide(boolean onLeaderSide) {
+    this.onLeaderSide = onLeaderSide; // the leader sets this to true when it creates the log
+  }
+
+  /**
+   * Tells whether this log must wait for all previous logs to be applied before being applied.
+   * @return true (the default) if it must wait, false otherwise. Only insertion logs may be
+   * applied ahead of previous logs when allowWeakWriteOrdering is true; other logs must wait,
+   * or there may be inconsistency.
+   */
+  public boolean isBlockedLog() {
+    return true;
+  }
 }
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 751d3e8..d15f794 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -278,10 +278,10 @@ public class LogDispatcher {
         return;
       }
       AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
-      startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+      startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG_SYNC.getOperationStartTime();
       try {
         long result = client.appendEntries(request);
-        Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+        Timer.Statistic.RAFT_SENDER_SEND_LOG_SYNC.calOperationCostTimeFromStart(startTime);
         if (result != -1 && logger.isInfoEnabled()) {
           logger.info("{}: Append {} logs to {}, resp: {}", member.getName(), logList.size(),
               receiver, result);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index e0927c0..8b7c668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -26,9 +26,11 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,4 +111,10 @@ public class PhysicalPlanLog extends Log {
   public int hashCode() {
     return Objects.hash(super.hashCode(), plan);
   }
+
+  @Override
+  public boolean isBlockedLog() {
+    // only insertions can be unblocked from previous logs if allowWeakWriteOrdering is true
+    return ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() && !(plan instanceof InsertPlan);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..24a233d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -637,8 +637,17 @@ public abstract class RaftLogManager {
         blockedUnappliedLogList.add(entry);
         continue;
       }
+
+      waitForBlockedLog(entry);
+
       try {
-        logApplier.apply(entry);
+        // if the order of some logs is not required, we may apply them outside logManager to
+        // reduce lock contention
+        if (entry.isBlockedLog() ||
+            !ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() ||
+            !entry.isOnLeaderSide()) {
+          logApplier.apply(entry);
+        }
       } catch (Exception e) {
         entry.setException(e);
       }
@@ -646,6 +655,23 @@ public abstract class RaftLogManager {
   }
 
   /**
+   * If {@code entry} is a blocked log (e.g. a deletion), waits until all logs before it are applied.
+   */
+  private void waitForBlockedLog(Log entry) {
+    boolean interrupted = false;
+    while (entry.isBlockedLog() && maxHaveAppliedCommitIndex < entry.getCurrLogIndex() - 1) {
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        interrupted = true; // sleep() cleared the flag: keep waiting, restore it before returning
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
    * Check whether the parameters passed in satisfy the following properties. firstIndex <= low <=
    * high.
    *
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
index 4d65241..9ca6413 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@ -94,8 +94,10 @@ public class Timer {
         RAFT_MEMBER_SENDER, "send log async", TIME_SCALE,
         ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
         RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
-    RAFT_SENDER_SEND_LOG(
-        RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+    RAFT_SENDER_SEND_LOG_SYNC(
+        RAFT_MEMBER_SENDER, "send log sync", TIME_SCALE,
+        !ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+        RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
     RAFT_SENDER_VOTE_COUNTER(
         RAFT_MEMBER_SENDER, "wait for votes", TIME_SCALE, true,
         RaftMember.USE_LOG_DISPATCHER ? DATA_GROUP_MEMBER_LOCAL_EXECUTION
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..e94691b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -169,6 +169,7 @@ public class DataGroupMember extends RaftMember {
     setQueryManager(new ClusterQueryManager());
     slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
     LogApplier applier = new DataLogApplier(metaGroupMember, this);
+    directApplier = applier;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
       applier = new AsyncDataLogApplier(applier, name);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7cccccd..fe79179 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -280,6 +280,7 @@ public class MetaGroupMember extends RaftMember {
 
     // committed logs are applied to the state machine (the IoTDB instance) through the applier
     LogApplier metaLogApplier = new MetaLogApplier(this);
+    directApplier = metaLogApplier;
     logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
     term.set(logManager.getHardState().getCurrentTerm());
     voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 142af7b..1788c00 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.cluster.log.CommitLogCallback;
 import org.apache.iotdb.cluster.log.CommitLogTask;
 import org.apache.iotdb.cluster.log.HardState;
 import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.LogDispatcher;
 import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
 import org.apache.iotdb.cluster.log.LogParser;
@@ -245,6 +246,8 @@ public abstract class RaftMember {
    */
   private LogDispatcher logDispatcher;
 
+  protected LogApplier directApplier; // not async-wrapped; used by waitLogApplied
+
   protected RaftMember() {
   }
 
@@ -893,6 +896,7 @@ public abstract class RaftMember {
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
 
+      log.setOnLeaderSide(true);
       log.setPlan(plan);
       plan.setIndex(log.getCurrLogIndex());
       logManager.append(log);
@@ -924,6 +928,7 @@ public abstract class RaftMember {
       Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2
           .calOperationCostTimeFromStart(startTime);
 
+      log.setOnLeaderSide(true);
       log.setCurrLogTerm(getTerm().get());
       log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
       log.setPlan(plan);
@@ -951,7 +956,7 @@ public abstract class RaftMember {
           .calOperationCostTimeFromStart(sendLogRequest.getLog().getCreateTime());
       switch (appendLogResult) {
         case OK:
-          logger.debug("{}: log {} is accepted", name, log);
+          logger.debug("{}: log {} is accepted.", name, log);
           startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
           commitLog(log);
           Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
@@ -1357,7 +1362,6 @@ public abstract class RaftMember {
     return AppendLogResult.OK;
   }
 
-  @SuppressWarnings("java:S2445")
   void commitLog(Log log) throws LogExecutionException {
     long startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT
         .getOperationStartTime();
@@ -1369,21 +1373,37 @@ public abstract class RaftMember {
       logManager.commitTo(log.getCurrLogIndex());
     }
     Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
-    // when using async applier, the log here may not be applied. To return the execution
-    // result, we must wait until the log is applied.
-    startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
-    synchronized (log) {
-      while (!log.isApplied()) {
-        // wait until the log is applied
-        try {
-          log.wait(5);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new LogExecutionException(e);
+    waitLogApplied(log);
+
+    if (log.getException() != null) {
+      throw new LogExecutionException(log.getException());
+    }
+  }
+
+  @SuppressWarnings("java:S2445")
+  private void waitLogApplied(Log log) throws LogExecutionException {
+    long startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
+    if (ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() && !log.isBlockedLog()) {
+      long operationStartTime = Statistic.RAFT_SENDER_DATA_LOG_APPLY.getOperationStartTime();
+      directApplier.apply(log);
+      Statistic.RAFT_SENDER_DATA_LOG_APPLY.calOperationCostTimeFromStart(operationStartTime);
+    } else {
+      // when using async applier, the log here may not be applied. To return the execution
+      // result, we must wait until the log is applied.
+      synchronized (log) {
+        while (!log.isApplied()) {
+          // wait until the log is applied
+          try {
+            log.wait(1);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new LogExecutionException(e);
+          }
         }
       }
     }
     Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.calOperationCostTimeFromStart(startTime);
+
     if (log.getException() != null) {
       throw new LogExecutionException(log.getException());
     }
@@ -1417,7 +1437,9 @@ public abstract class RaftMember {
     AppendEntryRequest request = new AppendEntryRequest();
     request.setTerm(term.get());
     if (serializeNow) {
+      long operationStartTime = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
       request.setEntry(log.serialize());
+      Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(operationStartTime);
     }
     request.setLeader(getThisNode());
     // don't need lock because even if it's larger than the commitIndex when appending this log to
@@ -1685,7 +1707,9 @@ public abstract class RaftMember {
           node, leaderShipStale, newLeaderTerm, peer);
       try {
         logger.debug("{} sending a log to {}: {}", name, node, log);
+        long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG_SYNC.getOperationStartTime();
         long result = client.appendEntry(request);
+        Statistic.RAFT_SENDER_SEND_LOG_SYNC.calOperationCostTimeFromStart(operationStartTime);
         handler.onComplete(result);
       } catch (TException e) {
         client.getInputProtocol().getTransport().close();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 58ebd31..f51b2aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;