You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/07/05 06:21:41 UTC

[iotdb] branch expr_catch_up updated: tmp save

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch expr_catch_up
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/expr_catch_up by this push:
     new f586cbf959 tmp save
f586cbf959 is described below

commit f586cbf959fd5347268238c678db6c154d8e81d1
Author: jt <jt...@163.com>
AuthorDate: Tue Jul 5 14:21:31 2022 +0800

    tmp save
---
 .../cluster/client/sync/SyncClientAdaptor.java     |  2 --
 .../iotdb/cluster/coordinator/Coordinator.java     | 12 ++++++----
 .../iotdb/cluster/impl/PlanBasedStateMachine.java  | 22 +++++++----------
 .../org/apache/iotdb/cluster/log/LogParser.java    |  2 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |  4 ++--
 .../iotdb/cluster/log/applier/BaseApplier.java     | 28 ----------------------
 .../iotdb/cluster/log/applier/DataLogApplier.java  |  4 ++--
 .../iotdb/cluster/log/applier/MetaLogApplier.java  |  5 ++--
 .../iotdb/cluster/log/logtypes/RequestLog.java     |  8 ++++---
 .../manage/FilePartitionedSnapshotLogManager.java  | 11 ++++++---
 .../log/manage/MetaSingleSnapshotLogManager.java   | 13 ++++++----
 .../log/manage/PartitionedSnapshotLogManager.java  |  3 ++-
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 10 ++++----
 .../log/sequencing/AsynchronousSequencer.java      |  5 ++--
 .../log/sequencing/SynchronousSequencer.java       |  5 +++-
 .../handlers/forwarder/ForwardPlanHandler.java     |  3 ++-
 .../cluster/server/member/DataGroupMember.java     | 13 ++++++----
 .../cluster/server/member/MetaGroupMember.java     |  3 ---
 .../iotdb/cluster/server/member/RaftMember.java    |  4 +---
 .../cluster/server/service/BaseAsyncService.java   |  8 +++----
 .../cluster/server/service/BaseSyncService.java    |  4 ++--
 .../apache/iotdb/cluster/utils/PartitionUtils.java |  1 -
 .../cluster/common/TestPartitionedLogManager.java  |  6 +++--
 .../apache/iotdb/cluster/log/LogParserTest.java    |  2 +-
 .../cluster/log/applier/MetaLogApplierTest.java    |  2 +-
 25 files changed, 83 insertions(+), 97 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 29eee1ea21..e333cbcb7b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -52,12 +52,10 @@ import org.apache.iotdb.cluster.server.handlers.caller.PullMeasurementSchemaHand
 import org.apache.iotdb.cluster.server.handlers.caller.PullSnapshotHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.PullTimeseriesSchemaHandler;
 import org.apache.iotdb.cluster.server.handlers.forwarder.ForwardPlanHandler;
-import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.SerializeUtils;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 83824e9561..17d2607c2d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletsPlan;
@@ -139,8 +138,9 @@ public class Coordinator {
         return StatusUtils.getStatus(StatusUtils.UNSUPPORTED_OPERATION, e.getMessage());
       }
     } else {
-      result = RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR,
-          "Unsupported request: " + request);
+      result =
+          RpcUtils.getStatus(
+              TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unsupported request: " + request);
     }
     Timer.Statistic.COORDINATOR_EXECUTE_NON_QUERY.calOperationCostTimeFromStart(startTime);
     return result;
