You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/07/05 06:21:41 UTC
[iotdb] branch expr_catch_up updated: tmp save
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/expr_catch_up by this push:
new f586cbf959 tmp save
f586cbf959 is described below
commit f586cbf959fd5347268238c678db6c154d8e81d1
Author: jt <jt...@163.com>
AuthorDate: Tue Jul 5 14:21:31 2022 +0800
tmp save
---
.../cluster/client/sync/SyncClientAdaptor.java | 2 --
.../iotdb/cluster/coordinator/Coordinator.java | 12 ++++++----
.../iotdb/cluster/impl/PlanBasedStateMachine.java | 22 +++++++----------
.../org/apache/iotdb/cluster/log/LogParser.java | 2 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 4 ++--
.../iotdb/cluster/log/applier/BaseApplier.java | 28 ----------------------
.../iotdb/cluster/log/applier/DataLogApplier.java | 4 ++--
.../iotdb/cluster/log/applier/MetaLogApplier.java | 5 ++--
.../iotdb/cluster/log/logtypes/RequestLog.java | 8 ++++---
.../manage/FilePartitionedSnapshotLogManager.java | 11 ++++++---
.../log/manage/MetaSingleSnapshotLogManager.java | 13 ++++++----
.../log/manage/PartitionedSnapshotLogManager.java | 3 ++-
.../iotdb/cluster/log/manage/RaftLogManager.java | 10 ++++----
.../log/sequencing/AsynchronousSequencer.java | 5 ++--
.../log/sequencing/SynchronousSequencer.java | 5 +++-
.../handlers/forwarder/ForwardPlanHandler.java | 3 ++-
.../cluster/server/member/DataGroupMember.java | 13 ++++++----
.../cluster/server/member/MetaGroupMember.java | 3 ---
.../iotdb/cluster/server/member/RaftMember.java | 4 +---
.../cluster/server/service/BaseAsyncService.java | 8 +++----
.../cluster/server/service/BaseSyncService.java | 4 ++--
.../apache/iotdb/cluster/utils/PartitionUtils.java | 1 -
.../cluster/common/TestPartitionedLogManager.java | 6 +++--
.../apache/iotdb/cluster/log/LogParserTest.java | 2 +-
.../cluster/log/applier/MetaLogApplierTest.java | 2 +-
25 files changed, 83 insertions(+), 97 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 29eee1ea21..e333cbcb7b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -52,12 +52,10 @@ import org.apache.iotdb.cluster.server.handlers.caller.PullMeasurementSchemaHand
import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
import org.apache.iotdb.cluster.server.handlers.caller.PullTimeseriesSchemaHandler;
import org.apache.iotdb.cluster.server.handlers.forwarder.ForwardPlanHandler;
-import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.SerializeUtils;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 83824e9561..17d2607c2d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
@@ -139,8 +138,9 @@ public class Coordinator {
return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage());
}
} else {
- result = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
- "Unsupported request: " + request);
+ result =
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unsupported request: " + request);
}
Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
return result;
@@ -291,7 +291,8 @@ public class Coordinator {
status =
metaGroupMember
.getLocalDataMember(partitionGroup.getHeader())
- .executeRequest(plan).getStatus();
+ .executeRequest(plan)
+ .getStatus();
logger.debug(
"Execute {} in a local group of {} with status {}",
plan,
@@ -471,7 +472,8 @@ public class Coordinator {
result =
metaGroupMember
.getLocalDataMember(entry.getValue().getHeader())
- .executeRequest(entry.getKey()).getStatus();
+ .executeRequest(entry.getKey())
+ .getStatus();
logger.debug(
"Execute {} in a local group of {}, {}",
entry.getKey(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
index b98652e5de..f8b237e2dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.cluster.impl;
-import java.io.File;
import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -31,9 +30,12 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+
public class PlanBasedStateMachine implements IStateMachine {
private static final Logger logger = LoggerFactory.getLogger(PlanBasedStateMachine.class);
@@ -41,8 +43,7 @@ public class PlanBasedStateMachine implements IStateMachine {
private ClusterPlanExecutor planExecutor;
private MetaGroupMember metaGroupMember;
- public PlanBasedStateMachine() {
- }
+ public PlanBasedStateMachine() {}
public PlanBasedStateMachine(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
@@ -58,23 +59,20 @@ public class PlanBasedStateMachine implements IStateMachine {
}
@Override
- public void stop() {
-
- }
+ public void stop() {}
@Override
public TSStatus write(IConsensusRequest request) {
if (!(request instanceof PhysicalPlan)) {
- return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
- "Not supported request: " + request);
+ return StatusUtils.getStatus(
+ StatusUtils.EXECUTE_STATEMENT_ERROR, "Not supported request: " + request);
}
try {
planExecutor.processNonQuery(((PhysicalPlan) request));
return StatusUtils.OK;
} catch (QueryProcessException | StorageGroupNotSetException | StorageEngineException e) {
logger.warn("Plan execution error", e);
- return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
- e.getMessage());
+ return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
@@ -89,9 +87,7 @@ public class PlanBasedStateMachine implements IStateMachine {
}
@Override
- public void loadSnapshot(File latestSnapshotRootDir) {
-
- }
+ public void loadSnapshot(File latestSnapshotRootDir) {}
public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
this.metaGroupMember = metaGroupMember;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index b434873f1a..152cea5d10 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 127a590b68..c2e6677fc2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -116,9 +116,9 @@ public class AsyncDataLogApplier implements LogApplier {
private PartialPath getLogKey(Log log) throws StorageGroupNotSetException {
// we can only apply some kinds of plans in parallel, for other logs, we must wait until all
// previous logs are applied, or the order of deletions and insertions may get wrong
- if (log instanceof RequestLog) {
+ if (log instanceof RequestLog && ((RequestLog) log).getRequest() instanceof PhysicalPlan) {
RequestLog requestLog = (RequestLog) log;
- PhysicalPlan plan = requestLog.getRequest();
+ PhysicalPlan plan = ((PhysicalPlan) requestLog.getRequest());
// this plan only affects one sg, so we can run it with other plans in parallel
return getPlanKey(plan);
} else if (log instanceof CloseFileLog) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 876be2bfde..585084eed7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -19,37 +19,9 @@
package org.apache.iotdb.cluster.log.applier;
-import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IStateMachine;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.BatchProcessException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.BatchPlan;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.utils.SchemaUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
/** BaseApplier use PlanExecutor to execute PhysicalPlans. */
abstract class BaseApplier implements LogApplier {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 266e61c332..30a718d21b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.rpc.TSStatusCode;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +50,7 @@ public class DataLogApplier extends BaseApplier {
protected DataGroupMember dataGroupMember;
- public DataLogApplier(DataGroupMember dataGroupMember,
- IStateMachine stateMachine) {
+ public DataLogApplier(DataGroupMember dataGroupMember, IStateMachine stateMachine) {
super(stateMachine);
this.dataGroupMember = dataGroupMember;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 1259c0df84..fa4c70a7e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -24,13 +24,12 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-
import org.apache.iotdb.consensus.IStateMachine;
-import org.apache.iotdb.db.mpp.execution.StateMachine;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 4f797d459f..8f66c52763 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -52,7 +52,9 @@ public class RequestLog extends Log {
public int getDefaultBufferSize() {
if (request instanceof DummyPlan) {
int workloadSize =
- ((DummyPlan) request).getWorkload() == null ? 0 : ((DummyPlan) request).getWorkload().length;
+ ((DummyPlan) request).getWorkload() == null
+ ? 0
+ : ((DummyPlan) request).getWorkload().length;
return workloadSize + 512;
}
return DEFAULT_BUFFER_SIZE;
@@ -68,8 +70,8 @@ public class RequestLog extends Log {
dataOutputStream.writeLong(getCurrLogTerm());
ByteBuffer byteBuffer = request.serializeToByteBuffer();
- dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(),
- byteBuffer.limit() - byteBuffer.position());
+ dataOutputStream.write(
+ byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit() - byteBuffer.position());
} catch (IOException e) {
// unreachable
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 656def1b65..1774a0c3ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.execution.StateMachine;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
@@ -72,8 +71,14 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
Node header,
Node thisNode,
DataGroupMember dataGroupMember) {
- super(createLogApplier(dataGroupMember, stateMachine), partitionTable, header, thisNode, Factory.INSTANCE,
- dataGroupMember, stateMachine);
+ super(
+ createLogApplier(dataGroupMember, stateMachine),
+ partitionTable,
+ header,
+ thisNode,
+ Factory.INSTANCE,
+ dataGroupMember,
+ stateMachine);
}
private static LogApplier createLogApplier(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 32c7d19ce5..5d5d5bfe28 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.cluster.log.manage;
-import java.io.IOException;
-import java.util.Map;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
@@ -36,9 +34,13 @@ import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.metadata.template.TemplateManager;
import org.apache.iotdb.db.service.IoTDB;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Map;
+
/** MetaSingleSnapshotLogManager provides a MetaSimpleSnapshot as snapshot. */
public class MetaSingleSnapshotLogManager extends RaftLogManager {
@@ -52,8 +54,11 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
private long term;
public MetaSingleSnapshotLogManager(IStateMachine stateMachine, MetaGroupMember metaGroupMember) {
- super(new SyncLogDequeSerializer(0), new MetaLogApplier(metaGroupMember, stateMachine),
- metaGroupMember.getName(), stateMachine);
+ super(
+ new SyncLogDequeSerializer(0),
+ new MetaLogApplier(metaGroupMember, stateMachine),
+ metaGroupMember.getName(),
+ stateMachine);
this.metaGroupMember = metaGroupMember;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 8e2c7f46c0..5c7133b928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -74,7 +74,8 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
Node header,
Node thisNode,
SnapshotFactory<T> factory,
- DataGroupMember dataGroupMember, IStateMachine stateMachine) {
+ DataGroupMember dataGroupMember,
+ IStateMachine stateMachine) {
super(
new SyncLogDequeSerializer(header.nodeIdentifier),
logApplier,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index ec2dbcbd6e..a3ab0bb2be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,13 +31,12 @@ import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.StableEntryManager;
import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
-import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
-
import org.apache.iotdb.consensus.IStateMachine;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,8 +114,11 @@ public abstract class RaftLogManager {
protected IStateMachine stateMachine;
- protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name
- , IStateMachine stateMachine) {
+ protected RaftLogManager(
+ StableEntryManager stableEntryManager,
+ LogApplier applier,
+ String name,
+ IStateMachine stateMachine) {
this.logApplier = applier;
this.name = name;
this.stateMachine = stateMachine;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index b3fdb6d0af..465c4f0dd5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,8 +105,8 @@ public class AsynchronousSequencer implements LogSequencer {
log.setSequenceStartTime(sequenceStartTime);
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
- if (log instanceof RequestLog) {
- ((RequestLog) log).getRequest().setIndex(log.getCurrLogIndex());
+ if (log instanceof RequestLog && ((RequestLog) log).getRequest() instanceof PhysicalPlan) {
+ ((PhysicalPlan) ((RequestLog) log).getRequest()).setIndex(log.getCurrLogIndex());
}
startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index bbaf8770ce..3f337274af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.LogPlan;
import java.util.concurrent.TimeUnit;
@@ -71,8 +72,10 @@ public class SynchronousSequencer implements LogSequencer {
// if the log contains a physical plan which is not a LogPlan, assign the same index to
// the plan so the state machine can be bridged with the consensus
if (log instanceof RequestLog
+ && (((RequestLog) log).getRequest() instanceof PhysicalPlan)
&& !(((RequestLog) log).getRequest() instanceof LogPlan)) {
- ((RequestLog) log).getRequest().setIndex(logManager.getLastLogIndex() + 1);
+ ((PhysicalPlan) ((RequestLog) log).getRequest())
+ .setIndex(logManager.getLastLogIndex() + 1);
}
log.setCurrLogTerm(member.getTerm().get());
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
index 44b317d505..490be699c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
@@ -38,7 +38,8 @@ public class ForwardPlanHandler implements AsyncMethodCallback<TSStatus> {
private AtomicReference<TSStatus> result;
private Node node;
- public ForwardPlanHandler(AtomicReference<TSStatus> result, IConsensusRequest request, Node node) {
+ public ForwardPlanHandler(
+ AtomicReference<TSStatus> result, IConsensusRequest request, Node node) {
this.result = result;
this.request = request;
this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 669979bd24..95c569db50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -207,7 +207,10 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
logSequencer = SEQUENCER_FACTORY.create(this, logManager);
}
- DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember metaGroupMember,
+ DataGroupMember(
+ Node thisNode,
+ PartitionGroup nodes,
+ MetaGroupMember metaGroupMember,
IStateMachine stateMachine) {
// The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
super(
@@ -344,8 +347,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
}
public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
- return new DataGroupMember(thisNode, partitionGroup, metaGroupMember,
- new PlanBasedStateMachine(metaGroupMember));
+ return new DataGroupMember(
+ thisNode, partitionGroup, metaGroupMember, new PlanBasedStateMachine(metaGroupMember));
}
}
@@ -735,8 +738,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
}
} catch (MetadataException | CheckConsistencyException ex) {
logger.error("{}: Cannot auto-create timeseries for {}", name, request, e);
- return new ConsensusWriteResponse(null,
- StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage()));
+ return new ConsensusWriteResponse(
+ null, StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage()));
}
if (hasCreated) {
return executeRequest(request);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index a786a86d62..7f16f6fb78 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -36,9 +36,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
@@ -88,7 +86,6 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.TimeValuePairUtils;
import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index b2c4d92219..bb40aea9ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -101,8 +100,6 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.metadata.template.DuplicatedTemplateException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
@@ -190,6 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean {
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
/** the name of the member, to distinguish several members in the logs. */
ConsensusGroupId groupId;
+
String name;
/** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 2a83adc28e..e3adae4230 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -38,17 +38,17 @@ import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public abstract class BaseAsyncService implements RaftService.AsyncIface {
@@ -174,7 +174,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
// process the plan locally
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
- TSStatus status = member.executeRequest(plan);
+ TSStatus status = member.executeRequest(plan).getStatus();
logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, status);
resultHandler.onComplete(
StatusUtils.getStatus(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 8119eceda5..c41eea3ea0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -36,8 +36,8 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -157,7 +157,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
// process the plan locally
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
- TSStatus answer = member.executeRequest(plan);
+ TSStatus answer = member.executeRequest(plan).getStatus();
logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, answer);
return answer;
} catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 55761cd1eb..d68c2d1597 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
index 2b919a32f9..8a4c5eb3cf 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
@@ -35,7 +35,8 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
new Node("localhost", 30001, 1, Constants.RPC_PORT, 6667, "localhost"),
null,
null,
- null, stateMachine);
+ null,
+ stateMachine);
}
public TestPartitionedLogManager(
@@ -46,7 +47,8 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
header,
new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT, "localhost"),
factory,
- null, stateMachine);
+ null,
+ stateMachine);
}
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 768ea092ca..bb9c9fa9a0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index d8ebb4f093..7bb8130f44 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.Constants;
import org.apache.iotdb.commons.exception.MetadataException;