You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:09 UTC

[iotdb] 08/12: implement DeviceViewIntoOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de98a1a31da1e5b20a458fa5b6602108be374aed
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 16:57:37 2022 +0800

    implement DeviceViewIntoOperator
---
 ...IntoOperator.java => AbstractIntoOperator.java} | 115 +++-----
 .../operator/process/DeviceViewIntoOperator.java   | 128 ++++++---
 .../execution/operator/process/IntoOperator.java   | 302 +--------------------
 3 files changed, 126 insertions(+), 419 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 1491623e77..d038874d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,13 +29,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -47,43 +41,37 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-public class IntoOperator implements ProcessOperator {
+public abstract class AbstractIntoOperator implements ProcessOperator {
 
-  private final OperatorContext operatorContext;
-  private final Operator child;
+  protected final OperatorContext operatorContext;
+  protected final Operator child;
 
-  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
-  private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
-  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
+  protected List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators;
 
-  public IntoOperator(
+  protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
+
+  public AbstractIntoOperator(
       OperatorContext operatorContext,
       Operator child,
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap,
-      List<Pair<String, PartialPath>> sourceTargetPathPairList,
+      List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
     this.operatorContext = operatorContext;
     this.child = child;
-    this.insertTabletStatementGenerators =
-        constructInsertTabletStatementGenerators(
-            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
-    this.sourceTargetPathPairList = sourceTargetPathPairList;
+    this.insertTabletStatementGenerators = insertTabletStatementGenerators;
     this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
   }
 
-  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap) {
-    List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
+  protected static List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGenerators(
+          Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+          Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+          Map<String, Boolean> targetDeviceToAlignedMap) {
+    List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators =
         new ArrayList<>(targetPathToSourceInputLocationMap.size());
     for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
-      InsertTabletStatementGenerator generator =
-          new InsertTabletStatementGenerator(
+      IntoOperator.InsertTabletStatementGenerator generator =
+          new IntoOperator.InsertTabletStatementGenerator(
               targetDevice,
               targetPathToSourceInputLocationMap.get(targetDevice),
               targetPathToDataTypeMap.get(targetDevice),
@@ -93,45 +81,14 @@ public class IntoOperator implements ProcessOperator {
     return insertTabletStatementGenerators;
   }
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    TsBlock inputTsBlock = child.next();
-    if (inputTsBlock != null) {
-      int lastReadIndex = 0;
-      while (lastReadIndex < inputTsBlock.getPositionCount()) {
-        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-          lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
-        }
-        insertMultiTabletsInternally(true);
-      }
-    }
-
-    if (child.hasNext()) {
-      return null;
-    } else {
-      insertMultiTabletsInternally(false);
-      return constructResultTsBlock();
-    }
-  }
-
-  private void insertMultiTabletsInternally(boolean needCheck) {
+  protected void insertMultiTabletsInternally(boolean needCheck) {
     if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
         || insertTabletStatementGenerators.get(0).isEmpty()) {
       return;
     }
 
     List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       insertTabletStatementList.add(generator.constructInsertTabletStatement());
     }
 
@@ -139,32 +96,14 @@ public class IntoOperator implements ProcessOperator {
     insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
     // TODO: execute insertMultiTabletsStatement
 
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       generator.reset();
     }
   }
 
-  private TsBlock constructResultTsBlock() {
-    List<TSDataType> outputDataTypes =
-        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
-            .map(ColumnHeader::getColumnType)
-            .collect(Collectors.toList());
-    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
-    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
-    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
-    for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
-      timeColumnBuilder.writeLong(0);
-      columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
-      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
-      columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
-      resultTsBlockBuilder.declarePosition();
-    }
-    return resultTsBlockBuilder.build();
-  }
-
-  private int findWritten(String sourceColumn) {
+  protected int findWritten(String sourceColumn) {
     InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+    for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
       int written = generator.getWrittenCount(inputLocation);
       if (written != -1) {
         return written;
@@ -173,6 +112,16 @@ public class IntoOperator implements ProcessOperator {
     return 0;
   }
 
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
   @Override
   public boolean hasNext() {
     return child.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e1c38e662c..bf1a0920fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -19,64 +19,112 @@
 
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
-public class DeviceViewIntoOperator implements ProcessOperator {
+public class DeviceViewIntoOperator extends AbstractIntoOperator {
 
-  private final OperatorContext operatorContext;
-  private final Operator child;
+  private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
+      deviceToTargetPathSourceInputLocationMap;
+  private final Map<String, Map<PartialPath, Map<String, TSDataType>>>
+      deviceToTargetPathDataTypeMap;
+  private final Map<String, Boolean> targetDeviceToAlignedMap;
+  private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
 
-  public DeviceViewIntoOperator(OperatorContext operatorContext, Operator child) {
-    this.operatorContext = operatorContext;
-    this.child = child;
-  }
+  private String currentDevice;
 
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
+  private final TsBlockBuilder resultTsBlockBuilder;
 
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
-  }
+  public DeviceViewIntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<String, Map<PartialPath, Map<String, InputLocation>>>
+          deviceToTargetPathSourceInputLocationMap,
+      Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
+      Map<String, Boolean> targetDeviceToAlignedMap,
+      Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+      Map<String, InputLocation> sourceColumnToInputLocationMap) {
+    super(operatorContext, child, null, sourceColumnToInputLocationMap);
+    this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
+    this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
+    this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+    this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
 
-  @Override
-  public TsBlock next() {
-    return null;
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
   @Override
-  public boolean hasNext() {
-    return child.hasNext();
-  }
-
-  @Override
-  public void close() throws Exception {
-    child.close();
-  }
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    if (inputTsBlock != null) {
+      String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+      if (!Objects.equals(device, currentDevice)) {
+        insertMultiTabletsInternally(false);
+        updateResultTsBlock();
 
-  @Override
-  public boolean isFinished() {
-    return child.isFinished();
-  }
+        insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+        currentDevice = device;
+      }
+      int lastReadIndex = 0;
+      while (lastReadIndex < inputTsBlock.getPositionCount()) {
+        for (IntoOperator.InsertTabletStatementGenerator generator :
+            insertTabletStatementGenerators) {
+          lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+        }
+        insertMultiTabletsInternally(true);
+      }
+    }
 
-  @Override
-  public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
+    if (child.hasNext()) {
+      return null;
+    } else {
+      insertMultiTabletsInternally(false);
+      updateResultTsBlock();
+      return resultTsBlockBuilder.build();
+    }
   }
 
-  @Override
-  public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
+  private void updateResultTsBlock() {
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (Pair<String, PartialPath> sourceTargetPathPair :
+        deviceToSourceTargetPathPairListMap.get(currentDevice)) {
+      timeColumnBuilder.writeLong(0);
+      columnBuilders[0].writeBinary(new Binary(currentDevice));
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.left));
+      columnBuilders[2].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
+      columnBuilders[3].writeInt(findWritten(sourceTargetPathPair.left));
+      resultTsBlockBuilder.declarePosition();
+    }
   }
 
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
+  private List<IntoOperator.InsertTabletStatementGenerator>
+      constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+    Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+        deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+    Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+        deviceToTargetPathDataTypeMap.get(currentDevice);
+    return constructInsertTabletStatementGenerators(
+        targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1491623e77..6a6c2045d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -20,43 +20,26 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
 import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
-public class IntoOperator implements ProcessOperator {
-
-  private final OperatorContext operatorContext;
-  private final Operator child;
+public class IntoOperator extends AbstractIntoOperator {
 
-  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
   private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
-  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
@@ -66,41 +49,13 @@ public class IntoOperator implements ProcessOperator {
       Map<String, Boolean> targetDeviceToAlignedMap,
       List<Pair<String, PartialPath>> sourceTargetPathPairList,
       Map<String, InputLocation> sourceColumnToInputLocationMap) {
-    this.operatorContext = operatorContext;
-    this.child = child;
-    this.insertTabletStatementGenerators =
+    super(
+        operatorContext,
+        child,
         constructInsertTabletStatementGenerators(
-            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+            targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
+        sourceColumnToInputLocationMap);
     this.sourceTargetPathPairList = sourceTargetPathPairList;
-    this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
-  }
-
-  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
-      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
-      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<String, Boolean> targetDeviceToAlignedMap) {
-    List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
-        new ArrayList<>(targetPathToSourceInputLocationMap.size());
-    for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
-      InsertTabletStatementGenerator generator =
-          new InsertTabletStatementGenerator(
-              targetDevice,
-              targetPathToSourceInputLocationMap.get(targetDevice),
-              targetPathToDataTypeMap.get(targetDevice),
-              targetDeviceToAlignedMap.get(targetDevice.toString()));
-      insertTabletStatementGenerators.add(generator);
-    }
-    return insertTabletStatementGenerators;
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
   }
 
   @Override
@@ -124,26 +79,6 @@ public class IntoOperator implements ProcessOperator {
     }
   }
 
-  private void insertMultiTabletsInternally(boolean needCheck) {
-    if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
-        || insertTabletStatementGenerators.get(0).isEmpty()) {
-      return;
-    }
-
-    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      insertTabletStatementList.add(generator.constructInsertTabletStatement());
-    }
-
-    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
-    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
-    // TODO: execute insertMultiTabletsStatement
-
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      generator.reset();
-    }
-  }
-
   private TsBlock constructResultTsBlock() {
     List<TSDataType> outputDataTypes =
         ColumnHeaderConstant.selectIntoColumnHeaders.stream()
@@ -161,229 +96,4 @@ public class IntoOperator implements ProcessOperator {
     }
     return resultTsBlockBuilder.build();
   }
-
-  private int findWritten(String sourceColumn) {
-    InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
-    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
-      int written = generator.getWrittenCount(inputLocation);
-      if (written != -1) {
-        return written;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return child.hasNext();
-  }
-
-  @Override
-  public void close() throws Exception {
-    child.close();
-  }
-
-  @Override
-  public boolean isFinished() {
-    return child.isFinished();
-  }
-
-  @Override
-  public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemory();
-  }
-
-  @Override
-  public long calculateMaxReturnSize() {
-    return child.calculateMaxReturnSize();
-  }
-
-  @Override
-  public long calculateRetainedSizeAfterCallingNext() {
-    return child.calculateRetainedSizeAfterCallingNext();
-  }
-
-  public static class InsertTabletStatementGenerator {
-
-    private final int TABLET_ROW_LIMIT =
-        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-
-    private final PartialPath devicePath;
-    private final boolean isAligned;
-    private final String[] measurements;
-    private final TSDataType[] dataTypes;
-    private final InputLocation[] inputLocations;
-
-    private int rowCount = 0;
-
-    private long[] times;
-    private Object[] columns;
-    private BitMap[] bitMaps;
-
-    private final Map<InputLocation, AtomicInteger> writtenCounter;
-
-    public InsertTabletStatementGenerator(
-        PartialPath devicePath,
-        Map<String, InputLocation> measurementToInputLocationMap,
-        Map<String, TSDataType> measurementToDataTypeMap,
-        Boolean isAligned) {
-      this.devicePath = devicePath;
-      this.isAligned = isAligned;
-      this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
-      this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
-      this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
-      this.writtenCounter = new HashMap<>();
-      for (InputLocation inputLocation : inputLocations) {
-        writtenCounter.put(inputLocation, new AtomicInteger(0));
-      }
-      this.reset();
-    }
-
-    public void reset() {
-      this.rowCount = 0;
-      this.times = new long[TABLET_ROW_LIMIT];
-      this.columns = new Object[this.measurements.length];
-      for (int i = 0; i < this.measurements.length; i++) {
-        switch (dataTypes[i]) {
-          case BOOLEAN:
-            columns[i] = new boolean[TABLET_ROW_LIMIT];
-            break;
-          case INT32:
-            columns[i] = new int[TABLET_ROW_LIMIT];
-            break;
-          case INT64:
-            columns[i] = new long[TABLET_ROW_LIMIT];
-            break;
-          case FLOAT:
-            columns[i] = new float[TABLET_ROW_LIMIT];
-            break;
-          case DOUBLE:
-            columns[i] = new double[TABLET_ROW_LIMIT];
-            break;
-          case TEXT:
-            columns[i] = new Binary[TABLET_ROW_LIMIT];
-            Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
-            break;
-          default:
-            throw new UnSupportedDataTypeException(
-                String.format("Data type %s is not supported.", dataTypes[i]));
-        }
-      }
-      this.bitMaps = new BitMap[this.measurements.length];
-      for (int i = 0; i < this.bitMaps.length; ++i) {
-        this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
-        this.bitMaps[i].markAll();
-      }
-    }
-
-    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
-      for (; lastReadIndex < tsBlock.getPositionCount(); lastReadIndex++) {
-
-        times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
-
-        for (int i = 0; i < measurements.length; ++i) {
-          Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()];
-
-          // if the value is NULL
-          if (valueColumn.isNull(lastReadIndex)) {
-            // bit in bitMaps are marked as 1 (NULL) by default
-            continue;
-          }
-
-          bitMaps[i].unmark(rowCount);
-          writtenCounter.get(inputLocations[i]).getAndIncrement();
-          switch (valueColumn.getDataType()) {
-            case INT32:
-              ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
-              break;
-            case INT64:
-              ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
-              break;
-            case FLOAT:
-              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
-              break;
-            case DOUBLE:
-              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
-              break;
-            case BOOLEAN:
-              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
-              break;
-            case TEXT:
-              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
-              break;
-            default:
-              throw new UnSupportedDataTypeException(
-                  String.format(
-                      "data type %s is not supported when convert data at client",
-                      valueColumn.getDataType()));
-          }
-        }
-
-        ++rowCount;
-        if (rowCount == TABLET_ROW_LIMIT) {
-          break;
-        }
-      }
-      return lastReadIndex;
-    }
-
-    public boolean isFull() {
-      return rowCount == TABLET_ROW_LIMIT;
-    }
-
-    public boolean isEmpty() {
-      return rowCount == 0;
-    }
-
-    public InsertTabletStatement constructInsertTabletStatement() {
-      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
-      insertTabletStatement.setDevicePath(devicePath);
-      insertTabletStatement.setAligned(isAligned);
-      insertTabletStatement.setMeasurements(measurements);
-      insertTabletStatement.setDataTypes(dataTypes);
-      insertTabletStatement.setRowCount(rowCount);
-
-      if (rowCount != TABLET_ROW_LIMIT) {
-        times = Arrays.copyOf(times, rowCount);
-        for (int i = 0; i < columns.length; i++) {
-          switch (dataTypes[i]) {
-            case BOOLEAN:
-              columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount);
-              break;
-            case INT32:
-              columns[i] = Arrays.copyOf((int[]) columns[i], rowCount);
-              break;
-            case INT64:
-              columns[i] = Arrays.copyOf((long[]) columns[i], rowCount);
-              break;
-            case FLOAT:
-              columns[i] = Arrays.copyOf((float[]) columns[i], rowCount);
-              break;
-            case DOUBLE:
-              columns[i] = Arrays.copyOf((double[]) columns[i], rowCount);
-              break;
-            case TEXT:
-              columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount);
-              break;
-            default:
-              throw new UnSupportedDataTypeException(
-                  String.format("Data type %s is not supported.", dataTypes[i]));
-          }
-        }
-      }
-
-      insertTabletStatement.setTimes(times);
-      insertTabletStatement.setColumns(columns);
-      insertTabletStatement.setBitMaps(bitMaps);
-
-      return insertTabletStatement;
-    }
-
-    public int getWrittenCount(InputLocation inputLocation) {
-      if (!writtenCounter.containsKey(inputLocation)) {
-        return -1;
-      }
-      return writtenCounter.get(inputLocation).get();
-    }
-  }
 }