@@ -291,7 +291,8 @@ public class Coordinator {
         status =
             metaGroupMember
                 .getLocalDataMember(partitionGroup.getHeader())
-                .executeRequest(plan).getStatus();
+                .executeRequest(plan)
+                .getStatus();
         logger.debug(
             "Execute {} in a local group of {} with status {}",
             plan,
@@ -471,7 +472,8 @@ public class Coordinator {
       result =
           metaGroupMember
               .getLocalDataMember(entry.getValue().getHeader())
-              .executeRequest(entry.getKey()).getStatus();
+              .executeRequest(entry.getKey())
+              .getStatus();
       logger.debug(
           "Execute {} in a local group of {}, {}",
           entry.getKey(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
index b98652e5de..f8b237e2dc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/impl/PlanBasedStateMachine.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.impl;
 
-import java.io.File;
 import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -31,9 +30,12 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+
 public class PlanBasedStateMachine implements IStateMachine {
 
   private static final Logger logger = LoggerFactory.getLogger(PlanBasedStateMachine.class);
@@ -41,8 +43,7 @@ public class PlanBasedStateMachine implements IStateMachine {
   private ClusterPlanExecutor planExecutor;
   private MetaGroupMember metaGroupMember;
 
-  public PlanBasedStateMachine() {
-  }
+  public PlanBasedStateMachine() {}
 
   public PlanBasedStateMachine(MetaGroupMember metaGroupMember) {
     this.metaGroupMember = metaGroupMember;
@@ -58,23 +59,20 @@ public class PlanBasedStateMachine implements IStateMachine {
   }
 
   @Override
-  public void stop() {
-
-  }
+  public void stop() {}
 
   @Override
   public TSStatus write(IConsensusRequest request) {
     if (!(request instanceof PhysicalPlan)) {
-      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
-          "Not supported request: " + request);
+      return StatusUtils.getStatus(
+          StatusUtils.EXECUTE_STATEMENT_ERROR, "Not supported request: " + request);
     }
     try {
       planExecutor.processNonQuery(((PhysicalPlan) request));
       return StatusUtils.OK;
     } catch (QueryProcessException | StorageGroupNotSetException | StorageEngineException e) {
       logger.warn("Plan execution error", e);
-      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR,
-          e.getMessage());
+      return StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
     }
   }
 
@@ -89,9 +87,7 @@ public class PlanBasedStateMachine implements IStateMachine {
   }
 
   @Override
-  public void loadSnapshot(File latestSnapshotRootDir) {
-
-  }
+  public void loadSnapshot(File latestSnapshotRootDir) {}
 
   public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
     this.metaGroupMember = metaGroupMember;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
index b434873f1a..152cea5d10 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogParser.java
@@ -26,8 +26,8 @@ import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
 import org.apache.iotdb.cluster.log.logtypes.LargeTestLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index 127a590b68..c2e6677fc2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -116,9 +116,9 @@ public class AsyncDataLogApplier implements LogApplier {
   private PartialPath getLogKey(Log log) throws StorageGroupNotSetException {
     // we can only apply some kinds of plans in parallel, for other logs, we must wait until all
     // previous logs are applied, or the order of deletions and insertions may get wrong
-    if (log instanceof RequestLog) {
+    if (log instanceof RequestLog && ((RequestLog) log).getRequest() instanceof PhysicalPlan) {
       RequestLog requestLog = (RequestLog) log;
-      PhysicalPlan plan = requestLog.getRequest();
+      PhysicalPlan plan = ((PhysicalPlan) requestLog.getRequest());
       // this plan only affects one sg, so we can run it with other plans in parallel
       return getPlanKey(plan);
     } else if (log instanceof CloseFileLog) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 876be2bfde..585084eed7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -19,37 +19,9 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
-import org.apache.iotdb.cluster.exception.CheckConsistencyException;
 import org.apache.iotdb.cluster.log.LogApplier;
-import org.apache.iotdb.cluster.metadata.MetaPuller;
-import org.apache.iotdb.cluster.query.ClusterPlanExecutor;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.IStateMachine;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.BatchProcessException;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
-import org.apache.iotdb.db.exception.metadata.template.UndefinedTemplateException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.physical.BatchPlan;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
-import org.apache.iotdb.db.utils.SchemaUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
 
 /** BaseApplier use PlanExecutor to execute PhysicalPlans. */
 abstract class BaseApplier implements LogApplier {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
index 266e61c332..30a718d21b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.rpc.TSStatusCode;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,7 @@ public class DataLogApplier extends BaseApplier {
 
   protected DataGroupMember dataGroupMember;
 
-  public DataLogApplier(DataGroupMember dataGroupMember,
-      IStateMachine stateMachine) {
+  public DataLogApplier(DataGroupMember dataGroupMember, IStateMachine stateMachine) {
     super(stateMachine);
     this.dataGroupMember = dataGroupMember;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
index 1259c0df84..fa4c70a7e7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java
@@ -24,13 +24,12 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
 import org.apache.iotdb.cluster.log.logtypes.FragmentedLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-
 import org.apache.iotdb.consensus.IStateMachine;
-import org.apache.iotdb.db.mpp.execution.StateMachine;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
index 4f797d459f..8f66c52763 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RequestLog.java
@@ -52,7 +52,9 @@ public class RequestLog extends Log {
   public int getDefaultBufferSize() {
     if (request instanceof DummyPlan) {
       int workloadSize =
-          ((DummyPlan) request).getWorkload() == null ? 0 : ((DummyPlan) request).getWorkload().length;
+          ((DummyPlan) request).getWorkload() == null
+              ? 0
+              : ((DummyPlan) request).getWorkload().length;
       return workloadSize + 512;
     }
     return DEFAULT_BUFFER_SIZE;
@@ -68,8 +70,8 @@ public class RequestLog extends Log {
       dataOutputStream.writeLong(getCurrLogTerm());
 
       ByteBuffer byteBuffer = request.serializeToByteBuffer();
-      dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(),
-          byteBuffer.limit() - byteBuffer.position());
+      dataOutputStream.write(
+          byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.limit() - byteBuffer.position());
     } catch (IOException e) {
       // unreachable
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
index 656def1b65..1774a0c3ac 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.mpp.execution.StateMachine;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
 
@@ -72,8 +71,14 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan
       Node header,
       Node thisNode,
       DataGroupMember dataGroupMember) {
-    super(createLogApplier(dataGroupMember, stateMachine), partitionTable, header, thisNode, Factory.INSTANCE,
-        dataGroupMember, stateMachine);
+    super(
+        createLogApplier(dataGroupMember, stateMachine),
+        partitionTable,
+        header,
+        thisNode,
+        Factory.INSTANCE,
+        dataGroupMember,
+        stateMachine);
   }
 
   private static LogApplier createLogApplier(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
index 32c7d19ce5..5d5d5bfe28 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.cluster.log.manage;
 
-import java.io.IOException;
-import java.util.Map;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
@@ -36,9 +34,13 @@ import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.template.TemplateManager;
 import org.apache.iotdb.db.service.IoTDB;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.Map;
+
 /** MetaSingleSnapshotLogManager provides a MetaSimpleSnapshot as snapshot. */
 public class MetaSingleSnapshotLogManager extends RaftLogManager {
 
@@ -52,8 +54,11 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager {
   private long term;
 
   public MetaSingleSnapshotLogManager(IStateMachine stateMachine, MetaGroupMember metaGroupMember) {
-    super(new SyncLogDequeSerializer(0), new MetaLogApplier(metaGroupMember, stateMachine),
-        metaGroupMember.getName(), stateMachine);
+    super(
+        new SyncLogDequeSerializer(0),
+        new MetaLogApplier(metaGroupMember, stateMachine),
+        metaGroupMember.getName(),
+        stateMachine);
     this.metaGroupMember = metaGroupMember;
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 8e2c7f46c0..5c7133b928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -74,7 +74,8 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
       Node header,
       Node thisNode,
       SnapshotFactory<T> factory,
-      DataGroupMember dataGroupMember, IStateMachine stateMachine) {
+      DataGroupMember dataGroupMember,
+      IStateMachine stateMachine) {
     super(
         new SyncLogDequeSerializer(header.nodeIdentifier),
         logApplier,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index ec2dbcbd6e..a3ab0bb2be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,13 +31,12 @@ import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
 import org.apache.iotdb.cluster.log.manage.serializable.LogManagerMeta;
-import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
-
 import org.apache.iotdb.consensus.IStateMachine;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -115,8 +114,11 @@ public abstract class RaftLogManager {
 
   protected IStateMachine stateMachine;
 
-  protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name
-      , IStateMachine stateMachine) {
+  protected RaftLogManager(
+      StableEntryManager stableEntryManager,
+      LogApplier applier,
+      String name,
+      IStateMachine stateMachine) {
     this.logApplier = applier;
     this.name = name;
     this.stateMachine = stateMachine;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
index b3fdb6d0af..465c4f0dd5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/AsynchronousSequencer.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,8 +105,8 @@ public class AsynchronousSequencer implements LogSequencer {
         log.setSequenceStartTime(sequenceStartTime);
         log.setCurrLogTerm(member.getTerm().get());
         log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
-        if (log instanceof RequestLog) {
-          ((RequestLog) log).getRequest().setIndex(log.getCurrLogIndex());
+        if (log instanceof RequestLog && ((RequestLog) log).getRequest() instanceof PhysicalPlan) {
+          ((PhysicalPlan) ((RequestLog) log).getRequest()).setIndex(log.getCurrLogIndex());
         }
 
         startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
index bbaf8770ce..3f337274af 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/sequencing/SynchronousSequencer.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.LogPlan;
 
 import java.util.concurrent.TimeUnit;
@@ -71,8 +72,10 @@ public class SynchronousSequencer implements LogSequencer {
           // if the log contains a physical plan which is not a LogPlan, assign the same index to
           // the plan so the state machine can be bridged with the consensus
           if (log instanceof RequestLog
+              && (((RequestLog) log).getRequest() instanceof PhysicalPlan)
               && !(((RequestLog) log).getRequest() instanceof LogPlan)) {
-            ((RequestLog) log).getRequest().setIndex(logManager.getLastLogIndex() + 1);
+            ((PhysicalPlan) ((RequestLog) log).getRequest())
+                .setIndex(logManager.getLastLogIndex() + 1);
           }
           log.setCurrLogTerm(member.getTerm().get());
           log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
index 44b317d505..490be699c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/forwarder/ForwardPlanHandler.java
@@ -38,7 +38,8 @@ public class ForwardPlanHandler implements AsyncMethodCallback<TSStatus> {
   private AtomicReference<TSStatus> result;
   private Node node;
 
-  public ForwardPlanHandler(AtomicReference<TSStatus> result, IConsensusRequest request, Node node) {
+  public ForwardPlanHandler(
+      AtomicReference<TSStatus> result, IConsensusRequest request, Node node) {
     this.result = result;
     this.request = request;
     this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 669979bd24..95c569db50 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -207,7 +207,10 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     logSequencer = SEQUENCER_FACTORY.create(this, logManager);
   }
 
-  DataGroupMember(Node thisNode, PartitionGroup nodes, MetaGroupMember metaGroupMember,
+  DataGroupMember(
+      Node thisNode,
+      PartitionGroup nodes,
+      MetaGroupMember metaGroupMember,
       IStateMachine stateMachine) {
     // The name is used in JMX, so we have to avoid to use "(" "," "=" ")"
     super(
@@ -344,8 +347,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     }
 
     public DataGroupMember create(Node thisNode, PartitionGroup partitionGroup) {
-      return new DataGroupMember(thisNode, partitionGroup, metaGroupMember,
-          new PlanBasedStateMachine(metaGroupMember));
+      return new DataGroupMember(
+          thisNode, partitionGroup, metaGroupMember, new PlanBasedStateMachine(metaGroupMember));
     }
   }
 
@@ -735,8 +738,8 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
           }
         } catch (MetadataException | CheckConsistencyException ex) {
           logger.error("{}: Cannot auto-create timeseries for {}", name, request, e);
-          return new ConsensusWriteResponse(null,
-              StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage()));
+          return new ConsensusWriteResponse(
+              null, StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, ex.getMessage()));
         }
         if (hasCreated) {
           return executeRequest(request);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index a786a86d62..7f16f6fb78 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -36,9 +36,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException;
 import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException;
 import org.apache.iotdb.cluster.exception.SnapshotInstallationException;
 import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
-import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.VotingLog;
-import org.apache.iotdb.cluster.log.applier.MetaLogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
 import org.apache.iotdb.cluster.log.manage.MetaSingleSnapshotLogManager;
@@ -88,7 +86,6 @@ import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.TimeValuePairUtils;
 import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index b2c4d92219..bb40aea9ae 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
-import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -101,8 +100,6 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.metadata.template.DuplicatedTemplateException;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.qp.physical.sys.DummyPlan;
@@ -190,6 +187,7 @@ public abstract class RaftMember implements RaftMemberMBean {
   ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
   /** the name of the member, to distinguish several members in the logs. */
   ConsensusGroupId groupId;
+
   String name;
   /** to choose nodes to send request of joining cluster randomly. */
   Random random = new Random();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
index 2a83adc28e..e3adae4230 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java
@@ -38,17 +38,17 @@ import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public abstract class BaseAsyncService implements RaftService.AsyncIface {
 
@@ -174,7 +174,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface {
       // process the plan locally
       PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
 
-      TSStatus status = member.executeRequest(plan);
+      TSStatus status = member.executeRequest(plan).getStatus();
       logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, status);
       resultHandler.onComplete(
           StatusUtils.getStatus(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
index 8119eceda5..c41eea3ea0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java
@@ -36,8 +36,8 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,7 +157,7 @@ public abstract class BaseSyncService implements RaftService.Iface {
       // process the plan locally
       PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
 
-      TSStatus answer = member.executeRequest(plan);
+      TSStatus answer = member.executeRequest(plan).getStatus();
       logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, answer);
       return answer;
     } catch (Exception e) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 55761cd1eb..d68c2d1597 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
index 2b919a32f9..8a4c5eb3cf 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestPartitionedLogManager.java
@@ -35,7 +35,8 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
         new Node("localhost", 30001, 1, Constants.RPC_PORT, 6667, "localhost"),
         null,
         null,
-        null, stateMachine);
+        null,
+        stateMachine);
   }
 
   public TestPartitionedLogManager(
@@ -46,7 +47,8 @@ public class TestPartitionedLogManager extends PartitionedSnapshotLogManager {
         header,
         new Node("localhost", 30001, 1, 40001, Constants.RPC_PORT, "localhost"),
         factory,
-        null, stateMachine);
+        null,
+        stateMachine);
   }
 
   @Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
index 768ea092ca..bb9c9fa9a0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java
@@ -24,8 +24,8 @@ import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.EmptyContentLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
index d8ebb4f093..7bb8130f44 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java
@@ -25,8 +25,8 @@ import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.coordinator.Coordinator;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.AddNodeLog;
-import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog;
+import org.apache.iotdb.cluster.log.logtypes.RequestLog;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.Constants;
 import org.apache.iotdb.commons.exception.MetadataException;