You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:02 UTC

[iotdb] 01/12: finish IntoOperator v1

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2293ea4ca46e4ec95ff65f7950f69b1b5ca0429c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 19:01:35 2022 +0800

    finish IntoOperator v1
---
 .../execution/operator/process/IntoOperator.java   | 65 +++++++++++++++++++---
 1 file changed, 58 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index c5ca4cc091..1251bb3e50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,15 +33,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 public class IntoOperator implements ProcessOperator {
 
@@ -47,18 +55,24 @@ public class IntoOperator implements ProcessOperator {
   private final Operator child;
 
   private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+  private final List<Pair<String, String>> sourceTargetPathPairList;
+  private final Map<String, InputLocation> sourceColumnToInputLocationMap;
 
   public IntoOperator(
       OperatorContext operatorContext,
       Operator child,
       Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
       Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
-      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap,
+      List<Pair<String, String>> sourceTargetPathPairList, // (source, target) pairs; one result row each
+      Map<String, InputLocation> sourceColumnToInputLocationMap) { // source column -> input location
     this.operatorContext = operatorContext;
     this.child = child;
     this.insertTabletStatementGenerators =
         constructInsertTabletStatementGenerators(
             targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+    this.sourceTargetPathPairList = sourceTargetPathPairList; // iterated by constructResultTsBlock
+    this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; // looked up by findWritten
   }
 
   private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -129,11 +143,34 @@ public class IntoOperator implements ProcessOperator {
   }
 
   private TsBlock constructResultTsBlock() {
-    List<TSDataType> dataTypes = new ArrayList<>();
-    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList()); // column types of the select-into result header
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+    TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+    for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) { // one row per pair
+      timeColumnBuilder.writeLong(0); // every result row carries timestamp 0
+      columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left)); // column 0: source
+      columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right)); // column 1: target
+      columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left)); // column 2: written count
+      resultTsBlockBuilder.declarePosition();
+    }
     return resultTsBlockBuilder.build();
   }
 
+  private int findWritten(String sourceColumn) { // written count reported for one source column
+    InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      int written = generator.getWrittenCount(inputLocation); // -1: generator has no such counter
+      if (written != -1) {
+        return written; // the first generator holding a counter for the location decides
+      }
+    }
+    return 0; // no generator holds a counter for this column's input location
+  }
+
   @Override
   public boolean hasNext() {
     return child.hasNext();
@@ -151,20 +188,20 @@ public class IntoOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return 0;
+    return child.calculateMaxPeekMemory(); // NOTE(review): child's value only; nothing added for this operator - confirm
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    return 0;
+    return child.calculateMaxReturnSize(); // forwards the child operator's value unchanged
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    return 0;
+    return child.calculateRetainedSizeAfterCallingNext(); // forwards the child operator's value unchanged
   }
 
-  private static class InsertTabletStatementGenerator {
+  public static class InsertTabletStatementGenerator {
 
     private final int TABLET_ROW_LIMIT =
         IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
@@ -181,6 +218,8 @@ public class IntoOperator implements ProcessOperator {
     private Object[] columns;
     private BitMap[] bitMaps;
 
+    private final Map<InputLocation, AtomicInteger> writtenCounter;
+
     public InsertTabletStatementGenerator(
         PartialPath devicePath,
         Map<String, InputLocation> measurementToInputLocationMap,
@@ -191,6 +230,10 @@ public class IntoOperator implements ProcessOperator {
       this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
       this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
       this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
+      this.writtenCounter = new HashMap<>();
+      for (InputLocation inputLocation : inputLocations) {
+        writtenCounter.put(inputLocation, new AtomicInteger(0));
+      }
     }
 
     public void reset() {
@@ -219,6 +262,7 @@ public class IntoOperator implements ProcessOperator {
           }
 
           bitMaps[i].unmark(rowCount);
+          writtenCounter.get(inputLocations[i]).getAndIncrement();
           switch (valueColumn.getDataType()) {
             case INT32:
               ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
@@ -305,5 +349,12 @@ public class IntoOperator implements ProcessOperator {
 
       return insertTabletStatement;
     }
+
+    public int getWrittenCount(InputLocation inputLocation) { // counter lookup for one location
+      if (!writtenCounter.containsKey(inputLocation)) {
+        return -1; // sentinel: this generator keeps no counter for the location
+      }
+      return writtenCounter.get(inputLocation).get(); // current value of the location's counter
+    }
   }
 }