You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/05/12 14:45:46 UTC

[iotdb] branch jira_1376 updated: properly handle BatchProcessException

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch jira_1376
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/jira_1376 by this push:
     new f339731  properly handle BatchProcessException
f339731 is described below

commit f339731528ec6842e322a56c877619537340fa34
Author: jt2594838 <jt...@163.com>
AuthorDate: Wed May 12 22:43:48 2021 +0800

    properly handle BatchProcessException
---
 .../iotdb/cluster/log/applier/BaseApplier.java     | 37 ++++++++++++-
 .../cluster/log/applier/DataLogApplierTest.java    | 60 +++++++++++++++++-----
 .../engine/storagegroup/StorageGroupProcessor.java |  8 ++-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  7 +--
 .../org/apache/iotdb/db/qp/physical/BatchPlan.java | 47 +++++++++++++++++
 .../db/qp/physical/crud/InsertMultiTabletPlan.java | 39 +++++++++++++-
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 33 +++++++++++-
 .../iotdb/db/qp/physical/crud/InsertRowsPlan.java  | 39 +++++++++++++-
 .../qp/physical/sys/CreateMultiTimeSeriesPlan.java | 39 +++++++++++++-
 9 files changed, 284 insertions(+), 25 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 6333717..7a9c34a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -35,10 +35,13 @@ import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,12 +75,12 @@ abstract class BaseApplier implements LogApplier {
     } else if (plan != null && !plan.isQuery()) {
       try {
         getQueryExecutor().processNonQuery(plan);
+      } catch (BatchProcessException e) {
+        handleBatchProcessException(e, plan);
       } catch (QueryProcessException e) {
         if (e.getCause() instanceof StorageGroupNotSetException
             || e.getCause() instanceof UndefinedTemplateException) {
           executeAfterSync(plan);
-        } else if (e instanceof BatchProcessException) {
-          logger.warn("Exception occurred while processing non-query. ", e);
         } else {
           throw e;
         }
@@ -89,6 +92,36 @@ abstract class BaseApplier implements LogApplier {
     }
   }
 
+  private void handleBatchProcessException(BatchProcessException e, PhysicalPlan plan)
+      throws QueryProcessException, StorageEngineException, StorageGroupNotSetException {
+    TSStatus[] failingStatus = e.getFailingStatus();
+    for (int i = 0; i < failingStatus.length; i++) {
+      TSStatus status = failingStatus[i];
+      // mark sub-plans that already succeeded as executed, so that a later retry skips them
+      if (status != null
+          && status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+          && plan instanceof BatchPlan) {
+        ((BatchPlan) plan).setIsExecuted(i);
+      }
+    }
+    boolean needRetry = false;
+    for (int i = 0, failingStatusLength = failingStatus.length; i < failingStatusLength; i++) {
+      TSStatus status = failingStatus[i];
+      if (status != null
+          && (status.getCode() == TSStatusCode.STORAGE_GROUP_NOT_EXIST.getStatusCode()
+              || status.getCode() == TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode())
+          && plan instanceof BatchPlan) {
+        ((BatchPlan) plan).unsetIsExecuted(i); // clear the flag so the retry re-runs it
+        needRetry = true;
+      }
+    }
+    if (needRetry) {
+      executeAfterSync(plan);
+      return;
+    }
+    throw e;
+  }
+
   private void executeAfterSync(PhysicalPlan plan)
       throws QueryProcessException, StorageGroupNotSetException, StorageEngineException {
     try {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index af6272e..cfee1bb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -19,6 +19,19 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import junit.framework.TestCase;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
 import org.apache.iotdb.cluster.common.IoTDBTest;
@@ -58,41 +71,39 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
-import junit.framework.TestCase;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DataLogApplierTest extends IoTDBTest {
 
+  private static final Logger logger = LoggerFactory.getLogger(DataLogApplierTest.class);
   private boolean partialWriteEnabled;
 
   private TestMetaGroupMember testMetaGroupMember =
       new TestMetaGroupMember() {
         @Override
         public boolean syncLeader(RaftMember.CheckConsistency checkConsistency) {
+          try {
+            // for testApplyCreateMultiTimeseiresWithPulling()
+            IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg2"));
+          } catch (MetadataException e) {
+            logger.error("Cannot set sg for test", e);
+          }
           return true;
         }
 
@@ -338,4 +349,25 @@ public class DataLogApplierTest extends IoTDBTest {
         "Storage group is not set for current seriesPath: [root.test20]",
         log.getException().getMessage());
   }
+
+  @Test
+  public void testApplyCreateMultiTimeseiresWithPulling() throws MetadataException {
+    IoTDB.metaManager.setStorageGroup(new PartialPath("root.sg1"));
+    CreateMultiTimeSeriesPlan multiTimeSeriesPlan = new CreateMultiTimeSeriesPlan();
+    multiTimeSeriesPlan.setPaths(
+        Arrays.asList(
+            new PartialPath("root.sg1.s1"),
+            // root.sg2 should be pulled
+            new PartialPath("root.sg2.s1")));
+    multiTimeSeriesPlan.setCompressors(
+        Arrays.asList(CompressionType.UNCOMPRESSED, CompressionType.UNCOMPRESSED));
+    multiTimeSeriesPlan.setDataTypes(Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE));
+    multiTimeSeriesPlan.setEncodings(Arrays.asList(TSEncoding.GORILLA, TSEncoding.GORILLA));
+
+    PhysicalPlanLog log = new PhysicalPlanLog(multiTimeSeriesPlan);
+    // the applier should sync meta leader to get root.sg2 and report no error
+    applier.apply(log);
+    assertTrue(IoTDB.metaManager.getAllStorageGroupPaths().contains(new PartialPath("root.sg2")));
+    assertNull(log.getException());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2e0ac5c..26261df 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2899,9 +2899,13 @@ public class StorageGroupProcessor {
     writeLock();
     try {
       boolean isSequence = false;
-      for (InsertRowPlan plan : insertRowsOfOneDevicePlan.getRowPlans()) {
-        if (!isAlive(plan.getTime())) {
+      InsertRowPlan[] rowPlans = insertRowsOfOneDevicePlan.getRowPlans();
+      for (int i = 0, rowPlansLength = rowPlans.length; i < rowPlansLength; i++) {
+
+        InsertRowPlan plan = rowPlans[i];
+        if (!isAlive(plan.getTime()) || insertRowsOfOneDevicePlan.isExecuted(i)) {
           // we do not need to write these part of data, as they can not be queried
+          // or the sub-plan has already been executed, we are retrying other sub-plans
           continue;
         }
         // init map
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index d4d08c1..54ca1d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1237,7 +1237,7 @@ public class PlanExecutor implements IPlanExecutor {
   @Override
   public void insert(InsertRowsPlan plan) throws QueryProcessException {
     for (int i = 0; i < plan.getInsertRowPlanList().size(); i++) {
-      if (plan.getResults().containsKey(i)) {
+      if (plan.getResults().containsKey(i) || plan.isExecuted(i)) {
         continue;
       }
       try {
@@ -1290,7 +1290,8 @@ public class PlanExecutor implements IPlanExecutor {
   public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
       throws QueryProcessException {
     for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
-      if (insertMultiTabletPlan.getResults().containsKey(i)) {
+      if (insertMultiTabletPlan.getResults().containsKey(i)
+          || insertMultiTabletPlan.isExecuted(i)) {
         continue;
       }
       insertTablet(insertMultiTabletPlan.getInsertTabletPlanList().get(i));
@@ -1418,7 +1419,7 @@ public class PlanExecutor implements IPlanExecutor {
       throws BatchProcessException {
     int dataTypeIdx = 0;
     for (int i = 0; i < multiPlan.getPaths().size(); i++) {
-      if (multiPlan.getResults().containsKey(i)) {
+      if (multiPlan.getResults().containsKey(i) || multiPlan.isExecuted(i)) {
         continue;
       }
       PartialPath path = multiPlan.getPaths().get(i);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
new file mode 100644
index 0000000..4319c82
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/BatchPlan.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical;
+
+/** A BatchPlan contains multiple sub-plans and records, per index, whether each was executed. */
+public interface BatchPlan {
+
+  /**
+   * Mark the sub-plan at position i as executed.
+   *
+   * @param i the zero-based index of the sub-plan within this batch
+   */
+  void setIsExecuted(int i);
+
+  /**
+   * Mark the sub-plan at position i as not executed.
+   *
+   * @param i the zero-based index of the sub-plan within this batch
+   */
+  void unsetIsExecuted(int i);
+
+  /**
+   * @param i the zero-based index of the sub-plan within this batch
+   * @return whether the sub-plan at position i has been executed.
+   */
+  boolean isExecuted(int i);
+
+  /** @return how many sub-plans are in the plan. */
+  int getBatchSize();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index ba552b0..1883823 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import java.io.DataOutputStream;
@@ -39,7 +40,7 @@ import java.util.TreeMap;
  * reduce the number of raft logs. For details, please refer to
  * https://issues.apache.org/jira/browse/IOTDB-1099
  */
-public class InsertMultiTabletPlan extends InsertPlan {
+public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
 
   /**
    * the value is used to indict the parent InsertTabletPlan's index when the parent
@@ -90,6 +91,8 @@ public class InsertMultiTabletPlan extends InsertPlan {
   /** record the result of creation of time series */
   private Map<Integer, TSStatus> results = new TreeMap<>();
 
+  boolean[] isExecuted;
+
   public InsertMultiTabletPlan() {
     super(OperatorType.MULTI_BATCH_INSERT);
     this.insertTabletPlanList = new ArrayList<>();
@@ -327,4 +330,38 @@ public class InsertMultiTabletPlan extends InsertPlan {
                 : 0);
     return result;
   }
+
+  @Override
+  public void setIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = true;
+  }
+
+  @Override
+  public boolean isExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    return isExecuted[i];
+  }
+
+  @Override
+  public int getBatchSize() {
+    return insertTabletPlanList.size();
+  }
+
+  @Override
+  public void unsetIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = false;
+    if (parentInsertTabletPlanIndexList != null) {
+      results.remove(getParentIndex(i));
+    } else {
+      results.remove(i);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 41509bd..8886688 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,8 +32,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-public class InsertRowsOfOneDevicePlan extends InsertPlan {
+public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
 
+  boolean[] isExecuted;
   private InsertRowPlan[] rowPlans;
 
   public InsertRowsOfOneDevicePlan(
@@ -163,4 +165,33 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan {
   public InsertRowPlan[] getRowPlans() {
     return rowPlans;
   }
+
+  @Override
+  public void setIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = true;
+  }
+
+  @Override
+  public boolean isExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    return isExecuted[i];
+  }
+
+  @Override
+  public int getBatchSize() {
+    return rowPlans.length;
+  }
+
+  @Override
+  public void unsetIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = false;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
index d5122cf..8bcfe32 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsPlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.utils.StatusUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
@@ -34,7 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-public class InsertRowsPlan extends InsertPlan {
+public class InsertRowsPlan extends InsertPlan implements BatchPlan {
 
   /**
    * Suppose there is an InsertRowsPlan, which contains 5 InsertRowPlans,
@@ -51,6 +52,8 @@ public class InsertRowsPlan extends InsertPlan {
   /** the InsertRowsPlan list */
   private List<InsertRowPlan> insertRowPlanList;
 
+  boolean[] isExecuted;
+
   /** record the result of insert rows */
   private Map<Integer, TSStatus> results = new HashMap<>();
 
@@ -234,4 +237,38 @@ public class InsertRowsPlan extends InsertPlan {
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, insertRowPlanList.size());
   }
+
+  @Override
+  public void setIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = true;
+  }
+
+  @Override
+  public boolean isExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    return isExecuted[i];
+  }
+
+  @Override
+  public int getBatchSize() {
+    return insertRowPlanList.size();
+  }
+
+  @Override
+  public void unsetIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = false;
+    if (insertRowPlanIndexList != null) {
+      results.remove(insertRowPlanIndexList.get(i));
+    } else {
+      results.remove(i);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
index e1982e6..146e93f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateMultiTimeSeriesPlan.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.BatchPlan;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.StatusUtils;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -43,7 +44,7 @@ import java.util.TreeMap;
 /**
  * create multiple timeSeries, could be split to several sub Plans to execute in different DataGroup
  */
-public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
+public class CreateMultiTimeSeriesPlan extends PhysicalPlan implements BatchPlan {
 
   private List<PartialPath> paths;
   private List<TSDataType> dataTypes;
@@ -54,6 +55,8 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
   private List<Map<String, String>> tags = null;
   private List<Map<String, String>> attributes = null;
 
+  boolean[] isExecuted;
+
   /** record the result of creation of time series */
   private Map<Integer, TSStatus> results = new TreeMap<>();
 
@@ -346,4 +349,38 @@ public class CreateMultiTimeSeriesPlan extends PhysicalPlan {
       }
     }
   }
+
+  @Override
+  public void setIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = true;
+  }
+
+  @Override
+  public boolean isExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    return isExecuted[i];
+  }
+
+  @Override
+  public int getBatchSize() {
+    return paths.size();
+  }
+
+  @Override
+  public void unsetIsExecuted(int i) {
+    if (isExecuted == null) {
+      isExecuted = new boolean[getBatchSize()];
+    }
+    isExecuted[i] = false;
+    if (indexes != null) {
+      results.remove(indexes.get(i));
+    } else {
+      results.remove(i);
+    }
+  }
 }