You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/16 03:14:11 UTC

[iotdb] 01/01: add IntoOperator

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bfad22909929b3480d473b41f3c2d543b4eaab6d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 11:13:50 2022 +0800

    add IntoOperator
---
 .../execution/operator/process/IntoOperator.java   | 309 +++++++++++++++++++++
 .../process/RawDataAggregationOperator.java        |   2 +-
 .../process/SingleInputAggregationOperator.java    |   2 -
 .../process/SlidingWindowAggregationOperator.java  |   2 +-
 4 files changed, 311 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
new file mode 100644
index 0000000000..c5ca4cc091
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Operator for {@code SELECT ... INTO}: buffers the rows produced by its child per target device
+ * and packs them into {@link InsertMultiTabletsStatement}s.
+ *
+ * <p>Executing the generated statement is still a TODO, so buffered rows are currently dropped
+ * once the statement has been built.
+ */
+public class IntoOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+
+  /**
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator supplying the source rows
+   * @param targetPathToSourceMap target device -> (measurement -> source column location)
+   * @param targetPathToDataTypeMap target device -> (measurement -> data type)
+   * @param targetDeviceToAlignedMap target device -> aligned flag
+   */
+  public IntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.insertTabletStatementGenerators =
+        constructInsertTabletStatementGenerators(
+            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+  }
+
+  /** Builds one tablet generator per target device found in {@code targetPathToSourceMap}. */
+  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    List<InsertTabletStatementGenerator> generators =
+        new ArrayList<>(targetPathToSourceMap.size());
+    for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
+        targetPathToSourceMap.entrySet()) {
+      PartialPath targetDevice = entry.getKey();
+      generators.add(
+          new InsertTabletStatementGenerator(
+              targetDevice,
+              entry.getValue(),
+              targetPathToDataTypeMap.get(targetDevice),
+              targetDeviceToAlignedMap.get(targetDevice)));
+    }
+    return generators;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Buffers the rows of the child's next block into every target tablet, building a statement each
+   * time the tablets fill up.
+   *
+   * @return {@code null} while the child has more data; otherwise the (empty) result block, after
+   *     the remaining buffered rows have been flushed
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    // next() may yield null (this operator does so itself); treat that like an empty block.
+    int positionCount = inputTsBlock == null ? 0 : inputTsBlock.getPositionCount();
+    int readIndex = 0;
+    while (readIndex < positionCount) {
+      // With no target there is nothing to fill, so the whole block counts as consumed.
+      int nextIndex = positionCount;
+      // Every generator must buffer the same rows, so each one starts from the same index.
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        nextIndex = generator.processTsBlock(inputTsBlock, readIndex);
+      }
+      readIndex = nextIndex;
+      insertMultiTabletsInternally(true);
+    }
+
+    if (child.hasNext()) {
+      return null;
+    }
+    insertMultiTabletsInternally(false);
+    return constructResultTsBlock();
+  }
+
+  /**
+   * Turns the buffered rows into one {@link InsertMultiTabletsStatement} and clears the buffers.
+   *
+   * @param needCheck if true, do nothing unless the tablets are full; if false, flush any rows
+   */
+  private void insertMultiTabletsInternally(boolean needCheck) {
+    if (insertTabletStatementGenerators.isEmpty()) {
+      return;
+    }
+    // All generators buffer the same rows, so the first one speaks for the others.
+    InsertTabletStatementGenerator first = insertTabletStatementGenerators.get(0);
+    if (first.isEmpty() || (needCheck && !first.isFull())) {
+      return;
+    }
+
+    List<InsertTabletStatement> insertTabletStatementList =
+        new ArrayList<>(insertTabletStatementGenerators.size());
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      insertTabletStatementList.add(generator.constructInsertTabletStatement());
+    }
+
+    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
+    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    // TODO: execute insertMultiTabletsStatement
+
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      generator.reset();
+    }
+  }
+
+  /** Builds the result block; no output columns are declared, so it carries no data. */
+  private TsBlock constructResultTsBlock() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    return resultTsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  /** Accumulates rows for one target device and packages them as insert-tablet statements. */
+  private static class InsertTabletStatementGenerator {
+
+    private final int rowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+
+    private final PartialPath devicePath;
+    private final boolean isAligned;
+    private final String[] measurements;
+    private final TSDataType[] dataTypes;
+    private final InputLocation[] inputLocations;
+
+    private int rowCount = 0;
+
+    private long[] times;
+    private Object[] columns;
+    private BitMap[] bitMaps;
+
+    public InsertTabletStatementGenerator(
+        PartialPath devicePath,
+        Map<String, InputLocation> measurementToInputLocationMap,
+        Map<String, TSDataType> measurementToDataTypeMap,
+        Boolean isAligned) {
+      if (rowLimit <= 0) {
+        throw new IllegalStateException("Tablet row limit must be positive but is " + rowLimit);
+      }
+      this.devicePath = devicePath;
+      this.isAligned = isAligned;
+
+      int measurementCount = measurementToInputLocationMap.size();
+      this.measurements = new String[measurementCount];
+      this.dataTypes = new TSDataType[measurementCount];
+      this.inputLocations = new InputLocation[measurementCount];
+      // Fill the three arrays in one pass so that index i always refers to the same measurement;
+      // iterating the two maps independently gives no such guarantee.
+      int index = 0;
+      for (Map.Entry<String, InputLocation> entry : measurementToInputLocationMap.entrySet()) {
+        TSDataType dataType = measurementToDataTypeMap.get(entry.getKey());
+        if (dataType == null) {
+          throw new IllegalArgumentException(
+              String.format("No data type given for %s.%s", devicePath, entry.getKey()));
+        }
+        measurements[index] = entry.getKey();
+        inputLocations[index] = entry.getValue();
+        dataTypes[index] = dataType;
+        ++index;
+      }
+
+      // The buffers must exist before the first processTsBlock call.
+      reset();
+    }
+
+    /** Starts a fresh, empty tablet: typed value arrays plus bitmaps with every cell NULL. */
+    public void reset() {
+      this.rowCount = 0;
+      this.times = new long[rowLimit];
+      this.columns = new Object[this.measurements.length];
+      this.bitMaps = new BitMap[this.measurements.length];
+      for (int i = 0; i < this.measurements.length; ++i) {
+        this.columns[i] = newColumn(this.dataTypes[i], rowLimit);
+        this.bitMaps[i] = new BitMap(rowLimit);
+        this.bitMaps[i].markAll();
+      }
+    }
+
+    /** Allocates the array that backs a value column of the given type. */
+    private static Object newColumn(TSDataType dataType, int length) {
+      switch (dataType) {
+        case BOOLEAN:
+          return new boolean[length];
+        case INT32:
+          return new int[length];
+        case INT64:
+          return new long[length];
+        case FLOAT:
+          return new float[length];
+        case DOUBLE:
+          return new double[length];
+        case TEXT:
+          return new Binary[length];
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+
+    /**
+     * Copies rows from {@code tsBlock}, starting at {@code lastReadIndex}, until the block is
+     * exhausted or the tablet is full.
+     *
+     * @return the index of the first row that has not been copied yet
+     */
+    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
+      int readIndex = lastReadIndex;
+      while (readIndex < tsBlock.getPositionCount() && rowCount < rowLimit) {
+        times[rowCount] = tsBlock.getTimeByIndex(readIndex);
+
+        for (int i = 0; i < measurements.length; ++i) {
+          Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()];
+
+          // if the value is NULL
+          if (valueColumn.isNull(readIndex)) {
+            // bit in bitMaps are marked as 1 (NULL) by default
+            continue;
+          }
+
+          bitMaps[i].unmark(rowCount);
+          // NOTE(review): casts assume the source column type matches dataTypes[i]; confirm.
+          switch (valueColumn.getDataType()) {
+            case INT32:
+              ((int[]) columns[i])[rowCount] = valueColumn.getInt(readIndex);
+              break;
+            case INT64:
+              ((long[]) columns[i])[rowCount] = valueColumn.getLong(readIndex);
+              break;
+            case FLOAT:
+              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(readIndex);
+              break;
+            case DOUBLE:
+              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(readIndex);
+              break;
+            case BOOLEAN:
+              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(readIndex);
+              break;
+            case TEXT:
+              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(readIndex);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format(
+                      "data type %s is not supported when convert data at client",
+                      valueColumn.getDataType()));
+          }
+        }
+
+        ++rowCount;
+        // Step past the row just copied so it is not copied again after the tablet is flushed.
+        ++readIndex;
+      }
+      return readIndex;
+    }
+
+    public boolean isFull() {
+      return rowCount == rowLimit;
+    }
+
+    public boolean isEmpty() {
+      return rowCount == 0;
+    }
+
+    /** Packages the buffered rows; a partially filled tablet is trimmed to {@code rowCount}. */
+    public InsertTabletStatement constructInsertTabletStatement() {
+      if (rowCount != rowLimit) {
+        times = Arrays.copyOf(times, rowCount);
+        for (int i = 0; i < columns.length; i++) {
+          Object trimmed = newColumn(dataTypes[i], rowCount);
+          System.arraycopy(columns[i], 0, trimmed, 0, rowCount);
+          columns[i] = trimmed;
+        }
+      }
+
+      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
+      insertTabletStatement.setDevicePath(devicePath);
+      insertTabletStatement.setAligned(isAligned);
+      insertTabletStatement.setMeasurements(measurements);
+      insertTabletStatement.setDataTypes(dataTypes);
+      insertTabletStatement.setRowCount(rowCount);
+      insertTabletStatement.setTimes(times);
+      insertTabletStatement.setColumns(columns);
+      insertTabletStatement.setBitMaps(bitMaps);
+      return insertTabletStatement;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index e23a1232c7..30488c18d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -53,7 +53,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       Operator child,
       boolean ascending,
       long maxReturnSize) {
-    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+    super(operatorContext, aggregators, child, ascending, maxReturnSize);
     this.windowManager = new TimeWindowManager(timeRangeIterator);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 676ccd823e..16071aea1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,7 +55,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      ITimeRangeIterator timeRangeIterator,
       long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index a1fc271b1a..76499849cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -50,7 +50,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter,
       long maxReturnSize) {
-    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+    super(operatorContext, aggregators, child, ascending, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");