You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/01 11:08:54 UTC
[iotdb] 01/02: temp save
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4aa5448b1eceb0e499266678d3677787e0d9a010
Author: jt <jt...@163.com>
AuthorDate: Tue May 25 09:40:24 2021 +0800
temp save
---
cluster/distribute.sh | 19 ++
cluster/src/assembly/resources/sbin/expr-bench.sh | 73 ++++++
cluster/src/assembly/resources/sbin/expr-server.sh | 73 ++++++
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 +
.../org/apache/iotdb/cluster/expr/ExprBench.java | 103 ++++++++
.../ExprLogDispatcher.java} | 217 ++++-------------
.../org/apache/iotdb/cluster/expr/ExprMember.java | 151 ++++++++++++
.../org/apache/iotdb/cluster/expr/ExprServer.java | 90 +++++++
.../iotdb/cluster/log/IndirectLogDispatcher.java | 103 ++++++++
.../apache/iotdb/cluster/log/LogDispatcher.java | 35 ++-
.../iotdb/cluster/log/manage/RaftLogManager.java | 33 +++
.../log/manage/UnCommittedEntryManager.java | 12 +-
.../serializable/SyncLogDequeSerializer.java | 2 +-
.../iotdb/cluster/server/DataClusterServer.java | 8 +-
.../iotdb/cluster/server/MetaClusterServer.java | 24 +-
.../apache/iotdb/cluster/server/RaftServer.java | 14 +-
.../server/handlers/caller/HeartbeatHandler.java | 2 +-
.../handlers/forwarder/IndirectAppendHandler.java | 49 ++++
.../cluster/server/heartbeat/HeartbeatThread.java | 3 +-
.../server/heartbeat/MetaHeartbeatServer.java | 4 +-
.../server/heartbeat/MetaHeartbeatThread.java | 10 +-
.../cluster/server/member/MetaGroupMember.java | 26 +-
.../iotdb/cluster/server/member/RaftMember.java | 264 ++++++++++++++++-----
.../cluster/server/service/BaseAsyncService.java | 19 ++
.../cluster/server/service/BaseSyncService.java | 52 ++--
.../apache/iotdb/cluster/utils/ClientUtils.java | 2 +
.../apache/iotdb/cluster/utils/ClusterNode.java | 2 +-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 12 +
pom.xml | 52 ++--
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +
.../org/apache/iotdb/db/qp/logical/Operator.java | 1 +
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 7 +-
.../apache/iotdb/db/qp/physical/sys/ExprPlan.java | 84 +++++++
thrift-cluster/src/main/thrift/cluster.thrift | 21 ++
34 files changed, 1239 insertions(+), 340 deletions(-)
diff --git a/cluster/distribute.sh b/cluster/distribute.sh
new file mode 100644
index 0000000..98758da
--- /dev/null
+++ b/cluster/distribute.sh
@@ -0,0 +1,19 @@
+src_lib_path=/e/codestore/incubator-iotdb2/cluster/target/iotdb-cluster-0.13.0-SNAPSHOT/lib/iotdb*
+
+ips=(fit36 fit38 fit39)
+target_lib_path=/data/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+ do
+ ssh fit@$ip "mkdir $target_lib_path"
+ scp -r $src_lib_path fit@$ip:$target_lib_path
+ done
+
+ips=(fit31 fit32 fit33 fit34)
+target_lib_path=/disk/iotdb_expr/lib/
+
+for ip in ${ips[*]}
+ do
+ ssh fit@$ip "mkdir $target_lib_path"
+ scp -r $src_lib_path fit@$ip:$target_lib_path
+ done
\ No newline at end of file
diff --git a/cluster/src/assembly/resources/sbin/expr-bench.sh b/cluster/src/assembly/resources/sbin/expr-bench.sh
new file mode 100644
index 0000000..d4fa092
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-bench.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+ enable_printgc=true;
+ shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+ for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+ if [ -x "$java" ]; then
+ JAVA="$java"
+ break
+ fi
+ done
+else
+ JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+ echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
+ exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprBench
+
+launch_service()
+{
+ iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+ iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+ return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/assembly/resources/sbin/expr-server.sh b/cluster/src/assembly/resources/sbin/expr-server.sh
new file mode 100644
index 0000000..f323d0b
--- /dev/null
+++ b/cluster/src/assembly/resources/sbin/expr-server.sh
@@ -0,0 +1,73 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+echo ---------------------
+echo "Starting IoTDB (Cluster Mode)"
+echo ---------------------
+
+if [ -z "${IOTDB_HOME}" ]; then
+ export IOTDB_HOME="`dirname "$0"`/.."
+fi
+
+enable_printgc=false
+if [ "$#" -ge "1" -a "$1" == "printgc" ]; then
+ enable_printgc=true;
+ shift
+fi
+
+if [ -n "$JAVA_HOME" ]; then
+ for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+ if [ -x "$java" ]; then
+ JAVA="$java"
+ break
+ fi
+ done
+else
+ JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+ echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
+ exit 1;
+fi
+
+CLASSPATH=""
+for f in ${IOTDB_HOME}/lib/*.jar; do
+ CLASSPATH=${CLASSPATH}":"$f
+done
+classname=org.apache.iotdb.cluster.expr.ExprServer
+
+launch_service()
+{
+ iotdb_parms="-Dlogback.configurationFile=${IOTDB_CONF}/logback.xml"
+ iotdb_parms="$iotdb_parms -DIOTDB_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
+ iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
+ iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" $@
+ return $?
+}
+
+# Start up the service
+launch_service "$classname" $@
+
+exit $?
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 7d3e42a..540201c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -168,6 +168,8 @@ public class ClusterConfig {
private boolean openServerRpcPort = false;
+ private boolean useIndirectBroadcasting = false;
+
/**
* create a clusterConfig class. The internalIP will be set according to the server's hostname. If
* there is something error for getting the ip of the hostname, then set the internalIp as
@@ -183,6 +185,14 @@ public class ClusterConfig {
seedNodeUrls = Arrays.asList(String.format("%s:%d", internalIp, internalMetaPort));
}
+ public boolean isUseIndirectBroadcasting() {
+ return useIndirectBroadcasting;
+ }
+
+ public void setUseIndirectBroadcasting(boolean useIndirectBroadcasting) {
+ this.useIndirectBroadcasting = useIndirectBroadcasting;
+ }
+
public int getSelectorNumOfClientPool() {
return selectorNumOfClientPool;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
new file mode 100644
index 0000000..b625f2c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprBench.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+
+public class ExprBench {
+
+ private AtomicLong requestCounter = new AtomicLong();
+ private int threadNum = 64;
+ private int workloadSize = 64 * 1024;
+ private SyncClientPool clientPool;
+ private Node target;
+ private int maxRequestNum;
+
+ public ExprBench(Node target) {
+ this.target = target;
+ SyncClientFactory factory = new FactorySync(new Factory());
+ clientPool = new SyncClientPool(factory);
+ }
+
+ public void benchmark() {
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < threadNum; i++) {
+ new Thread(
+ () -> {
+ Client client = clientPool.getClient(target);
+ ExecutNonQueryReq request = new ExecutNonQueryReq();
+ ExprPlan plan = new ExprPlan();
+ plan.setWorkload(new byte[workloadSize]);
+ plan.setNeedForward(true);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(workloadSize + 4096);
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ request.setPlanBytes(byteBuffer);
+ long currRequsetNum = -1;
+ while (true) {
+
+ try {
+ client.executeNonQueryPlan(request);
+ currRequsetNum = requestCounter.incrementAndGet();
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+
+ if (currRequsetNum % 1000 == 0) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ System.out.println(String.format("%d %d %f(%f)", elapsedTime,
+ currRequsetNum,
+ (currRequsetNum + 0.0) / elapsedTime,
+ currRequsetNum * workloadSize / (1024.0*1024.0) / elapsedTime));
+ }
+
+ if (currRequsetNum >= maxRequestNum) {
+ break;
+ }
+ }
+ })
+ .start();
+ }
+ }
+
+ public void setMaxRequestNum(int maxRequestNum) {
+ this.maxRequestNum = maxRequestNum;
+ }
+
+ public static void main(String[] args) {
+ Node target = new Node();
+ target.setInternalIp(args[0]);
+ target.setMetaPort(Integer.parseInt(args[1]));
+ ExprBench bench = new ExprBench(target);
+ bench.maxRequestNum = Integer.parseInt(args[2]);
+ bench.threadNum = Integer.parseInt(args[3]);
+ bench.benchmark();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
similarity index 63%
copy from cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
index a600c4a..4b302a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprLogDispatcher.java
@@ -17,9 +17,27 @@
* under the License.
*/
-package org.apache.iotdb.cluster.log;
+package org.apache.iotdb.cluster.expr;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
+import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,26 +51,13 @@ import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.TestOnly;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
* followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -60,27 +65,25 @@ import java.util.concurrent.atomic.AtomicLong;
* follower A, the actual reach order may be log3, log2, and log1. According to the protocol, log3
* and log2 must halt until log1 reaches, as a result, the total delay may increase significantly.
*/
-public class LogDispatcher {
+public class ExprLogDispatcher {
- private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
- private RaftMember member;
- private boolean useBatchInLogCatchUp =
- ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
+ private static final Logger logger = LoggerFactory.getLogger(ExprLogDispatcher.class);
private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
private ExecutorService executorService;
private static ExecutorService serializationService =
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+ private SyncClientPool clientPool;
+ private Node leader;
- public LogDispatcher(RaftMember member) {
- this.member = member;
+ public ExprLogDispatcher(List<Node> nodes, Node leader) {
executorService = Executors.newCachedThreadPool();
- for (Node node : member.getAllNodes()) {
- if (!node.equals(member.getThisNode())) {
- nodeLogQueues.add(createQueueAndBindingThread(node));
- }
+ for (Node node : nodes) {
+ nodeLogQueues.add(createQueueAndBindingThread(node));
}
+ clientPool = new SyncClientPool(new FactorySync(new Factory()));
+ this.leader = leader;
}
@TestOnly
@@ -114,15 +117,9 @@ public class LogDispatcher {
addSucceeded = nodeLogQueue.add(log);
}
- if (!addSucceeded) {
- logger.debug(
- "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
- } else {
+ if (addSucceeded) {
log.setEnqueueTime(System.nanoTime());
}
- } catch (IllegalStateException e) {
- logger.debug(
- "Log queue[{}] of {} is full, ignore the log to this node", i, member.getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -219,30 +216,26 @@ public class LogDispatcher {
class DispatcherThread implements Runnable {
- private Node receiver;
+ private Node node;
+ private Client client;
private BlockingQueue<SendLogRequest> logBlockingDeque;
private List<SendLogRequest> currBatch = new ArrayList<>();
- private Peer peer;
- DispatcherThread(Node receiver, BlockingQueue<SendLogRequest> logBlockingDeque) {
- this.receiver = receiver;
+ DispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingDeque) {
+ this.client = clientPool.getClient(node);
this.logBlockingDeque = logBlockingDeque;
- this.peer =
- member
- .getPeerMap()
- .computeIfAbsent(receiver, r -> new Peer(member.getLogManager().getLastLogIndex()));
}
@Override
public void run() {
- Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
+ Thread.currentThread().setName("LogDispatcher-" + node);
try {
while (!Thread.interrupted()) {
SendLogRequest poll = logBlockingDeque.take();
currBatch.add(poll);
logBlockingDeque.drainTo(currBatch);
if (logger.isDebugEnabled()) {
- logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
+ logger.debug("Sending {} logs to {}", currBatch.size(), node);
}
for (SendLogRequest request : currBatch) {
request.getAppendEntryRequest().entry = request.serializedLogFuture.get();
@@ -258,59 +251,25 @@ public class LogDispatcher {
logger.info("Dispatcher exits");
}
- private void appendEntriesAsync(
- List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch)
- throws TException {
- AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
- AsyncClient client = member.getSendLogAsyncClient(receiver);
- if (logger.isDebugEnabled()) {
- logger.debug(
- "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
- }
- if (client != null) {
- client.appendEntries(request, handler);
- }
- }
-
private void appendEntriesSync(
List<ByteBuffer> logList, AppendEntriesRequest request, List<SendLogRequest> currBatch) {
long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- if (!member.waitForPrevLog(peer, currBatch.get(0).getLog())) {
- logger.warn(
- "{}: node {} timed out when appending {}",
- member.getName(),
- receiver,
- currBatch.get(0).getLog());
- return;
- }
Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
- Client client = member.getSyncClient(receiver);
- if (client == null) {
- logger.error("No available client for {}", receiver);
- return;
- }
- AsyncMethodCallback<Long> handler = new AppendEntriesHandler(currBatch);
startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
try {
long result = client.appendEntries(request);
Timer.Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
if (result != -1 && logger.isInfoEnabled()) {
logger.info(
- "{}: Append {} logs to {}, resp: {}",
- member.getName(),
+ "Append {} logs to {}, resp: {}",
logList.size(),
- receiver,
+ node,
result);
}
- handler.onComplete(result);
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
- handler.onError(e);
logger.warn("Failed logs: {}, first index: {}", logList, request.prevLogIndex + 1);
- } finally {
- ClientUtils.putBackSyncClient(client);
}
}
@@ -318,15 +277,8 @@ public class LogDispatcher {
List<ByteBuffer> logList, List<SendLogRequest> currBatch, int firstIndex) {
AppendEntriesRequest request = new AppendEntriesRequest();
- if (member.getHeader() != null) {
- request.setHeader(member.getHeader());
- }
- request.setLeader(member.getThisNode());
- request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
- synchronized (member.getTerm()) {
- request.setTerm(member.getTerm().get());
- }
+ request.setLeader(leader);
+ request.setTerm(1);
request.setEntries(logList);
// set index for raft
@@ -339,7 +291,7 @@ public class LogDispatcher {
return request;
}
- private void sendLogs(List<SendLogRequest> currBatch) throws TException {
+ private void sendLogs(List<SendLogRequest> currBatch) {
int logIndex = 0;
logger.debug(
"send logs from index {} to {}",
@@ -362,11 +314,7 @@ public class LogDispatcher {
}
AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
- } else {
- appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
- }
+ appendEntriesSync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
for (; prevIndex < logIndex; prevIndex++) {
Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
currBatch.get(prevIndex).getLog().getCreateTime());
@@ -374,84 +322,9 @@ public class LogDispatcher {
}
}
- private void sendBatchLogs(List<SendLogRequest> currBatch) throws TException {
- if (currBatch.size() > 1) {
- if (useBatchInLogCatchUp) {
- sendLogs(currBatch);
- } else {
- for (SendLogRequest batch : currBatch) {
- sendLog(batch);
- }
- }
- } else {
- sendLog(currBatch.get(0));
- }
- }
-
- private void sendLog(SendLogRequest logRequest) {
- Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
- member.sendLogToFollower(
- logRequest.getLog(),
- logRequest.getVoteCounter(),
- receiver,
- logRequest.getLeaderShipStale(),
- logRequest.getNewLeaderTerm(),
- logRequest.getAppendEntryRequest());
- Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
- logRequest.getLog().getCreateTime());
+ private void sendBatchLogs(List<SendLogRequest> currBatch) {
+ sendLogs(currBatch);
}
- class AppendEntriesHandler implements AsyncMethodCallback<Long> {
-
- private final List<AsyncMethodCallback<Long>> singleEntryHandlers;
-
- private AppendEntriesHandler(List<SendLogRequest> batch) {
- singleEntryHandlers = new ArrayList<>(batch.size());
- for (SendLogRequest sendLogRequest : batch) {
- AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(
- sendLogRequest.getLog(),
- sendLogRequest.getVoteCounter(),
- receiver,
- sendLogRequest.getLeaderShipStale(),
- sendLogRequest.getNewLeaderTerm(),
- peer);
- singleEntryHandlers.add(handler);
- }
- }
-
- @Override
- public void onComplete(Long aLong) {
- for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
- singleEntryHandler.onComplete(aLong);
- }
- }
-
- @Override
- public void onError(Exception e) {
- for (AsyncMethodCallback<Long> singleEntryHandler : singleEntryHandlers) {
- singleEntryHandler.onError(e);
- }
- }
-
- private AppendNodeEntryHandler getAppendNodeEntryHandler(
- Log log,
- AtomicInteger voteCounter,
- Node node,
- AtomicBoolean leaderShipStale,
- AtomicLong newLeaderTerm,
- Peer peer) {
- AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
- handler.setReceiver(node);
- handler.setVoteCounter(voteCounter);
- handler.setLeaderShipStale(leaderShipStale);
- handler.setLog(log);
- handler.setMember(member);
- handler.setPeer(peer);
- handler.setReceiverTerm(newLeaderTerm);
- return handler;
- }
- }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
new file mode 100644
index 0000000..efa63c2
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprMember.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iotdb.cluster.coordinator.Coordinator;
+import org.apache.iotdb.cluster.log.Log;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.Response;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+public class ExprMember extends MetaGroupMember {
+
+ public ExprMember() {
+ }
+
+ public ExprMember(Node thisNode, List<Node> allNodes) {
+ this.thisNode = thisNode;
+ this.allNodes = allNodes;
+ }
+
+ public ExprMember(TProtocolFactory factory,
+ Node thisNode, Coordinator coordinator)
+ throws QueryProcessException {
+ super(factory, thisNode, coordinator);
+ }
+
+ private int windowSize = 10000;
+ private Log[] logWindow = new Log[windowSize];
+ private long windowPrevLogIndex;
+ private long windowPrevLogTerm;
+ private long windowTerm;
+
+ @Override
+ protected synchronized void startSubServers() {
+
+ }
+
+ @Override
+ public TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ if (false) {
+ if (plan instanceof ExprPlan && !((ExprPlan) plan).isNeedForward()) {
+ return StatusUtils.OK;
+ } else if (plan instanceof ExprPlan) {
+ ((ExprPlan) plan).setNeedForward(false);
+ }
+
+ ExecutNonQueryReq req = new ExecutNonQueryReq();
+ ByteBuffer byteBuffer = ByteBuffer.allocate(128 * 1024);
+ plan.serialize(byteBuffer);
+ byteBuffer.flip();
+ req.setPlanBytes(byteBuffer);
+
+ for (Node node : getAllNodes()) {
+ if (!ClusterUtils.isNodeEquals(node, thisNode)) {
+ Client syncClient = getSyncClient(node);
+ try {
+ syncClient.executeNonQueryPlan(req);
+ } catch (TException e) {
+ ClientUtils.putBackSyncClient(syncClient);
+ return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
+ }
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ return StatusUtils.OK;
+ }
+ return processNonPartitionedMetaPlan(plan);
+ }
+
+ protected long appendEntry1(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ long resp;
+ long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
+ long success = 0;
+
+ synchronized (logManager) {
+ long windowPos = log.getCurrLogIndex() - logManager.getLastLogIndex() - 1;
+ if (windowPos < 0) {
+ success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
+ } else if (windowPos < windowSize) {
+ logWindow[(int) windowPos] = log;
+ if (windowPos == 0) {
+ windowPrevLogIndex = prevLogIndex;
+ windowPrevLogTerm = prevLogTerm;
+
+ int flushPos = 0;
+ for (; flushPos < windowSize; flushPos++) {
+ if (logWindow[flushPos] == null) {
+ break;
+ }
+ }
+ // flush [0, flushPos)
+ List<Log> logs = Arrays.asList(logWindow).subList(0, flushPos);
+ success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit,
+ logs);
+ if (success != -1) {
+ System.arraycopy(logWindow, flushPos, logWindow, 0, windowSize - flushPos);
+ for (int i = 1; i <= flushPos; i++) {
+ logWindow[windowSize - i] = null;
+ }
+ } else {
+ System.out.println("not success");
+ }
+ }
+ } else {
+ return Response.RESPONSE_LOG_MISMATCH;
+ }
+ }
+
+ Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
+ if (success != -1) {
+ resp = Response.RESPONSE_AGREE;
+ } else {
+ // the incoming log points to an illegal position, reject it
+ resp = Response.RESPONSE_LOG_MISMATCH;
+ }
+ return resp;
+ }
+
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
new file mode 100644
index 0000000..338755e
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/expr/ExprServer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.expr;
+
+import io.moquette.broker.config.IConfig;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.ConfigInconsistentException;
+import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
+import org.apache.iotdb.cluster.log.LogDispatcher;
+import org.apache.iotdb.cluster.server.MetaClusterServer;
+import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.tsfile.read.filter.operator.In;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Experiment-only meta server that plugs an {@code ExprMember} into the standard
+ * MetaClusterServer so raft log-dispatching strategies can be benchmarked.
+ * Launched directly from the command line via {@link #main}.
+ */
+public class ExprServer extends MetaClusterServer {
+
+ protected ExprServer() throws QueryProcessException {
+ super();
+ }
+
+ /** Substitute the experimental member implementation for the default MetaGroupMember. */
+ @Override
+ protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+ return new ExprMember(new Factory(), thisNode, coordinator);
+ }
+
+ @Override
+ protected String getClientThreadPrefix() {
+ return "FollowerClient";
+ }
+
+ @Override
+ protected String getServerClientName() {
+ return "FollowerServer";
+ }
+
+ @Override
+ protected TServerTransport getServerSocket() throws TTransportException {
+ return new TServerSocket(
+ new InetSocketAddress(thisNode.getInternalIp(), thisNode.getMetaPort()));
+ }
+
+
+ /**
+  * Boots one experiment node.
+  * args[0]: this node as "ip:port"; args[1]: comma-separated seed node urls;
+  * args[2]: number of log-dispatcher binding threads.
+  * For the experiment, raft log persistence and batched log catch-up are
+  * disabled, and the LogDispatcher send path is forced on.
+  * NOTE(review): arguments are not validated — missing or malformed input
+  * fails with ArrayIndexOutOfBounds/NumberFormatException.
+  */
+ public static void main(String[] args)
+ throws StartupException, TTransportException, QueryProcessException, ConfigInconsistentException, StartUpCheckFailureException {
+
+ String[] nodeStrings = args[0].split(":");
+ String ip = nodeStrings[0];
+ int port = Integer.parseInt(nodeStrings[1]);
+ String[] allNodeStr = args[1].split(",");
+
+ int dispatcherThreadNum = Integer.parseInt(args[2]);
+
+ ClusterDescriptor.getInstance().getConfig().setSeedNodeUrls(Arrays.asList(allNodeStr));
+ ClusterDescriptor.getInstance().getConfig().setInternalMetaPort(port);
+ ClusterDescriptor.getInstance().getConfig().setInternalIp(ip);
+ ClusterDescriptor.getInstance().getConfig().setEnableRaftLogPersistence(false);
+ ClusterDescriptor.getInstance().getConfig().setUseBatchInLogCatchUp(false);
+ RaftMember.USE_LOG_DISPATCHER = true;
+ LogDispatcher.bindingThreadNum = dispatcherThreadNum;
+
+ ExprServer server = new ExprServer();
+ server.start();
+ server.buildCluster();
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
new file mode 100644
index 0000000..75f1b16
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/IndirectLogDispatcher.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.log;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import org.apache.iotdb.cluster.log.LogDispatcher.DispatcherThread;
+import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.member.RaftMember;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
+
+/**
+ * IndirectLogDispatcher sends entries only to a pre-selected subset of followers instead of all
+ * followers, and lets each selected (direct) follower relay the log to the remaining (indirect)
+ * followers assigned to it in directToIndirectFollowerMap.
+ */
+public class IndirectLogDispatcher extends LogDispatcher {
+
+ // direct follower -> the indirect followers it should relay logs to
+ private Map<Node, List<Node>> directToIndirectFollowerMap = new HashMap<>();
+
+ // NOTE(review): the superclass constructor invokes the overridden
+ // createQueueAndBindingThreads() before this class's field initializers run,
+ // so directToIndirectFollowerMap is still null at that point (NPE risk), and
+ // the map built by recalculateDirectFollowerMap() below arrives too late to
+ // affect queue creation. Calling an overridable method from a constructor is
+ // the underlying hazard — confirm against LogDispatcher's constructor.
+ public IndirectLogDispatcher(RaftMember member) {
+ super(member);
+ recalculateDirectFollowerMap();
+ }
+
+ /** Use this class's DispatcherThread, which forwards the indirect-follower list. */
+ @Override
+ LogDispatcher.DispatcherThread newDispatcherThread(Node node,
+ BlockingQueue<SendLogRequest> logBlockingQueue) {
+ return new DispatcherThread(node, logBlockingQueue);
+ }
+
+ /** Create queues only for the chosen direct followers, not for every node. */
+ @Override
+ void createQueueAndBindingThreads() {
+ for (Node node : directToIndirectFollowerMap.keySet()) {
+ nodeLogQueues.add(createQueueAndBindingThread(node));
+ }
+ }
+
+ /**
+  * Rebuilds the direct->indirect assignment: all peers (excluding this node) are
+  * reordered by QueryCoordinator, then the i-th node from the front is paired with
+  * the i-th node from the back; with an odd count the middle node relays to no one.
+  */
+ public void recalculateDirectFollowerMap() {
+ List<Node> allNodes = new ArrayList<>(member.getAllNodes());
+ allNodes.removeIf(n -> ClusterUtils.isNodeEquals(n, member.getThisNode()));
+ QueryCoordinator instance = QueryCoordinator.getINSTANCE();
+ List<Node> orderedNodes = instance.reorderNodes(allNodes);
+
+ synchronized (this) {
+ directToIndirectFollowerMap.clear();
+ for (int i = 0, j = orderedNodes.size() - 1; i <= j; i++, j--) {
+ if (i != j) {
+ directToIndirectFollowerMap.put(orderedNodes.get(i),
+ Collections.singletonList(orderedNodes.get(j)));
+ } else {
+ directToIndirectFollowerMap.put(orderedNodes.get(i), Collections.emptyList());
+ }
+ }
+ }
+ }
+
+ /** Dispatcher thread that also passes along the receiver's indirect-follower list. */
+ class DispatcherThread extends LogDispatcher.DispatcherThread {
+
+ DispatcherThread(Node receiver,
+ BlockingQueue<SendLogRequest> logBlockingDeque) {
+ super(receiver, logBlockingDeque);
+ }
+
+ @Override
+ void sendLog(SendLogRequest logRequest) {
+ Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
+ logRequest.getLog().getCreateTime());
+ // NOTE(review): the map is read here without holding the lock that
+ // recalculateDirectFollowerMap() mutates it under — racy read; verify
+ // whether recalculation can run concurrently with dispatching.
+ member.sendLogToFollower(
+ logRequest.getLog(),
+ logRequest.getVoteCounter(),
+ receiver,
+ logRequest.getLeaderShipStale(),
+ logRequest.getNewLeaderTerm(),
+ logRequest.getAppendEntryRequest(), directToIndirectFollowerMap.get(receiver));
+ Timer.Statistic.LOG_DISPATCHER_FROM_CREATE_TO_END.calOperationCostTimeFromStart(
+ logRequest.getLog().getCreateTime());
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..711b511 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.log;
+import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
@@ -63,19 +64,24 @@ import java.util.concurrent.atomic.AtomicLong;
public class LogDispatcher {
private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
- private RaftMember member;
- private boolean useBatchInLogCatchUp =
- ClusterDescriptor.getInstance().getConfig().isUseBatchInLogCatchUp();
- private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
+ RaftMember member;
+ private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+ private boolean useBatchInLogCatchUp = clusterConfig.isUseBatchInLogCatchUp();
+ List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
private ExecutorService executorService;
private static ExecutorService serializationService =
Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+ public static int bindingThreadNum = 1;
public LogDispatcher(RaftMember member) {
this.member = member;
executorService = Executors.newCachedThreadPool();
+ createQueueAndBindingThreads();
+ }
+
+ void createQueueAndBindingThreads() {
for (Node node : member.getAllNodes()) {
if (!node.equals(member.getThisNode())) {
nodeLogQueues.add(createQueueAndBindingThread(node));
@@ -129,17 +135,20 @@ public class LogDispatcher {
}
}
- private BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
+ BlockingQueue<SendLogRequest> createQueueAndBindingThread(Node node) {
BlockingQueue<SendLogRequest> logBlockingQueue =
new ArrayBlockingQueue<>(
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
- int bindingThreadNum = 1;
for (int i = 0; i < bindingThreadNum; i++) {
- executorService.submit(new DispatcherThread(node, logBlockingQueue));
+ executorService.submit(newDispatcherThread(node, logBlockingQueue));
}
return logBlockingQueue;
}
+ DispatcherThread newDispatcherThread(Node node, BlockingQueue<SendLogRequest> logBlockingQueue) {
+ return new DispatcherThread(node, logBlockingQueue);
+ }
+
public static class SendLogRequest {
private Log log;
@@ -219,7 +228,7 @@ public class LogDispatcher {
class DispatcherThread implements Runnable {
- private Node receiver;
+ Node receiver;
private BlockingQueue<SendLogRequest> logBlockingDeque;
private List<SendLogRequest> currBatch = new ArrayList<>();
private Peer peer;
@@ -238,9 +247,11 @@ public class LogDispatcher {
Thread.currentThread().setName("LogDispatcher-" + member.getName() + "-" + receiver);
try {
while (!Thread.interrupted()) {
- SendLogRequest poll = logBlockingDeque.take();
- currBatch.add(poll);
- logBlockingDeque.drainTo(currBatch);
+ synchronized (logBlockingDeque) {
+ SendLogRequest poll = logBlockingDeque.take();
+ currBatch.add(poll);
+ logBlockingDeque.drainTo(currBatch);
+ }
if (logger.isDebugEnabled()) {
logger.debug("Sending {} logs to {}", currBatch.size(), receiver);
}
@@ -388,7 +399,7 @@ public class LogDispatcher {
}
}
- private void sendLog(SendLogRequest logRequest) {
+ void sendLog(SendLogRequest logRequest) {
Timer.Statistic.LOG_DISPATCHER_LOG_IN_QUEUE.calOperationCostTimeFromStart(
logRequest.getLog().getCreateTime());
member.sendLogToFollower(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 446eefc..eae40e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -420,6 +420,25 @@ public abstract class RaftLogManager {
return -1;
}
+ public long append(long lastIndex, long lastTerm, long leaderCommit, Log entry) {
+ long newLastIndex = lastIndex + 1;
+ if (entry.getCurrLogIndex() <= commitIndex) {
+ logger.debug(
+ "{}: entry {} conflict with committed entry [commitIndex({})]",
+ name,
+ entry.getCurrLogIndex(),
+ commitIndex);
+ } else {
+ append(entry);
+ }
+ try {
+ commitTo(Math.min(leaderCommit, newLastIndex));
+ } catch (LogExecutionException e) {
+ // exceptions are ignored on follower side
+ }
+ return newLastIndex;
+ }
+
/**
* Used by leader node or MaybeAppend to directly append to unCommittedEntryManager. Note that the
* caller should ensure entries[0].index > committed.
@@ -466,6 +485,20 @@ public abstract class RaftLogManager {
return getLastLogIndex();
}
+ public long appendDirectly(Log entry) {
+ long after = entry.getCurrLogIndex();
+ if (after <= commitIndex) {
+ logger.error("{}: after({}) is out of range [commitIndex({})]", name, after, commitIndex);
+ return -1;
+ }
+ getUnCommittedEntryManager().truncateAndAppend(entry);
+ Object logUpdateCondition = getLogUpdateCondition(entry.getCurrLogIndex());
+ synchronized (logUpdateCondition) {
+ logUpdateCondition.notifyAll();
+ }
+ return getLastLogIndex();
+ }
+
/**
* Used by leader node to try to commit entries.
*
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
index a3026c9..c85e709 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/UnCommittedEntryManager.java
@@ -185,10 +185,10 @@ public class UnCommittedEntryManager {
* @param appendingEntry request entry
*/
void truncateAndAppend(Log appendingEntry) {
- if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
- // skip existing entry
- return;
- }
+// if (maybeTerm(appendingEntry.getCurrLogIndex()) == appendingEntry.getCurrLogTerm()) {
+// // skip existing entry
+// return;
+// }
long after = appendingEntry.getCurrLogIndex();
long len = after - offset;
@@ -203,8 +203,8 @@ public class UnCommittedEntryManager {
} else {
// clear conflict entries
// then append
- logger.info(
- "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
+// logger.info(
+// "truncate the entries after index {}, append a new entry {}", after, appendingEntry);
int truncateIndex = (int) (after - offset);
if (truncateIndex < entries.size()) {
entries.subList(truncateIndex, entries.size()).clear();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
index c81e6d0..a945a92 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/serializable/SyncLogDequeSerializer.java
@@ -431,7 +431,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
@Override
public void setHardStateAndFlush(HardState state) {
this.state = state;
- serializeMeta(meta);
+ // serializeMeta(meta);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e4c81f8..d88d1a3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -521,7 +521,7 @@ public class DataClusterServer extends RaftServer
}
@Override
- TProcessor getProcessor() {
+ protected TProcessor getProcessor() {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new AsyncProcessor<>(this);
} else {
@@ -530,7 +530,7 @@ public class DataClusterServer extends RaftServer
}
@Override
- TServerTransport getServerSocket() throws TTransportException {
+ protected TServerTransport getServerSocket() throws TTransportException {
logger.info(
"[{}] Cluster node will listen {}:{}",
getServerClientName(),
@@ -547,12 +547,12 @@ public class DataClusterServer extends RaftServer
}
@Override
- String getClientThreadPrefix() {
+ protected String getClientThreadPrefix() {
return "DataClientThread-";
}
@Override
- String getServerClientName() {
+ protected String getServerClientName() {
return "DataServerThread-";
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index 12e286f..ddc64cc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -74,27 +74,31 @@ public class MetaClusterServer extends RaftServer
private static Logger logger = LoggerFactory.getLogger(MetaClusterServer.class);
// each node only contains one MetaGroupMember
- private MetaGroupMember member;
- private Coordinator coordinator;
+ protected MetaGroupMember member;
+ protected Coordinator coordinator;
// the single-node IoTDB instance
private IoTDB ioTDB;
// to register the ClusterMonitor that helps monitoring the cluster
private RegisterManager registerManager = new RegisterManager();
- private MetaAsyncService asyncService;
- private MetaSyncService syncService;
- private MetaHeartbeatServer metaHeartbeatServer;
+ protected MetaAsyncService asyncService;
+ protected MetaSyncService syncService;
+ protected MetaHeartbeatServer metaHeartbeatServer;
public MetaClusterServer() throws QueryProcessException {
super();
metaHeartbeatServer = new MetaHeartbeatServer(thisNode, this);
coordinator = new Coordinator();
- member = new MetaGroupMember(protocolFactory, thisNode, coordinator);
+ member = createMetaGroupMember();
coordinator.setMetaGroupMember(member);
asyncService = new MetaAsyncService(member);
syncService = new MetaSyncService(member);
MetaPuller.getInstance().init(member);
}
+ protected MetaGroupMember createMetaGroupMember() throws QueryProcessException {
+ return new MetaGroupMember(protocolFactory, thisNode, coordinator);
+ }
+
/**
* Besides the standard RaftServer start-up, the IoTDB instance, a MetaGroupMember and the
* ClusterMonitor are also started.
@@ -150,7 +154,7 @@ public class MetaClusterServer extends RaftServer
* @throws TTransportException if create the socket fails
*/
@Override
- TServerTransport getServerSocket() throws TTransportException {
+ protected TServerTransport getServerSocket() throws TTransportException {
logger.info(
"[{}] Cluster node will listen {}:{}",
getServerClientName(),
@@ -167,17 +171,17 @@ public class MetaClusterServer extends RaftServer
}
@Override
- String getClientThreadPrefix() {
+ protected String getClientThreadPrefix() {
return "MetaClientThread-";
}
@Override
- String getServerClientName() {
+ protected String getServerClientName() {
return "MetaServerThread-";
}
@Override
- TProcessor getProcessor() {
+ protected TProcessor getProcessor() {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return new AsyncProcessor<>(this);
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 09956d2..405c6e2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -71,9 +71,9 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
private TServerTransport socket;
// RPC processing server
private TServer poolServer;
- Node thisNode;
+ protected Node thisNode;
- TProtocolFactory protocolFactory =
+ protected TProtocolFactory protocolFactory =
config.isRpcThriftCompressionEnabled()
? new TCompactProtocol.Factory()
: new TBinaryProtocol.Factory();
@@ -92,7 +92,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
}
- RaftServer(Node thisNode) {
+ protected RaftServer(Node thisNode) {
this.thisNode = thisNode;
}
@@ -167,14 +167,14 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
* @return An AsyncProcessor that contains the extended interfaces of a non-abstract subclass of
* RaftService (DataService or MetaService).
*/
- abstract TProcessor getProcessor();
+ protected abstract TProcessor getProcessor();
/**
* @return A socket that will be used to establish a thrift server to listen to RPC requests.
* DataServer and MetaServer use different port, so this is to be determined.
* @throws TTransportException
*/
- abstract TServerTransport getServerSocket() throws TTransportException;
+ protected abstract TServerTransport getServerSocket() throws TTransportException;
/**
* Each thrift RPC request will be processed in a separate thread and this will return the name
@@ -183,7 +183,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
*
* @return name prefix of RPC processing threads.
*/
- abstract String getClientThreadPrefix();
+ protected abstract String getClientThreadPrefix();
/**
* The thrift server will be run in a separate thread, and this will be its name. It help you
@@ -191,7 +191,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
*
* @return The name of the thread running the thrift server.
*/
- abstract String getServerClientName();
+ protected abstract String getServerClientName();
private TServer createAsyncServer() throws TTransportException {
socket = getServerSocket();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index de0b83f..6d95e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -111,7 +111,7 @@ public class HeartbeatHandler implements AsyncMethodCallback<HeartBeatResponse>
if (lastLogIdx == peer.getLastHeartBeatIndex()) {
// the follower's lastLogIndex is unchanged, increase inconsistent counter
int inconsistentNum = peer.incInconsistentHeartbeatNum();
- if (inconsistentNum >= 5) {
+ if (inconsistentNum >= 10000) {
logger.info(
"{}: catching up node {}, index-term: {}-{}/{}-{}, peer match index {}",
memberName,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
new file mode 100644
index 0000000..71903d4
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/IndirectAppendHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.forwarder;
+
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fire-and-forget callback for append-entry requests forwarded through an
+ * intermediate follower (presumably the indirect dispatching path — confirm
+ * with its usage in RaftMember): a successful response is ignored and a
+ * failure is only logged, never retried here.
+ */
+public class IndirectAppendHandler implements AsyncMethodCallback<Long> {
+
+ private static final Logger logger = LoggerFactory.getLogger(IndirectAppendHandler.class);
+ // receiver and request are retained only for error reporting in onError
+ private Node receiver;
+ private AppendEntryRequest request;
+
+ public IndirectAppendHandler(Node receiver,
+ AppendEntryRequest request) {
+ this.receiver = receiver;
+ this.request = request;
+ }
+
+ @Override
+ public void onComplete(Long response) {
+ // ignore response from indirect appender
+ }
+
+ @Override
+ public void onError(Exception exception) {
+ logger.warn("Cannot send request {} to {}", request, receiver, exception);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 0459fef..50a46bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -152,7 +153,7 @@ public class HeartbeatThread implements Runnable {
synchronized (nodes) {
// avoid concurrent modification
for (Node node : nodes) {
- if (node.equals(localMember.getThisNode())) {
+ if (ClusterUtils.isNodeEquals(node, localMember.getThisNode())) {
continue;
}
if (Thread.currentThread().isInterrupted()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
index ed99c3d..e744fdb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatServer.java
@@ -74,8 +74,8 @@ public class MetaHeartbeatServer extends HeartbeatServer {
} else {
return new TServerSocket(
new InetSocketAddress(
- config.getInternalIp(),
- config.getInternalMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
+ metaClusterServer.getMember().getThisNode().getInternalIp(),
+ metaClusterServer.getMember().getThisNode().getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 002c4fe..2c5cf35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -40,7 +40,15 @@ public class MetaHeartbeatThread extends HeartbeatThread {
request.setRequireIdentifier(!node.isSetNodeIdentifier());
synchronized (localMetaMember.getIdConflictNodes()) {
request.unsetRegenerateIdentifier();
- if (localMetaMember.getIdConflictNodes().contains(node)) {
+ boolean hasIdConflict = false;
+ for (Node idConflictNode : localMetaMember.getIdConflictNodes()) {
+ if (idConflictNode.getInternalIp().equals(node.internalIp)
+ && idConflictNode.getMetaPort() == node.metaPort) {
+ hasIdConflict = true;
+ break;
+ }
+ }
+ if (hasIdConflict) {
request.setRegenerateIdentifier(true);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index eb10d44..056a485 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.member;
+import java.util.Comparator;
import org.apache.iotdb.cluster.client.DataClientProvider;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -1021,21 +1022,26 @@ public class MetaGroupMember extends RaftMember {
ExecutorService pool = new ScheduledThreadPoolExecutor(getAllNodes().size() - 1);
for (Node seedNode : getAllNodes()) {
Node thisNode = getThisNode();
- if (seedNode.equals(thisNode)) {
+ if (ClusterUtils.isNodeEquals(thisNode, seedNode)) {
continue;
}
pool.submit(
() -> {
- CheckStatusResponse response = checkStatus(seedNode);
- if (response != null) {
- // check the response
- ClusterUtils.examineCheckStatusResponse(
- response, consistentNum, inconsistentNum, seedNode);
- } else {
- logger.warn(
- "Start up exception. Cannot connect to node {}. Try again in next turn.",
- seedNode);
+ try {
+ CheckStatusResponse response = checkStatus(seedNode);
+ if (response != null) {
+ // check the response
+ ClusterUtils.examineCheckStatusResponse(
+ response, consistentNum, inconsistentNum, seedNode);
+ } else {
+ logger.warn(
+ "Start up exception. Cannot connect to node {}. Try again in next turn.",
+ seedNode);
+ }
+ } catch (Exception e) {
+ logger.error("Start up exception:", e);
}
+
});
}
pool.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index f2187f9..daa3f4e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server.member;
+import java.util.Collections;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
@@ -39,6 +40,7 @@ import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -53,6 +55,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.handlers.forwarder.IndirectAppendHandler;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -77,7 +80,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
+import org.checkerframework.checker.units.qual.A;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,12 +112,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
- * RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
+ * RaftMember process the common raft logic like leader election, log appending, catch-up and so
+ * on.
*/
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
+
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
- public static final boolean USE_LOG_DISPATCHER = false;
+ public static boolean USE_LOG_DISPATCHER = false;
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
private static final String MSG_FORWARD_ERROR =
@@ -132,19 +139,29 @@ public abstract class RaftMember {
* on this may be woken.
*/
private final Object waitLeaderCondition = new Object();
- /** the lock is to make sure that only one thread can apply snapshot at the same time */
+ /**
+ * the lock is to make sure that only one thread can apply snapshot at the same time
+ */
private final Object snapshotApplyLock = new Object();
protected Node thisNode = ClusterConstant.EMPTY_NODE;
- /** the nodes that belong to the same raft group as thisNode. */
+ /**
+ * the nodes that belong to the same raft group as thisNode.
+ */
protected List<Node> allNodes;
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- /** the name of the member, to distinguish several members in the logs. */
+ /**
+ * the name of the member, to distinguish several members in the logs.
+ */
String name;
- /** to choose nodes to send request of joining cluster randomly. */
+ /**
+ * to choose nodes to send request of joining cluster randomly.
+ */
Random random = new Random();
- /** when the node is a leader, this map is used to track log progress of each follower. */
+ /**
+ * when the node is a leader, this map is used to track log progress of each follower.
+ */
Map<Node, Peer> peerMap;
/**
* the current term of the node, this object also works as lock of some transactions of the member
@@ -165,8 +182,10 @@ public abstract class RaftMember {
* offline.
*/
volatile long lastHeartbeatReceivedTime;
- /** the raft logs are all stored and maintained in the log manager */
- RaftLogManager logManager;
+ /**
+ * the raft logs are all stored and maintained in the log manager
+ */
+ protected RaftLogManager logManager;
/**
* the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
* when this node is a leader, or start elections when this node is an elector.
@@ -183,7 +202,9 @@ public abstract class RaftMember {
* member by comparing it with the current last log index.
*/
long lastReportedLogIndex;
- /** the thread pool that runs catch-up tasks */
+ /**
+ * the thread pool that runs catch-up tasks
+ */
private ExecutorService catchUpService;
/**
* lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
@@ -219,20 +240,30 @@ public abstract class RaftMember {
* one slow node.
*/
private ExecutorService serialToParallelPool;
- /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
+ /**
+ * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
+ */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
+ * logs.
*/
private LogDispatcher logDispatcher;
/**
- * localExecutor is used to directly execute plans like load configuration in the underlying IoTDB
+ * localExecutor is used to directly execute plans like load configuration in the underlying
+ * IoTDB
*/
protected PlanExecutor localExecutor;
- protected RaftMember() {}
+ /**
+ * (logIndex, logTerm) -> append handler
+ */
+ protected Map<Pair<Long, Long>, AppendNodeEntryHandler> sentLogHandlers = new ConcurrentHashMap<>();
+
+ protected RaftMember() {
+ }
protected RaftMember(
String name,
@@ -527,10 +558,68 @@ public abstract class RaftMember {
long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result);
+ if (!request.isFromLeader) {
+ appendAckLeader(request.leader, log, result);
+ }
+
return result;
}
- /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
+ private void appendAckLeader(Node leader, Log log, long response) {
+ AppendEntryAcknowledgement appendEntryAcknowledgement = new AppendEntryAcknowledgement();
+ appendEntryAcknowledgement.index = log.getCurrLogIndex();
+ appendEntryAcknowledgement.term = log.getCurrLogTerm();
+ appendEntryAcknowledgement.response = response;
+
+ Client syncClient = null;
+ try {
+ if (config.isUseAsyncServer()) {
+ GenericHandler<Void> handler = new GenericHandler<>(leader, null);
+ getAsyncClient(leader).acknowledgeAppendEntry(appendEntryAcknowledgement, handler);
+ } else {
+ syncClient = getSyncClient(leader);
+ syncClient.acknowledgeAppendEntry(appendEntryAcknowledgement);
+ }
+ } catch (TException e) {
+ logger.warn("Cannot send ack of {} to leader {}", log, leader, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ }
+
+ public long appendEntryIndirect(AppendEntryRequest request, List<Node> subFollowers) throws UnknownLogTypeException {
+ long result = appendEntry(request);
+ appendLogThreadPool.submit(() -> sendLogToSubFollowers(request, subFollowers));
+ return result;
+ }
+
+ private void sendLogToSubFollowers(AppendEntryRequest request, List<Node> subFollowers) {
+ request.setIsFromLeader(false);
+ for (Node subFollower : subFollowers) {
+ Client syncClient = null;
+ try {
+ if (config.isUseAsyncServer()) {
+ getAsyncClient(subFollower).appendEntry(request, new IndirectAppendHandler(subFollower,
+ request));
+ } else {
+ syncClient = getSyncClient(subFollower);
+ syncClient.appendEntry(request);
+ }
+ } catch (TException e) {
+ logger.error("Cannot send {} to {}", request, subFollower, e);
+ } finally {
+ if (syncClient != null) {
+ ClientUtils.putBackSyncClient(syncClient);
+ }
+ }
+ }
+ }
+
+ /**
+ * Similar to appendEntry, while the incoming load is a batch of logs instead of a single log.
+ */
public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -603,13 +692,17 @@ public abstract class RaftMember {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer) {
+ Peer peer, List<Node> indirectReceivers) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
try {
- client.appendEntry(request, handler);
+ if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+ client.appendEntry(request, handler);
+ } else {
+ client.appendEntryIndirect(request, indirectReceivers, handler);
+ }
logger.debug("{} sending a log to {}: {}", name, node, log);
} catch (Exception e) {
logger.warn("{} cannot append log to node {}", name, node, e);
@@ -680,16 +773,22 @@ public abstract class RaftMember {
return lastCatchUpResponseTime;
}
- /** Sub-classes will add their own process of HeartBeatResponse in this method. */
- public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
+ /**
+ * Sub-classes will add their own process of HeartBeatResponse in this method.
+ */
+ public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
+ }
- /** The actions performed when the node wins in an election (becoming a leader). */
- public void onElectionWins() {}
+ /**
+ * The actions performed when the node wins in an election (becoming a leader).
+ */
+ public void onElectionWins() {
+ }
/**
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
- * follower. If some of the required logs are removed, also send the snapshot. <br>
- * notice that if a part of data is in the snapshot, then it is not in the logs.
+ * follower. If some of the required logs are removed, also send the snapshot. <br> notice that if
+ * a part of data is in the snapshot, then it is not in the logs.
*/
public void catchUp(Node follower, long lastLogIdx) {
// for one follower, there is at most one ongoing catch-up, so the same data will not be sent
@@ -744,7 +843,7 @@ public abstract class RaftMember {
* @param plan a non-query plan.
* @return A TSStatus indicating the execution result.
*/
- abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+ protected abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
/**
* according to the consistency configuration, decide whether to execute syncLeader or not and
@@ -775,7 +874,9 @@ public abstract class RaftMember {
}
}
- /** call back after syncLeader */
+ /**
+ * call back after syncLeader
+ */
public interface CheckConsistency {
/**
@@ -784,7 +885,7 @@ public abstract class RaftMember {
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
- * implements.
+ * implementations.
*/
void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException;
@@ -793,8 +894,7 @@ public abstract class RaftMember {
public static class MidCheckConsistency implements CheckConsistency {
/**
- * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
- * CHECK_MID_CONSISTENCY_EXCEPTION
+ * if leaderCommitId - localAppliedId > MaxReadLogLag, will throw CHECK_MID_CONSISTENCY_EXCEPTION
*
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
@@ -806,7 +906,7 @@ public abstract class RaftMember {
if (leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE
|| leaderCommitId - localAppliedId
- > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
+ > ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
}
}
@@ -839,7 +939,7 @@ public abstract class RaftMember {
* @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
@@ -858,7 +958,9 @@ public abstract class RaftMember {
return waitUntilCatchUp(checkConsistency);
}
- /** Wait until the leader of this node becomes known or time out. */
+ /**
+ * Wait until the leader of this node becomes known or time out.
+ */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
@@ -885,7 +987,7 @@ public abstract class RaftMember {
*
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -957,7 +1059,7 @@ public abstract class RaftMember {
* call this method. Will commit the log locally and send it to followers
*
* @return OK if over half of the followers accept the log or null if the leadership is lost
- * during the appending
+ * during the appending
*/
TSStatus processPlanLocally(PhysicalPlan plan) {
if (USE_LOG_DISPATCHER) {
@@ -1156,7 +1258,9 @@ public abstract class RaftMember {
return peerMap;
}
- /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
+ /**
+ * @return true if there is a log whose index is "index" and term is "term", false otherwise
+ */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -1171,15 +1275,18 @@ public abstract class RaftMember {
return syncLock;
}
- /** Sub-classes will add their own process of HeartBeatRequest in this method. */
- void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
+ /**
+ * Sub-classes will add their own process of HeartBeatRequest in this method.
+ */
+ void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
+ }
/**
* Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
* request is valid.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkElectorLogProgress(ElectionRequest electionRequest) {
@@ -1222,7 +1329,7 @@ public abstract class RaftMember {
* lastLogIndex is smaller than the voter's Otherwise accept the election.
*
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
- * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
+ * smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
*/
long checkLogProgress(long lastLogIndex, long lastLogTerm) {
long response;
@@ -1239,10 +1346,10 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param header must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
@@ -1273,7 +1380,7 @@ public abstract class RaftMember {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param header to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1354,7 +1461,7 @@ public abstract class RaftMember {
* Get an asynchronous thrift client of the given node.
*
* @return an asynchronous thrift client or null if the caller tries to connect the local node or
- * the node cannot be reached.
+ * the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
return getAsyncClient(node, asyncClientPool, true);
@@ -1541,7 +1648,7 @@ public abstract class RaftMember {
* heartbeat timer.
*
* @param fromLeader true if the request is from a leader, false if the request is from an
- * elector.
+ * elector.
*/
public void stepDown(long newTerm, boolean fromLeader) {
synchronized (term) {
@@ -1572,7 +1679,9 @@ public abstract class RaftMember {
this.thisNode = thisNode;
}
- /** @return the header of the data raft group or null if this is in a meta group. */
+ /**
+ * @return the header of the data raft group or null if this is in a meta group.
+ */
public Node getHeader() {
return null;
}
@@ -1653,7 +1762,7 @@ public abstract class RaftMember {
* success.
*
* @param requiredQuorum the number of votes needed to make the log valid, when requiredQuorum <=
- * 0, half of the cluster size will be used.
+ * 0, half of the cluster size will be used.
* @return an AppendLogResult
*/
private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
@@ -1722,7 +1831,6 @@ public abstract class RaftMember {
return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
}
- /** Send "log" to "node". */
public void sendLogToFollower(
Log log,
AtomicInteger voteCounter,
@@ -1730,6 +1838,21 @@ public abstract class RaftMember {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request) {
+ sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request,
+ Collections.emptyList());
+ }
+
+ /**
+ * Send "log" to "node".
+ */
+ public void sendLogToFollower(
+ Log log,
+ AtomicInteger voteCounter,
+ Node node,
+ AtomicBoolean leaderShipStale,
+ AtomicLong newLeaderTerm,
+ AppendEntryRequest request,
+ List<Node> indirectReceivers) {
if (node.equals(thisNode)) {
return;
}
@@ -1750,9 +1873,9 @@ public abstract class RaftMember {
}
if (config.isUseAsyncServer()) {
- sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+ sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
} else {
- sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
+ sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer, indirectReceivers);
}
}
@@ -1791,14 +1914,20 @@ public abstract class RaftMember {
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
- Peer peer) {
+ Peer peer, List<Node> indirectReceivers) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
- long result = client.appendEntry(request);
+ long result;
+ if (indirectReceivers == null || indirectReceivers.isEmpty()) {
+ result = client.appendEntry(request);
+ } else {
+ result = client.appendEntryIndirect(request, indirectReceivers);
+ }
+
handler.onComplete(result);
} catch (TException e) {
client.getInputProtocol().getTransport().close();
@@ -1843,9 +1972,9 @@ public abstract class RaftMember {
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
+ protected long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
return resp;
@@ -1867,7 +1996,9 @@ public abstract class RaftMember {
return resp;
}
- /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
+ /**
+ * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
+ */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -1893,7 +2024,7 @@ public abstract class RaftMember {
return alreadyWait <= RaftServer.getWriteOperationTimeoutMS();
}
- private long checkPrevLogIndex(long prevLogIndex) {
+ protected long checkPrevLogIndex(long prevLogIndex) {
long lastLogIndex = logManager.getLastLogIndex();
long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
@@ -1913,7 +2044,7 @@ public abstract class RaftMember {
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
private long appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
@@ -1987,4 +2118,25 @@ public abstract class RaftMember {
TIME_OUT,
LEADERSHIP_STALE
}
+
+ /**
+ * Process the result from an indirect receiver of an entry.
+ * @param ack acknowledgement from an indirect receiver.
+ */
+ public void acknowledgeAppendLog(AppendEntryAcknowledgement ack) {
+ AppendNodeEntryHandler appendNodeEntryHandler = sentLogHandlers
+ .get(new Pair<>(ack.index, ack.term));
+ if (appendNodeEntryHandler != null) {
+ appendNodeEntryHandler.onComplete(ack.response);
+ }
+ }
+
+ public void registerAppendLogHandler(Pair<Long, Long> indexTerm,
+ AppendNodeEntryHandler appendNodeEntryHandler) {
+ sentLogHandlers.put(indexTerm, appendNodeEntryHandler);
+ }
+
+ public void removeAppendLogHandler(Pair<Long, Long> indexTerm) {
+ sentLogHandlers.remove(indexTerm);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 8673078..eecf26e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -19,9 +19,11 @@
package org.apache.iotdb.cluster.server.service;
+import java.util.List;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -173,4 +175,21 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
resultHandler.onError(e);
}
}
+
+ @Override
+ public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack,
+ AsyncMethodCallback<Void> resultHandler) {
+ member.acknowledgeAppendLog(ack);
+ resultHandler.onComplete(null);
+ }
+
+ @Override
+ public void appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers,
+ AsyncMethodCallback<Long> resultHandler) {
+ try {
+ resultHandler.onComplete(member.appendEntryIndirect(request, subReceivers));
+ } catch (UnknownLogTypeException e) {
+ resultHandler.onError(e);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index ce200ab..181ad8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -19,9 +19,16 @@
package org.apache.iotdb.cluster.server.service;
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.List;
import org.apache.iotdb.cluster.exception.LeaderUnknownException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryAcknowledgement;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
@@ -31,30 +38,21 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
-import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-
public abstract class BaseSyncService implements RaftService.Iface {
private static final Logger logger = LoggerFactory.getLogger(BaseSyncService.class);
RaftMember member;
String name;
- BaseSyncService(RaftMember member) {
+ protected BaseSyncService(RaftMember member) {
this.member = member;
this.name = member.getName();
}
@@ -153,29 +151,25 @@ public abstract class BaseSyncService implements RaftService.Iface {
@Override
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request) throws TException {
- if (member.getCharacter() != NodeCharacter.LEADER) {
- // forward the plan to the leader
- Client client = member.getSyncClient(member.getLeader());
- if (client != null) {
- TSStatus status;
- try {
- status = client.executeNonQueryPlan(request);
- } catch (TException e) {
- client.getInputProtocol().getTransport().close();
- throw e;
- } finally {
- ClientUtils.putBackSyncClient(client);
- }
- return status;
- } else {
- return StatusUtils.NO_LEADER;
- }
- }
-
try {
return member.executeNonQueryPlan(request);
} catch (Exception e) {
throw new TException(e);
}
}
+
+ @Override
+ public void acknowledgeAppendEntry(AppendEntryAcknowledgement ack) {
+ member.acknowledgeAppendLog(ack);
+ }
+
+ @Override
+ public long appendEntryIndirect(AppendEntryRequest request, List<Node> subReceivers)
+ throws TException {
+ try {
+ return member.appendEntryIndirect(request, subReceivers);
+ } catch (UnknownLogTypeException e) {
+ throw new TException(e);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
index 255cb4b..0de64a6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.utils;
+import java.util.Objects;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -27,6 +28,7 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
index be87eb1..905019f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterNode.java
@@ -58,7 +58,7 @@ public class ClusterNode extends Node {
&& this.dataPort == that.dataPort
&& this.metaPort == that.metaPort
&& this.clientPort == that.clientPort
- && this.clientIp.equals(that.clientIp);
+ && Objects.equals(this.clientIp, that.clientIp);
}
@Override
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 4c64587..ef656fe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -175,6 +175,18 @@ public class ClusterUtils {
}
}
+ public static boolean isNodeEquals(Node node1, Node node2) {
+ if (node1 == node2) {
+ return true;
+ }
+ if (node1 == null || node2 == null) {
+ return false;
+ }
+ return Objects.equals(node1.internalIp, node2.internalIp) && Objects.equals(node1.metaPort,
+ node2.metaPort);
+ }
+
+
private static boolean seedNodesContains(List<Node> seedNodeList, List<Node> subSeedNodeList) {
// Because identifier is not compared here, List.contains() is not suitable
if (subSeedNodeList == null) {
diff --git a/pom.xml b/pom.xml
index aed72ae..879a584 100644
--- a/pom.xml
+++ b/pom.xml
@@ -621,32 +621,32 @@
</excludes>
</configuration>
</plugin>
- <plugin>
- <groupId>com.diffplug.spotless</groupId>
- <artifactId>spotless-maven-plugin</artifactId>
- <version>${spotless.version}</version>
- <configuration>
- <java>
- <googleJavaFormat>
- <version>1.7</version>
- <style>GOOGLE</style>
- </googleJavaFormat>
- <importOrder>
- <order>org.apache.iotdb,,javax,java,\#</order>
- </importOrder>
- <removeUnusedImports/>
- </java>
- </configuration>
- <executions>
- <execution>
- <id>spotless-check</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <!-- <plugin>-->
+ <!-- <groupId>com.diffplug.spotless</groupId>-->
+ <!-- <artifactId>spotless-maven-plugin</artifactId>-->
+ <!-- <version>${spotless.version}</version>-->
+ <!-- <configuration>-->
+ <!-- <java>-->
+ <!-- <googleJavaFormat>-->
+ <!-- <version>1.7</version>-->
+ <!-- <style>GOOGLE</style>-->
+ <!-- </googleJavaFormat>-->
+ <!-- <importOrder>-->
+ <!-- <order>org.apache.iotdb,,javax,java,\#</order>-->
+ <!-- </importOrder>-->
+ <!-- <removeUnusedImports/>-->
+ <!-- </java>-->
+ <!-- </configuration>-->
+ <!-- <executions>-->
+ <!-- <execution>-->
+ <!-- <id>spotless-check</id>-->
+ <!-- <phase>validate</phase>-->
+ <!-- <goals>-->
+ <!-- <goal>check</goal>-->
+ <!-- </goals>-->
+ <!-- </execution>-->
+ <!-- </executions>-->
+ <!-- </plugin>-->
</plugins>
</pluginManagement>
<plugins>
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d4d08c1..0ef0033 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -349,6 +349,8 @@ public class PlanExecutor implements IPlanExecutor {
return createDeviceTemplate((CreateTemplatePlan) plan);
case SET_DEVICE_TEMPLATE:
return setDeviceTemplate((SetDeviceTemplatePlan) plan);
+ case EMPTY:
+ return true;
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index e1fb10f..fddda60 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -157,5 +157,6 @@ public abstract class Operator {
CREATE_TEMPLATE,
SET_DEVICE_TEMPLATE,
SET_USING_DEVICE_TEMPLATE,
+ EMPTY,
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 4c5b1a1..457bd22 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTriggerPlan;
+import org.apache.iotdb.db.qp.physical.sys.ExprPlan;
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan;
import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
@@ -368,6 +369,9 @@ public abstract class PhysicalPlan {
case AUTO_CREATE_DEVICE_MNODE:
plan = new AutoCreateDeviceMNodePlan();
break;
+ case EXPR:
+ plan = new ExprPlan();
+ break;
default:
throw new IOException("unrecognized log type " + type);
}
@@ -422,7 +426,8 @@ public abstract class PhysicalPlan {
CREATE_TRIGGER,
DROP_TRIGGER,
START_TRIGGER,
- STOP_TRIGGER
+ STOP_TRIGGER,
+ EXPR
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
new file mode 100644
index 0000000..2a1e16b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/ExprPlan.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+public class ExprPlan extends PhysicalPlan {
+
+ private byte[] workload;
+ private boolean needForward;
+
+ public ExprPlan() {
+ super(false, OperatorType.EMPTY);
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return null;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.write((byte) PhysicalPlanType.EXPR.ordinal());
+ stream.writeInt(workload == null ? 0 : workload.length);
+ if (workload != null) {
+ stream.write(workload);
+ }
+ stream.write(needForward ? 1 : 0);
+ }
+
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ buffer.put((byte) PhysicalPlanType.EXPR.ordinal());
+ buffer.putInt(workload == null ? 0 : workload.length);
+ if (workload != null) {
+ buffer.put(workload);
+ }
+ buffer.put(needForward ? (byte) 1 : 0);
+ }
+
+ @Override
+ public void deserialize(ByteBuffer buffer) throws IllegalPathException {
+ int size = buffer.getInt();
+ workload = new byte[size];
+ buffer.get(workload);
+ needForward = buffer.get() == 1;
+ }
+
+ public void setWorkload(byte[] workload) {
+ this.workload = workload;
+ }
+
+ public boolean isNeedForward() {
+ return needForward;
+ }
+
+ public void setNeedForward(boolean needForward) {
+ this.needForward = needForward;
+ }
+}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index f23130e..449e843 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -90,6 +90,16 @@ struct AppendEntryRequest {
// because a data server may play many data groups members, this is used to identify which
// member should process the request or response. Only used in data group communication.
7: optional Node header
+ // true if the request is sent from the leader, and the receiver just responds to the sender;
+ // otherwise, the receiver should also notify the leader
+ 8: optional bool isFromLeader
+}
+
+
+struct AppendEntryAcknowledgement {
+ 1: required long term
+ 2: required long index
+ 3: required long response
}
// leader -> follower
@@ -306,6 +316,11 @@ service RaftService {
**/
long appendEntry(1:AppendEntryRequest request)
+ /**
+ * Like appendEntry, but the receiver should forward the request to nodes in subReceivers.
+ **/
+ long appendEntryIndirect(1:AppendEntryRequest request, 2:list<Node> subReceivers)
+
void sendSnapshot(1:SendSnapshotRequest request)
/**
@@ -337,6 +352,12 @@ service RaftService {
* interface to notify the leader to remove the associated hardlink.
**/
void removeHardLink(1: string hardLinkPath)
+
+ /**
+ * when a follower receives an AppendEntryRequest from a non-leader node, it sends this ack to
+ * the leader so the leader can know whether it has successfully received the entry
+ **/
+ void acknowledgeAppendEntry(1: AppendEntryAcknowledgement ack)
}