You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/16 03:14:10 UTC

[iotdb] branch lmh/intoOperator created (now bfad229099)

This is an automated email from the ASF dual-hosted git repository.

hui pushed a change to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at bfad229099 add IntoOperator

This branch includes the following new commits:

     new bfad229099 add IntoOperator

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add IntoOperator

Posted by hu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bfad22909929b3480d473b41f3c2d543b4eaab6d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 11:13:50 2022 +0800

    add IntoOperator
---
 .../execution/operator/process/IntoOperator.java   | 309 +++++++++++++++++++++
 .../process/RawDataAggregationOperator.java        |   2 +-
 .../process/SingleInputAggregationOperator.java    |   2 -
 .../process/SlidingWindowAggregationOperator.java  |   2 +-
 4 files changed, 311 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
new file mode 100644
index 0000000000..c5ca4cc091
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class IntoOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+
+  public IntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.insertTabletStatementGenerators =
+        constructInsertTabletStatementGenerators(
+            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+  }
+
+  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
+        new ArrayList<>(targetPathToSourceMap.size());
+    for (PartialPath targetDevice : targetPathToSourceMap.keySet()) {
+      InsertTabletStatementGenerator generator =
+          new InsertTabletStatementGenerator(
+              targetDevice,
+              targetPathToSourceMap.get(targetDevice),
+              targetPathToDataTypeMap.get(targetDevice),
+              targetDeviceToAlignedMap.get(targetDevice));
+      insertTabletStatementGenerators.add(generator);
+    }
+    return insertTabletStatementGenerators;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    int lastReadIndex = 0;
+    while (lastReadIndex < inputTsBlock.getPositionCount()) {
+      for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+        lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+      }
+      insertMultiTabletsInternally(true);
+    }
+
+    if (child.hasNext()) {
+      return null;
+    } else {
+      insertMultiTabletsInternally(false);
+      return constructResultTsBlock();
+    }
+  }
+
+  private void insertMultiTabletsInternally(boolean needCheck) {
+    if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
+        || insertTabletStatementGenerators.get(0).isEmpty()) {
+      return;
+    }
+
+    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      insertTabletStatementList.add(generator.constructInsertTabletStatement());
+    }
+
+    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
+    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    // TODO: execute insertMultiTabletsStatement
+
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      generator.reset();
+    }
+  }
+
+  private TsBlock constructResultTsBlock() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    return resultTsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  private static class InsertTabletStatementGenerator {
+
+    private final int TABLET_ROW_LIMIT =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+
+    private final PartialPath devicePath;
+    private final boolean isAligned;
+    private final String[] measurements;
+    private final TSDataType[] dataTypes;
+    private final InputLocation[] inputLocations;
+
+    private int rowCount = 0;
+
+    private long[] times;
+    private Object[] columns;
+    private BitMap[] bitMaps;
+
+    public InsertTabletStatementGenerator(
+        PartialPath devicePath,
+        Map<String, InputLocation> measurementToInputLocationMap,
+        Map<String, TSDataType> measurementToDataTypeMap,
+        Boolean isAligned) {
+      this.devicePath = devicePath;
+      this.isAligned = isAligned;
+      this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
+      this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
+      this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
+    }
+
+    public void reset() {
+      this.rowCount = 0;
+      this.times = new long[TABLET_ROW_LIMIT];
+      this.columns = new Object[this.measurements.length];
+      this.bitMaps = new BitMap[this.measurements.length];
+      for (int i = 0; i < this.bitMaps.length; ++i) {
+        this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
+        this.bitMaps[i].markAll();
+      }
+    }
+
+    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
+      for (; lastReadIndex < tsBlock.getPositionCount(); lastReadIndex++) {
+
+        times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
+
+        for (int i = 0; i < measurements.length; ++i) {
+          Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()];
+
+          // if the value is NULL
+          if (valueColumn.isNull(lastReadIndex)) {
+            // bit in bitMaps are marked as 1 (NULL) by default
+            continue;
+          }
+
+          bitMaps[i].unmark(rowCount);
+          switch (valueColumn.getDataType()) {
+            case INT32:
+              ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
+              break;
+            case INT64:
+              ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
+              break;
+            case FLOAT:
+              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
+              break;
+            case DOUBLE:
+              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
+              break;
+            case BOOLEAN:
+              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
+              break;
+            case TEXT:
+              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format(
+                      "data type %s is not supported when convert data at client",
+                      valueColumn.getDataType()));
+          }
+        }
+
+        ++rowCount;
+        if (rowCount == TABLET_ROW_LIMIT) {
+          break;
+        }
+      }
+      return lastReadIndex;
+    }
+
+    public boolean isFull() {
+      return rowCount == TABLET_ROW_LIMIT;
+    }
+
+    public boolean isEmpty() {
+      return rowCount == 0;
+    }
+
+    public InsertTabletStatement constructInsertTabletStatement() {
+      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
+      insertTabletStatement.setDevicePath(devicePath);
+      insertTabletStatement.setAligned(isAligned);
+      insertTabletStatement.setMeasurements(measurements);
+      insertTabletStatement.setDataTypes(dataTypes);
+      insertTabletStatement.setRowCount(rowCount);
+
+      if (rowCount != TABLET_ROW_LIMIT) {
+        times = Arrays.copyOf(times, rowCount);
+        for (int i = 0; i < columns.length; i++) {
+          switch (dataTypes[i]) {
+            case BOOLEAN:
+              columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount);
+              break;
+            case INT32:
+              columns[i] = Arrays.copyOf((int[]) columns[i], rowCount);
+              break;
+            case INT64:
+              columns[i] = Arrays.copyOf((long[]) columns[i], rowCount);
+              break;
+            case FLOAT:
+              columns[i] = Arrays.copyOf((float[]) columns[i], rowCount);
+              break;
+            case DOUBLE:
+              columns[i] = Arrays.copyOf((double[]) columns[i], rowCount);
+              break;
+            case TEXT:
+              columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format("Data type %s is not supported.", dataTypes[i]));
+          }
+        }
+      }
+
+      insertTabletStatement.setTimes(times);
+      insertTabletStatement.setColumns(columns);
+      insertTabletStatement.setBitMaps(bitMaps);
+
+      return insertTabletStatement;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index e23a1232c7..30488c18d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -53,7 +53,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
       Operator child,
       boolean ascending,
       long maxReturnSize) {
-    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+    super(operatorContext, aggregators, child, ascending, maxReturnSize);
     this.windowManager = new TimeWindowManager(timeRangeIterator);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 676ccd823e..16071aea1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,7 +55,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
       List<Aggregator> aggregators,
       Operator child,
       boolean ascending,
-      ITimeRangeIterator timeRangeIterator,
       long maxReturnSize) {
     this.operatorContext = operatorContext;
     this.ascending = ascending;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index a1fc271b1a..76499849cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -50,7 +50,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
       boolean ascending,
       GroupByTimeParameter groupByTimeParameter,
       long maxReturnSize) {
-    super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+    super(operatorContext, aggregators, child, ascending, maxReturnSize);
     checkArgument(
         groupByTimeParameter != null,
         "GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");