You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/08/27 01:40:03 UTC
[iotdb] branch expr updated: fix multiple issues
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr by this push:
new f00f6c6 fix multiple issues
f00f6c6 is described below
commit f00f6c65fc999f72d4cc519a88701fa59f736ad5
Author: jt <jt...@163.com>
AuthorDate: Fri Aug 27 09:39:28 2021 +0800
fix multiple issues
---
cluster/distribute.sh | 8 +-
cluster/src/assembly/resources/sbin/expr-bench.sh | 16 ++++
cluster/src/assembly/resources/sbin/expr-server.sh | 16 ++++
.../org/apache/iotdb/cluster/expr/ExprMember.java | 104 ++++++++++++---------
.../apache/iotdb/cluster/expr/VotingLogList.java | 20 ++--
.../org/apache/iotdb/cluster/log/VotingLog.java | 15 +++
.../iotdb/cluster/log/catchup/CatchUpTask.java | 11 +--
.../handlers/caller/AppendNodeEntryHandler.java | 74 +++++++--------
.../server/handlers/caller/HeartbeatHandler.java | 2 +-
.../server/handlers/caller/LogCatchUpHandler.java | 4 +-
.../cluster/server/member/DataGroupMember.java | 4 +-
.../cluster/server/member/MetaGroupMember.java | 7 +-
.../iotdb/cluster/server/member/RaftMember.java | 37 ++++++--
.../apache/iotdb/cluster/server/monitor/Timer.java | 3 +-
.../apache/iotdb/db/qp/physical/sys/ExprPlan.java | 9 ++
15 files changed, 211 insertions(+), 119 deletions(-)
diff --git a/cluster/distribute.sh b/cluster/distribute.sh
index 7981466..8c70e27 100644
--- a/cluster/distribute.sh
+++ b/cluster/distribute.sh
@@ -1,7 +1,7 @@
-src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/*
-ips=(fit35 fit36 fit38 fit39)
-target_lib_path=/data/iotdb_expr/lib/
+ips=(fit36 fit38 fit39)
+target_lib_path=/data/iotdb_expr/lib
for ip in ${ips[*]}
do
@@ -10,7 +10,7 @@ for ip in ${ips[*]}
done
ips=(fit31 fit33 fit34)
-target_lib_path=/disk/iotdb_expr/lib/
+target_lib_path=/disk/iotdb_expr/lib
for ip in ${ips[*]}
do
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
index d4fa092..dc2bc46 100644
--- a/cluster/src/assembly/resources/sbin/expr-bench.sh
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -33,6 +33,22 @@ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
shift
fi
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+ if [ $enable_printgc == "true" ]; then
+ . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+ else
+ . "$IOTDB_CONF/iotdb-env.sh"
+ fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+ if [ $enable_printgc == "true" ]; then
+ . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+ else
+ . "${IOTDB_HOME}/conf/iotdb-env.sh"
+ fi
+else
+ echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh
index f323d0b..12d9dad 100644
--- a/cluster/src/assembly/resources/sbin/expr-server.sh
+++ b/cluster/src/assembly/resources/sbin/expr-server.sh
@@ -33,6 +33,22 @@ if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
shift
fi
+if [ -f "$IOTDB_CONF/iotdb-env.sh" ]; then
+ if [ $enable_printgc == "true" ]; then
+ . "$IOTDB_CONF/iotdb-env.sh" "printgc"
+ else
+ . "$IOTDB_CONF/iotdb-env.sh"
+ fi
+elif [ -f "${IOTDB_HOME}/conf/iotdb-env.sh" ]; then
+ if [ $enable_printgc == "true" ]; then
+ . "${IOTDB_HOME}/conf/iotdb-env.sh" "printgc"
+ else
+ . "${IOTDB_HOME}/conf/iotdb-env.sh"
+ fi
+else
+ echo "can't find $IOTDB_CONF/iotdb-env.sh"
+fi
+
if [ -n "$JAVA_HOME" ]; then
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
if [ -x "$java" ]; then
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
index 3c6189b..0a061db 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -20,9 +20,12 @@
package org.apache.iotdb.cluster.expr;
import java.nio.ByteBuffer;
-import java.sql.Time;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryResult;
@@ -42,39 +45,35 @@ import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ExprMember extends MetaGroupMember {
+ private static final Logger logger = LoggerFactory.getLogger(ExprMember.class);
+ private static final ExecutorService bypassPool = Executors.newCachedThreadPool();
public static boolean bypassRaft = false;
public static boolean useSlidingWindow = false;
- private VotingLogList votingLogList;
-
private int windowSize = 10000;
private Log[] logWindow = new Log[windowSize];
- private long[] prevIndices = new long[windowSize];
+ private long firstPosPrevIndex = 0;
private long[] prevTerms = new long[windowSize];
-
public ExprMember() {
}
public ExprMember(Node thisNode, List<Node> allNodes) {
this.thisNode = thisNode;
this.allNodes = allNodes;
- this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
}
public ExprMember(TProtocolFactory factory,
Node thisNode, Coordinator coordinator)
throws QueryProcessException {
super(factory, thisNode, coordinator);
- }
-
- @Override
- public void setAllNodes(List<Node> allNodes) {
- super.setAllNodes(allNodes);
- this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
+ this.firstPosPrevIndex = logManager.getLastLogIndex();
+ this.prevTerms[0] = logManager.getLastLogTerm();
}
@Override
@@ -84,37 +83,50 @@ public class ExprMember extends MetaGroupMember {
@Override
public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
- if (bypassRaft) {
- if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
- return StatusUtils.OK;
- } else if (plan instanceof ExprPlan) {
- ((ExprPlan) plan).setNeedForward(false);
- }
+ try {
+ if (bypassRaft) {
+ int bufferSize = 4096;
+ if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
+ return StatusUtils.OK;
+ } else if (plan instanceof ExprPlan) {
+ ((ExprPlan) plan).setNeedForward(false);
+ bufferSize += ((ExprPlan) plan).getWorkload().length;
+ }
- ExecutNonQueryReq req = new ExecutNonQueryReq();
- ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024);
- plan.serialize(byteBuffer);
- byteBuffer.flip();
- req.setPlanBytes(byteBuffer);
-
- for (Node node : getAllNodes()) {
- if (!ClusterUtils.isNodeEquals(node, thisNode)) {
- Client syncClient = getSyncClient(node);
- try {
- long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG
- .getOperationStartTime();
- syncClient.executeNonQueryPlan(req);
- Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
- } catch (TException e) {
- ClientUtils.putBackSyncClient(syncClient);
- return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+ ExecutNonQueryReq req = new ExecutNonQueryReq();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(bufferSize);
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ req.setPlanBytes(byteBuffer);
+ List<Future> futures = new ArrayList<>();
+ for (Node node : getAllNodes()) {
+ if (!ClusterUtils.isNodeEquals(node, thisNode)) {
+ futures.add(bypassPool.submit(() -> {
+ Client syncClient = getSyncClient(node);
+ try {
+ long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG
+ .getOperationStartTime();
+ syncClient.executeNonQueryPlan(req);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
+ } catch (TException e) {
+ ClientUtils.putBackSyncClient(syncClient);
+ return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+ }
+ ClientUtils.putBackSyncClient(syncClient);
+ return null;
+ }));
}
- ClientUtils.putBackSyncClient(syncClient);
}
+ for (Future future : futures) {
+ future.get();
+ }
+ return StatusUtils.OK;
}
- return StatusUtils.OK;
+ return processNonPartitionedMetaPlan(plan);
+ } catch (Exception e) {
+ logger.error("Exception in processing plan", e);
+ return StatusUtils.INTERNAL_ERROR.deepCopy().setMessage(e.getMessage());
}
- return processNonPartitionedMetaPlan(plan);
}
/**
@@ -130,11 +142,10 @@ public class ExprMember extends MetaGroupMember {
private void checkLogPrev(int pos) {
// check the previous entry
- long prevLogIndex = prevIndices[pos];
long prevLogTerm = prevTerms[pos];
if (pos > 0) {
Log prev = logWindow[pos - 1];
- if (prev != null && (prev.getCurrLogIndex() != prevLogIndex || prev.getCurrLogTerm() != prevLogTerm)) {
+ if (prev != null && prev.getCurrLogTerm() != prevLogTerm) {
logWindow[pos - 1] = null;
}
}
@@ -145,9 +156,8 @@ public class ExprMember extends MetaGroupMember {
Log log = logWindow[pos];
boolean nextMismatch = false;
if (pos < windowSize - 1) {
- long nextPrevIndex = prevIndices[pos + 1];
long nextPrevTerm = prevTerms[pos + 1];
- if (!(nextPrevIndex != log.getCurrLogIndex() || nextPrevTerm != log.getCurrLogTerm())) {
+ if (nextPrevTerm != log.getCurrLogTerm()) {
nextMismatch = true;
}
}
@@ -170,7 +180,7 @@ public class ExprMember extends MetaGroupMember {
* @return
*/
private long flushWindow(AppendEntryResult result, long leaderCommit) {
- long windowPrevLogIndex = prevIndices[0];
+ long windowPrevLogIndex = firstPosPrevIndex;
long windowPrevLogTerm = prevTerms[0];
int flushPos = 0;
@@ -179,18 +189,21 @@ public class ExprMember extends MetaGroupMember {
break;
}
}
+
// flush [0, flushPos)
List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
long success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
logs);
if (success != -1) {
System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+ System.arraycopy(prevTerms, flushPos, prevTerms, 0, windowSize - flushPos);
for (int i = 1; i <= flushPos; i++) {
logWindow[windowSize - i] = null;
}
}
+ firstPosPrevIndex = logManager.getLastLogIndex();
result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(logManager.getLastLogIndex());
+ result.setLastLogIndex(firstPosPrevIndex);
result.setLastLogTerm(logManager.getLastLogTerm());
return success;
}
@@ -200,6 +213,7 @@ public class ExprMember extends MetaGroupMember {
if (!useSlidingWindow) {
return super.appendEntry(prevLogIndex, prevLogTerm, leaderCommit, log);
}
+
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long appendedPos = 0;
@@ -215,10 +229,8 @@ public class ExprMember extends MetaGroupMember {
} else if (windowPos < windowSize) {
// the new entry falls into the window
logWindow[windowPos] = log;
- prevIndices[windowPos] = prevLogIndex;
prevTerms[windowPos] = prevLogTerm;
checkLog(windowPos);
-
if (windowPos == 0) {
appendedPos = flushWindow(result, leaderCommit);
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
index 695d37b..99f456a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/VotingLogList.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.cluster.log.VotingLog;
public class VotingLogList {
- private List<ExprVotingLog> logList = new ArrayList<>();
+ private List<VotingLog> logList = new ArrayList<>();
private volatile long currTerm = -1;
private int quorumSize;
@@ -39,7 +39,7 @@ public class VotingLogList {
*
* @param log
*/
- public synchronized void insert(ExprVotingLog log) {
+ public synchronized void insert(VotingLog log) {
if (log.getLog().getCurrLogTerm() != currTerm) {
logList.clear();
currTerm = log.getLog().getCurrLogTerm();
@@ -58,10 +58,10 @@ public class VotingLogList {
* @param acceptingNodeId
* @return the lastly removed entry if any.
*/
- public synchronized VotingLog onStronglyAccept(long index, long term, int acceptingNodeId) {
+ public synchronized void onStronglyAccept(long index, long term, int acceptingNodeId) {
int lastEntryIndexToCommit = -1;
for (int i = 0, logListSize = logList.size(); i < logListSize; i++) {
- ExprVotingLog votingLog = logList.get(i);
+ VotingLog votingLog = logList.get(i);
if (votingLog.getLog().getCurrLogIndex() <= index
&& votingLog.getLog().getCurrLogTerm() == term) {
votingLog.getStronglyAcceptedNodeIds().add(acceptingNodeId);
@@ -71,13 +71,15 @@ public class VotingLogList {
}
}
- VotingLog ret = null;
if (lastEntryIndexToCommit != -1) {
- ret = logList.get(lastEntryIndexToCommit);
- logList.subList(0, lastEntryIndexToCommit + 1).clear();
+ List<VotingLog> acceptedLogs = logList.subList(0, lastEntryIndexToCommit + 1);
+ for (VotingLog acceptedLog : acceptedLogs) {
+ synchronized (acceptedLog) {
+ acceptedLog.notifyAll();
+ }
+ }
+ acceptedLogs.clear();
}
-
- return ret;
}
public synchronized void clear() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
index da9f4af..dc2d9b0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/VotingLog.java
@@ -25,10 +25,12 @@ import java.util.Set;
public class VotingLog {
protected Log log;
protected Set<Integer> stronglyAcceptedNodeIds;
+ protected Set<Integer> weaklyAcceptedNodeIds;
public VotingLog(Log log, int groupSize) {
this.log = log;
stronglyAcceptedNodeIds = new HashSet<>(groupSize);
+ weaklyAcceptedNodeIds = new HashSet<>(groupSize);
}
public Log getLog() {
@@ -46,4 +48,17 @@ public class VotingLog {
public void setStronglyAcceptedNodeIds(Set<Integer> stronglyAcceptedNodeIds) {
this.stronglyAcceptedNodeIds = stronglyAcceptedNodeIds;
}
+
+ public Set<Integer> getWeaklyAcceptedNodeIds() {
+ return weaklyAcceptedNodeIds;
+ }
+
+ public void setWeaklyAcceptedNodeIds(Set<Integer> weaklyAcceptedNodeIds) {
+ this.weaklyAcceptedNodeIds = weaklyAcceptedNodeIds;
+ }
+
+ @Override
+ public String toString() {
+ return log.toString();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 1472b12..2df2adc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -83,12 +83,11 @@ public class CatchUpTask implements Runnable {
long localFirstIndex = 0;
try {
// to avoid snapshot catch up when index is volatile
- synchronized (raftMember.getLogManager()) {
- localFirstIndex = raftMember.getLogManager().getFirstIndex();
- lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
- hi = raftMember.getLogManager().getLastLogIndex() + 1;
- logs = raftMember.getLogManager().getEntries(lo, hi);
- }
+ localFirstIndex = raftMember.getLogManager().getFirstIndex();
+ lo = Math.max(localFirstIndex, peer.getMatchIndex() + 1);
+ hi = raftMember.getLogManager().getLastLogIndex() + 1;
+ logs = raftMember.getLogManager().getEntries(lo, hi);
+
// this may result from peer's match index being changed concurrently, making the peer
// actually catch up now
if (logger.isInfoEnabled()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index bf87703..aa817d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.handlers.caller;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,50 +81,43 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
// someone has rejected this log because the leadership is stale
return;
}
+
long resp = response.status;
- synchronized (log) {
- if (resp == RESPONSE_STRONG_ACCEPT) {
- log.getStronglyAcceptedNodeIds().add(receiver.nodeIdentifier);
- int remaining = quorumSize - log.getStronglyAcceptedNodeIds().size();
- logger.debug(
- "{}: Received an agreement from {} for {}, remaining votes to succeed: {}",
- member.getName(),
- receiver,
- log,
- remaining);
- if (remaining == 0) {
- logger.debug(
- "{}: Log [{}] {} is accepted by the quorum",
- member.getName(),
- log.getLog(),
- log);
- log.notifyAll();
- }
- peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
- } else if (resp > 0) {
- // a response > 0 is the follower's term
- // the leader ship is stale, wait for the new leader's heartbeat
- long prevReceiverTerm = receiverTerm.get();
- logger.debug(
- "{}: Received a rejection from {} because term is stale: {}/{}",
- member.getName(),
- receiver,
- prevReceiverTerm,
- resp);
- if (resp > prevReceiverTerm) {
- receiverTerm.set(resp);
- }
- leaderShipStale.set(true);
+
+ if (resp == RESPONSE_STRONG_ACCEPT) {
+ member.getVotingLogList().onStronglyAccept(log.getLog().getCurrLogIndex(),
+ log.getLog().getCurrLogTerm(), receiver.nodeIdentifier);
+ peer.setMatchIndex(Math.max(log.getLog().getCurrLogIndex(), peer.getMatchIndex()));
+ } else if (resp > 0) {
+ // a response > 0 is the follower's term
+ // the leader ship is stale, wait for the new leader's heartbeat
+ long prevReceiverTerm = receiverTerm.get();
+ logger.debug(
+ "{}: Received a rejection from {} because term is stale: {}/{}",
+ member.getName(),
+ receiver,
+ prevReceiverTerm,
+ resp);
+ if (resp > prevReceiverTerm) {
+ receiverTerm.set(resp);
+ }
+ leaderShipStale.set(true);
+ synchronized (log) {
+ log.notifyAll();
+ }
+ } else if (resp == RESPONSE_WEAK_ACCEPT) {
+ synchronized (log) {
+ log.getWeaklyAcceptedNodeIds().add(receiver.nodeIdentifier);
log.notifyAll();
- } else {
- // e.g., Response.RESPONSE_LOG_MISMATCH
- logger.debug(
- "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
- onFail();
}
- // rejected because the receiver's logs are stale or the receiver has no cluster info, just
- // wait for the heartbeat to handle
+ } else {
+ // e.g., Response.RESPONSE_LOG_MISMATCH
+ logger.debug(
+ "{}: The log {} is rejected by {} because: {}", member.getName(), log, receiver, resp);
+ onFail();
}
+ // rejected because the receiver's logs are stale or the receiver has no cluster info, just
+ // wait for the heartbeat to handle
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 6d95e06..de0b83f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
if (lastLogIdx == peer.getLastHeartBeatIndex()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peer.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 10000) {
+ if (inconsistentNum >= 5) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
index eeda328..62933a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/LogCatchUpHandler.java
@@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import static org.apache.iotdb.cluster.server.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.cluster.server.Response.RESPONSE_WEAK_ACCEPT;
/**
* LogCatchUpHandler checks the result of appending a log in a catch-up task and decides to abort
@@ -51,7 +53,7 @@ public class LogCatchUpHandler implements AsyncMethodCallback<AppendEntryResult>
public void onComplete(AppendEntryResult response) {
logger.debug("{}: Received a catch-up result of {} from {}", memberName, log, follower);
long resp = response.status;
- if (resp == RESPONSE_AGREE) {
+ if (resp == RESPONSE_AGREE || resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_WEAK_ACCEPT) {
synchronized (appendSucceed) {
appendSucceed.set(true);
appendSucceed.notifyAll();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 49acf81..484f5f6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -171,7 +171,7 @@ public class DataGroupMember extends RaftMember {
new AsyncClientPool(new SingleManagerFactory(factory)));
this.thisNode = thisNode;
this.metaGroupMember = metaGroupMember;
- allNodes = nodes;
+ setAllNodes(nodes);
setQueryManager(new ClusterQueryManager());
slotManager = new SlotManager(ClusterConstant.SLOT_NUM, getMemberDir());
LogApplier applier = new DataLogApplier(metaGroupMember, this);
@@ -792,7 +792,7 @@ public class DataGroupMember extends RaftMember {
synchronized (allNodes) {
if (allNodes.contains(removedNode)) {
// update the group if the deleted node was in it
- allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(getHeader());
+ setAllNodes(metaGroupMember.getPartitionTable().getHeaderGroup(getHeader()));
initPeerMap();
if (removedNode.equals(leader.get())) {
// if the leader is removed, also start an election immediately
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 6bffeda..061a5d0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.expr.VotingLogList;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
@@ -268,7 +269,7 @@ public class MetaGroupMember extends RaftMember {
new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
- allNodes = new ArrayList<>();
+ setAllNodes(new ArrayList<>());
initPeerMap();
dataClientProvider = new DataClientProvider(factory);
@@ -685,7 +686,7 @@ public class MetaGroupMember extends RaftMember {
}
private void updateNodeList(Collection<Node> nodes) {
- allNodes = new ArrayList<>(nodes);
+ setAllNodes(new ArrayList<>(nodes));
initPeerMap();
logger.info("All nodes in the partition table: {}", allNodes);
initIdNodeMap();
@@ -714,7 +715,7 @@ public class MetaGroupMember extends RaftMember {
// leader through the first heartbeat. After the leader knows the node information of all
// nodes, it can replace the incomplete node information previously saved locally, and build
// partitionTable to send it to other followers.
- allNodes = new ArrayList<>(idNodeMap.values());
+ setAllNodes(new ArrayList<>(idNodeMap.values()));
if (partitionTable == null) {
partitionTable = new SlotPartitionTable(allNodes, thisNode);
logger.info("Partition table is set up");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f78e3d9..b2a8442 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -53,6 +53,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.expr.VotingLogList;
import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
import org.apache.iotdb.cluster.log.HardState;
@@ -261,6 +262,8 @@ public abstract class RaftMember {
*/
protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>();
+ protected VotingLogList votingLogList;
+
protected RaftMember() {
}
@@ -772,6 +775,7 @@ public abstract class RaftMember {
public void setAllNodes(List<Node> allNodes) {
this.allNodes = allNodes;
+ this.votingLogList = new VotingLogList(allNodes.size()/2 + 1);
}
public Map<Node, Long> getLastCatchUpResponseTime() {
@@ -1132,6 +1136,8 @@ public abstract class RaftMember {
log.setCreateTime(System.nanoTime());
getLogDispatcher().offer(sendLogRequest);
Statistic.RAFT_SENDER_OFFER_LOG.calOperationCostTimeFromStart(startTime);
+
+ votingLogList.insert(sendLogRequest.getVotingLog());
}
try {
@@ -1144,6 +1150,10 @@ public abstract class RaftMember {
Timer.Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.calOperationCostTimeFromStart(
sendLogRequest.getVotingLog().getLog().getCreateTime());
switch (appendLogResult) {
+ case WEAK_ACCEPT:
+ // TODO: change to weak
+ Statistic.RAFT_WEAK_ACCEPT.add(1);
+ return StatusUtils.OK;
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
@@ -1541,9 +1551,10 @@ public abstract class RaftMember {
synchronized (log) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
- while (log.getStronglyAcceptedNodeIds().size() >= quorumSize
+ while (log.getStronglyAcceptedNodeIds().size() < quorumSize
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
- && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)) {
+ && !log.getStronglyAcceptedNodeIds().contains(Integer.MAX_VALUE)
+ && log.getWeaklyAcceptedNodeIds().size() + log.getStronglyAcceptedNodeIds().size() < quorumSize) {
try {
log.wait(RaftServer.getWriteOperationTimeoutMS());
} catch (InterruptedException e) {
@@ -1566,10 +1577,16 @@ public abstract class RaftMember {
}
// cannot get enough agreements within a certain amount of time
- if (log.getStronglyAcceptedNodeIds().size() < quorumSize) {
+ if (log.getStronglyAcceptedNodeIds().size() < quorumSize
+ && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size()) < quorumSize) {
return AppendLogResult.TIME_OUT;
}
+ if (log.getStronglyAcceptedNodeIds().size() < quorumSize
+ && (log.getStronglyAcceptedNodeIds().size() + log.getWeaklyAcceptedNodeIds().size()) >= quorumSize) {
+ return AppendLogResult.WEAK_ACCEPT;
+ }
+
// voteCounter has counted down to zero
return AppendLogResult.OK;
}
@@ -1647,6 +1664,7 @@ public abstract class RaftMember {
// logManager, the follower can handle the larger commitIndex with no effect
request.setLeaderCommit(logManager.getCommitLogIndex());
request.setPrevLogIndex(log.getCurrLogIndex() - 1);
+ request.setIsFromLeader(true);
try {
request.setPrevLogTerm(logManager.getTerm(log.getCurrLogIndex() - 1));
} catch (Exception e) {
@@ -1913,7 +1931,7 @@ public abstract class RaftMember {
&& alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
synchronized (peer) {
try {
- peer.wait(RaftServer.getWriteOperationTimeoutMS());
+ peer.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Waiting for peer to catch up interrupted");
@@ -1939,12 +1957,14 @@ public abstract class RaftMember {
, quorumSize);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
+ long operationStartTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
AppendEntryResult result;
if (indirectReceivers == null || indirectReceivers.isEmpty()) {
result = client.appendEntry(request);
} else {
result = client.appendEntryIndirect(request, indirectReceivers);
}
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(operationStartTime);
handler.onComplete(result);
} catch (TException e) {
@@ -2051,11 +2071,11 @@ public abstract class RaftMember {
protected long checkPrevLogIndex(long prevLogIndex) {
long lastLogIndex = logManager.getLastLogIndex();
long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+ Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
// there are logs missing between the incoming log and the local last log, and such logs
// did not come within a timeout, report a mismatch to the sender and it shall fix this
// through catch-up
- Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
return Response.RESPONSE_LOG_MISMATCH;
}
Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
@@ -2143,7 +2163,8 @@ public abstract class RaftMember {
enum AppendLogResult {
OK,
TIME_OUT,
- LEADERSHIP_STALE
+ LEADERSHIP_STALE,
+ WEAK_ACCEPT
}
/**
@@ -2167,4 +2188,8 @@ public abstract class RaftMember {
public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
sentLogHandlers.remove(indexTerm);
}
+
+ public VotingLogList getVotingLogList() {
+ return votingLogList;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index f4ac980..7ecca9e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -237,7 +237,8 @@ public class Timer {
"from create to end",
TIME_SCALE,
true,
- META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP);
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT);
String className;
String blockName;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
index 2a1e16b..e3cb3a7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
@@ -81,4 +81,13 @@ public class ExprPlan extends PhysicalPlan {
public void setNeedForward(boolean needForward) {
this.needForward = needForward;
}
+
+ public byte[] getWorkload() {
+ return workload;
+ }
+
+ @Override
+ public String toString() {
+ return "ExprPlan";
+ }
}