You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 11:50:06 UTC
[iotdb] 01/01: add allowWeakWriteOrder fix timer when not using
dispatcher
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_extract_application
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9940bfe5e76ad310a3b662781068fe39fccb53f0
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 19:48:46 2020 +0800
add allowWeakWriteOrder
fix timer when not using dispatcher
---
.../resources/conf/iotdb-cluster.properties | 8 +++-
.../apache/iotdb/cluster/config/ClusterConfig.java | 16 +++++++
.../iotdb/cluster/config/ClusterDescriptor.java | 4 ++
.../java/org/apache/iotdb/cluster/log/Log.java | 22 ++++++++++
.../apache/iotdb/cluster/log/LogDispatcher.java | 4 +-
.../cluster/log/logtypes/PhysicalPlanLog.java | 8 ++++
.../iotdb/cluster/log/manage/RaftLogManager.java | 28 +++++++++++-
.../org/apache/iotdb/cluster/server/Timer.java | 6 ++-
.../cluster/server/member/DataGroupMember.java | 1 +
.../cluster/server/member/MetaGroupMember.java | 1 +
.../iotdb/cluster/server/member/RaftMember.java | 50 ++++++++++++++++------
.../org/apache/iotdb/db/metadata/MManager.java | 1 +
12 files changed, 130 insertions(+), 19 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
index 640cfba..d7dfb4b 100644
--- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties
+++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties
@@ -148,4 +148,10 @@ enable_use_persist_log_on_disk_to_catch_up=true
# The number of logs read on the disk at one time, which is mainly used to control the memory usage.
# This value multiplied by the log size is about the amount of memory used to read logs from the disk at one time.
-max_number_of_logs_per_fetch_on_disk=1000
\ No newline at end of file
+max_number_of_logs_per_fetch_on_disk=1000
+
+# allow_weak_write_ordering means that if there are concurrent modifications of the
+# same data, they may be executed in different orders on different nodes, but the operation order
+# of the same client is guaranteed. This increases throughput at the expense of weaker
+# consistency, so use it with caution. It is enabled when this property is not set.
+# allow_weak_write_ordering=true
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 4d5d05b..e690dc0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -162,6 +162,22 @@ public class ClusterConfig {
*/
private boolean waitForSlowNode = true;
+ /**
+ * allowWeakWriteOrdering means that if there are concurrent modifications of the
+ * same data, they may be executed in different orders on different nodes, but the operation order
+ * of the same client is guaranteed. This increases throughput at the expense of weaker
+ * consistency. Enabled by default.
+ */
+ private boolean allowWeakWriteOrdering = true;
+
+ public boolean isAllowWeakWriteOrdering() {
+ return allowWeakWriteOrdering;
+ }
+
+ public void setAllowWeakWriteOrdering(boolean allowWeakWriteOrdering) {
+ this.allowWeakWriteOrdering = allowWeakWriteOrdering;
+ }
+
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index 3c9e8ec..6ce5f15 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -292,6 +292,10 @@ public class ClusterDescriptor {
Boolean.parseBoolean(properties.getProperty("enable_use_persist_log_on_disk_to_catch_up",
String.valueOf(config.isEnableUsePersistLogOnDiskToCatchUp()))));
+ config.setAllowWeakWriteOrdering(
+ Boolean.parseBoolean(properties.getProperty("allow_weak_write_ordering",
+ String.valueOf(config.isAllowWeakWriteOrdering()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
config.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 0c236b2..7df50f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -44,6 +44,10 @@ public abstract class Log implements Comparable<Log> {
private long createTime;
private long enqueueTime;
+ // this log is on the leader side, so we may apply it outside the log manager (apply it in the
+ // client thread) to increase parallelism
+ private boolean onLeaderSide;
+
public abstract ByteBuffer serialize();
public abstract void deserialize(ByteBuffer buffer);
@@ -127,4 +131,22 @@ public abstract class Log implements Comparable<Log> {
public void setEnqueueTime(long enqueueTime) {
this.enqueueTime = enqueueTime;
}
+
+ public boolean isOnLeaderSide() {
+ return onLeaderSide;
+ }
+
+ public void setOnLeaderSide(boolean onLeaderSide) {
+ this.onLeaderSide = onLeaderSide;
+ }
+
+ /**
+ * Tells whether this log must wait for its predecessors; this base implementation always says yes.
+ * @return true if the log cannot be applied until all previous logs are applied, false
+ * otherwise. Only insertion logs can be applied before previous logs when allowWeakWriteOrdering
+ * is true, other logs must wait or there may be inconsistency.
+ */
+ public boolean isBlockedLog() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 751d3e8..d15f794 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -278,10 +278,10 @@ public class LogDispatcher {
return;
}
AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
- startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+ startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG_SYNC.getOperationStartTime();
try {
long result = client.appendEntries(request);
- Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+ Timer.Statistic.RAFT_SENDER_SEND_LOG_SYNC.calOperationCostTimeFromStart(startTime);
if (result != -1 && logger.isInfoEnabled()) {
logger.info("{}: Append {} logs to {}, resp: {}", member.getName(), logList.size(),
receiver, result);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
index e0927c0..8b7c668 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/PhysicalPlanLog.java
@@ -26,9 +26,11 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,4 +111,10 @@ public class PhysicalPlanLog extends Log {
public int hashCode() {
return Objects.hash(super.hashCode(), plan);
}
+
+ @Override
+ public boolean isBlockedLog() {
+ // only insertions can be unblocked from previous logs if allowWeakWriteOrdering is true
+ return ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() && !(plan instanceof InsertPlan);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..24a233d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -637,8 +637,17 @@ public abstract class RaftLogManager {
blockedUnappliedLogList.add(entry);
continue;
}
+
+ waitForBlockedLog(entry);
+
try {
- logApplier.apply(entry);
+ // if the order of some logs is not required, we may apply them outside logManager to
+ // reduce lock contention
+ if (entry.isBlockedLog() ||
+ !ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() ||
+ !entry.isOnLeaderSide()) {
+ logApplier.apply(entry);
+ }
} catch (Exception e) {
entry.setException(e);
}
@@ -646,6 +655,23 @@ public abstract class RaftLogManager {
}
/**
+ * Some logs (like deletions) must wait until all previous logs are applied before they
+ * themselves can be applied. For such a log this method blocks the calling thread, polling once
+ * per millisecond, until {@code maxHaveAppliedCommitIndex} has reached the index right before
+ * the entry. Logs that do not report themselves as blocked return immediately.
+ *
+ * <p>An interrupt does not cut the wait short: it is remembered and the thread's interrupt flag
+ * is restored once the wait is over, so the entry is never released ahead of its predecessors.
+ *
+ * @param entry the log that is about to be applied
+ */
+ private void waitForBlockedLog(Log entry) {
+   if (!entry.isBlockedLog()) {
+     return;
+   }
+   // Re-asserting the interrupt flag inside the loop would make every following sleep() throw at
+   // once and degrade the loop into a busy spin, so the flag is restored only after the loop.
+   boolean interrupted = false;
+   while (maxHaveAppliedCommitIndex < entry.getCurrLogIndex() - 1) {
+     try {
+       Thread.sleep(1);
+     } catch (InterruptedException e) {
+       interrupted = true;
+     }
+   }
+   if (interrupted) {
+     Thread.currentThread().interrupt();
+   }
+ }
+
+ /**
* Check whether the parameters passed in satisfy the following properties. firstIndex <= low <=
* high.
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
index 4d65241..9ca6413 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
@@ -94,8 +94,10 @@ public class Timer {
RAFT_MEMBER_SENDER, "send log async", TIME_SCALE,
ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
- RAFT_SENDER_SEND_LOG(
- RAFT_MEMBER_SENDER, "send log", TIME_SCALE, true, RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
+ RAFT_SENDER_SEND_LOG_SYNC(
+ RAFT_MEMBER_SENDER, "send log sync", TIME_SCALE,
+ !ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ RAFT_SENDER_SEND_LOG_TO_FOLLOWERS),
RAFT_SENDER_VOTE_COUNTER(
RAFT_MEMBER_SENDER, "wait for votes", TIME_SCALE, true,
RaftMember.USE_LOG_DISPATCHER ? DATA_GROUP_MEMBER_LOCAL_EXECUTION
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..e94691b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -169,6 +169,7 @@ public class DataGroupMember extends RaftMember {
setQueryManager(new ClusterQueryManager());
slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
LogApplier applier = new DataLogApplier(metaGroupMember, this);
+ directApplier = applier;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncApplier()) {
applier = new AsyncDataLogApplier(applier, name);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7cccccd..fe79179 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -280,6 +280,7 @@ public class MetaGroupMember extends RaftMember {
// committed logs are applied to the state machine (the IoTDB instance) through the applier
LogApplier metaLogApplier = new MetaLogApplier(this);
+ directApplier = metaLogApplier;
logManager = new MetaSingleSnapshotLogManager(metaLogApplier, this);
term.set(logManager.getHardState().getCurrentTerm());
voteFor = logManager.getHardState().getVoteFor();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 142af7b..1788c00 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
@@ -245,6 +246,8 @@ public abstract class RaftMember {
*/
private LogDispatcher logDispatcher;
+ protected LogApplier directApplier;
+
protected RaftMember() {
}
@@ -893,6 +896,7 @@ public abstract class RaftMember {
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
+ log.setOnLeaderSide(true);
log.setPlan(plan);
plan.setIndex(log.getCurrLogIndex());
logManager.append(log);
@@ -924,6 +928,7 @@ public abstract class RaftMember {
Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2
.calOperationCostTimeFromStart(startTime);
+ log.setOnLeaderSide(true);
log.setCurrLogTerm(getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
log.setPlan(plan);
@@ -951,7 +956,7 @@ public abstract class RaftMember {
.calOperationCostTimeFromStart(sendLogRequest.getLog().getCreateTime());
switch (appendLogResult) {
case OK:
- logger.debug("{}: log {} is accepted", name, log);
+ logger.debug("{}: log {} is accepted.", name, log);
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
commitLog(log);
Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
@@ -1357,7 +1362,6 @@ public abstract class RaftMember {
return AppendLogResult.OK;
}
- @SuppressWarnings("java:S2445")
void commitLog(Log log) throws LogExecutionException {
long startTime = Statistic.RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT
.getOperationStartTime();
@@ -1369,21 +1373,37 @@ public abstract class RaftMember {
logManager.commitTo(log.getCurrLogIndex());
}
Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.calOperationCostTimeFromStart(startTime);
- // when using async applier, the log here may not be applied. To return the execution
- // result, we must wait until the log is applied.
- startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
- synchronized (log) {
- while (!log.isApplied()) {
- // wait until the log is applied
- try {
- log.wait(5);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new LogExecutionException(e);
+ waitLogApplied(log);
+
+ if (log.getException() != null) {
+ throw new LogExecutionException(log.getException());
+ }
+ }
+
+ @SuppressWarnings("java:S2445")
+ private void waitLogApplied(Log log) throws LogExecutionException {
+ long startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
+ if (ClusterDescriptor.getInstance().getConfig().isAllowWeakWriteOrdering() && !log.isBlockedLog()) {
+ long operationStartTime = Statistic.RAFT_SENDER_DATA_LOG_APPLY.getOperationStartTime();
+ directApplier.apply(log);
+ Statistic.RAFT_SENDER_DATA_LOG_APPLY.calOperationCostTimeFromStart(operationStartTime);
+ } else {
+ // when using async applier, the log here may not be applied. To return the execution
+ // result, we must wait until the log is applied.
+ synchronized (log) {
+ while (!log.isApplied()) {
+ // wait until the log is applied
+ try {
+ log.wait(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new LogExecutionException(e);
+ }
}
}
}
Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.calOperationCostTimeFromStart(startTime);
+
if (log.getException() != null) {
throw new LogExecutionException(log.getException());
}
@@ -1417,7 +1437,9 @@ public abstract class RaftMember {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(term.get());
if (serializeNow) {
+ long operationStartTime = Statistic.RAFT_SENDER_SERIALIZE_LOG.getOperationStartTime();
request.setEntry(log.serialize());
+ Statistic.RAFT_SENDER_SERIALIZE_LOG.calOperationCostTimeFromStart(operationStartTime);
}
request.setLeader(getThisNode());
// don't need lock because even if it's larger than the commitIndex when appending this log to
@@ -1685,7 +1707,9 @@ public abstract class RaftMember {
node, leaderShipStale, newLeaderTerm, peer);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
+ long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG_SYNC.getOperationStartTime();
long result = client.appendEntry(request);
+ Statistic.RAFT_SENDER_SEND_LOG_SYNC.calOperationCostTimeFromStart(operationStartTime);
handler.onComplete(result);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 58ebd31..f51b2aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -29,6 +29,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;