You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/16 03:14:11 UTC
[iotdb] 01/01: add IntoOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bfad22909929b3480d473b41f3c2d543b4eaab6d
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 11:13:50 2022 +0800
add IntoOperator
---
.../execution/operator/process/IntoOperator.java | 309 +++++++++++++++++++++
.../process/RawDataAggregationOperator.java | 2 +-
.../process/SingleInputAggregationOperator.java | 2 -
.../process/SlidingWindowAggregationOperator.java | 2 +-
4 files changed, 311 insertions(+), 4 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
new file mode 100644
index 0000000000..c5ca4cc091
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Process operator that buffers the rows produced by its child operator into per-device tablets
+ * and groups them into {@link InsertMultiTabletsStatement}s.
+ *
+ * <p>One {@link InsertTabletStatementGenerator} is kept per target device. Every generator is fed
+ * the same input rows, so all generators fill up and are flushed together.
+ *
+ * <p>Executing the generated statement is not implemented yet (see the TODO in {@link
+ * #insertMultiTabletsInternally(boolean)}).
+ */
+public class IntoOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  /** One generator per target device, in the iteration order of the source map. */
+  private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+
+  /**
+   * Creates the operator and one statement generator per target device.
+   *
+   * @param operatorContext context returned by {@link #getOperatorContext()}
+   * @param child operator whose output rows are buffered
+   * @param targetPathToSourceMap for each target device, target measurement mapped to the location
+   *     of its source column in the child's output
+   * @param targetPathToDataTypeMap for each target device, target measurement mapped to its data
+   *     type
+   * @param targetDeviceToAlignedMap for each target device, whether it is aligned
+   */
+  public IntoOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.insertTabletStatementGenerators =
+        constructInsertTabletStatementGenerators(
+            targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+  }
+
+  /** Builds one {@link InsertTabletStatementGenerator} for every key of the source map. */
+  private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
+      Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
+      Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+      Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+    List<InsertTabletStatementGenerator> generators =
+        new ArrayList<>(targetPathToSourceMap.size());
+    for (Map.Entry<PartialPath, Map<String, InputLocation>> entry :
+        targetPathToSourceMap.entrySet()) {
+      PartialPath targetDevice = entry.getKey();
+      generators.add(
+          new InsertTabletStatementGenerator(
+              targetDevice,
+              entry.getValue(),
+              targetPathToDataTypeMap.get(targetDevice),
+              targetDeviceToAlignedMap.get(targetDevice)));
+    }
+    return generators;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  /**
+   * Consumes one block from the child and buffers all of its rows, flushing whenever the tablets
+   * become full.
+   *
+   * @return {@code null} while the child still has input; otherwise, after flushing the remaining
+   *     buffered rows, the block built by {@link #constructResultTsBlock()}
+   */
+  @Override
+  public TsBlock next() {
+    TsBlock inputTsBlock = child.next();
+    // next() may yield null (this operator does so itself), so the input must be checked.
+    // Without generators nothing could consume rows and the loop below would never advance.
+    if (inputTsBlock != null && !insertTabletStatementGenerators.isEmpty()) {
+      int positionCount = inputTsBlock.getPositionCount();
+      int readIndex = 0;
+      while (readIndex < positionCount) {
+        // Every generator must read the SAME row range, so each one starts from readIndex
+        // instead of continuing where the previous generator stopped.
+        int nextReadIndex = readIndex;
+        for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+          nextReadIndex =
+              Math.max(nextReadIndex, generator.processTsBlock(inputTsBlock, readIndex));
+        }
+        readIndex = nextReadIndex;
+        insertMultiTabletsInternally(true);
+      }
+    }
+
+    if (child.hasNext()) {
+      return null;
+    } else {
+      insertMultiTabletsInternally(false);
+      return constructResultTsBlock();
+    }
+  }
+
+  /**
+   * Turns the buffered rows of all generators into one {@link InsertMultiTabletsStatement} and
+   * resets the generators.
+   *
+   * @param needCheck if {@code true}, nothing happens unless the tablets are full; if {@code
+   *     false}, any non-empty tablets are flushed
+   */
+  private void insertMultiTabletsInternally(boolean needCheck) {
+    if (insertTabletStatementGenerators.isEmpty()) {
+      return;
+    }
+    // All generators hold the same number of rows, so the first one is representative.
+    InsertTabletStatementGenerator firstGenerator = insertTabletStatementGenerators.get(0);
+    if (firstGenerator.isEmpty() || (needCheck && !firstGenerator.isFull())) {
+      return;
+    }
+
+    List<InsertTabletStatement> insertTabletStatementList =
+        new ArrayList<>(insertTabletStatementGenerators.size());
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      insertTabletStatementList.add(generator.constructInsertTabletStatement());
+    }
+
+    InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
+    insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
+    // TODO: execute insertMultiTabletsStatement
+
+    for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+      generator.reset();
+    }
+  }
+
+  /** Builds the block returned once all input has been consumed; it declares no value columns. */
+  private TsBlock constructResultTsBlock() {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+    return resultTsBlockBuilder.build();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return child.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    child.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return child.isFinished();
+  }
+
+  // NOTE(review): the three memory estimates below return 0; this looks like a placeholder
+  // (the tablet buffers are not accounted for) - confirm before relying on them.
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  /**
+   * Accumulates rows for a single target device and converts them into an {@link
+   * InsertTabletStatement}. At most {@code tabletRowLimit} rows are buffered between two calls of
+   * {@link #reset()}.
+   */
+  private static class InsertTabletStatementGenerator {
+
+    /** Maximum number of rows per tablet, read from the configuration at construction time. */
+    private final int tabletRowLimit =
+        IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
+
+    private final PartialPath devicePath;
+    private final boolean isAligned;
+    // The three arrays below are index-aligned: entry i describes the same measurement.
+    private final String[] measurements;
+    private final TSDataType[] dataTypes;
+    private final InputLocation[] inputLocations;
+
+    private int rowCount = 0;
+
+    private long[] times;
+    private Object[] columns;
+    private BitMap[] bitMaps;
+
+    /**
+     * @param devicePath target device
+     * @param measurementToInputLocationMap target measurement mapped to its source column location
+     * @param measurementToDataTypeMap target measurement mapped to its data type; must contain
+     *     every key of {@code measurementToInputLocationMap}
+     * @param isAligned whether the target device is aligned; unboxed, so must not be {@code null}
+     * @throws IllegalArgumentException if a measurement has no data type
+     * @throws UnSupportedDataTypeException if a data type has no tablet column representation
+     */
+    public InsertTabletStatementGenerator(
+        PartialPath devicePath,
+        Map<String, InputLocation> measurementToInputLocationMap,
+        Map<String, TSDataType> measurementToDataTypeMap,
+        Boolean isAligned) {
+      this.devicePath = devicePath;
+      this.isAligned = isAligned;
+
+      int measurementCount = measurementToInputLocationMap.size();
+      this.measurements = new String[measurementCount];
+      this.dataTypes = new TSDataType[measurementCount];
+      this.inputLocations = new InputLocation[measurementCount];
+      // Look the type up by measurement name rather than relying on two independent maps
+      // iterating in the same order.
+      int index = 0;
+      for (Map.Entry<String, InputLocation> entry : measurementToInputLocationMap.entrySet()) {
+        String measurement = entry.getKey();
+        TSDataType dataType = measurementToDataTypeMap.get(measurement);
+        if (dataType == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "No data type found for measurement %s of target device %s",
+                  measurement, devicePath));
+        }
+        this.measurements[index] = measurement;
+        this.dataTypes[index] = dataType;
+        this.inputLocations[index] = entry.getValue();
+        ++index;
+      }
+
+      // Allocate the buffers up front so that processTsBlock can be called right away.
+      reset();
+    }
+
+    /** Discards the buffered rows and allocates fresh, empty buffers of full tablet size. */
+    public void reset() {
+      this.rowCount = 0;
+      this.times = new long[tabletRowLimit];
+      this.columns = new Object[this.measurements.length];
+      this.bitMaps = new BitMap[this.measurements.length];
+      for (int i = 0; i < this.measurements.length; ++i) {
+        this.columns[i] = allocateColumn(dataTypes[i], tabletRowLimit);
+        this.bitMaps[i] = new BitMap(tabletRowLimit);
+        // every position starts out as NULL; processTsBlock unmarks the positions it fills
+        this.bitMaps[i].markAll();
+      }
+    }
+
+    /** Creates the primitive (or {@link Binary}) array that stores values of the given type. */
+    private static Object allocateColumn(TSDataType dataType, int length) {
+      switch (dataType) {
+        case BOOLEAN:
+          return new boolean[length];
+        case INT32:
+          return new int[length];
+        case INT64:
+          return new long[length];
+        case FLOAT:
+          return new float[length];
+        case DOUBLE:
+          return new double[length];
+        case TEXT:
+          return new Binary[length];
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+
+    /**
+     * Copies rows of {@code tsBlock}, starting at {@code lastReadIndex}, into the buffers until
+     * either the block is exhausted or the tablet is full.
+     *
+     * @param tsBlock input block
+     * @param lastReadIndex index of the first row to read
+     * @return index of the first row that was NOT consumed ({@code tsBlock.getPositionCount()}
+     *     if the whole block was read)
+     */
+    public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
+      Column[] valueColumns = tsBlock.getValueColumns();
+      int positionCount = tsBlock.getPositionCount();
+      while (lastReadIndex < positionCount) {
+
+        times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
+
+        for (int i = 0; i < measurements.length; ++i) {
+          Column valueColumn = valueColumns[inputLocations[i].getValueColumnIndex()];
+
+          // if the value is NULL
+          if (valueColumn.isNull(lastReadIndex)) {
+            // bit in bitMaps are marked as 1 (NULL) by default
+            continue;
+          }
+
+          bitMaps[i].unmark(rowCount);
+          switch (valueColumn.getDataType()) {
+            case INT32:
+              ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
+              break;
+            case INT64:
+              ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
+              break;
+            case FLOAT:
+              ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
+              break;
+            case DOUBLE:
+              ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
+              break;
+            case BOOLEAN:
+              ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
+              break;
+            case TEXT:
+              ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
+              break;
+            default:
+              throw new UnSupportedDataTypeException(
+                  String.format(
+                      "data type %s is not supported when convert data at client",
+                      valueColumn.getDataType()));
+          }
+        }
+
+        ++rowCount;
+        // Advance BEFORE the fullness check so the returned index never points at a row that
+        // has already been buffered (otherwise that row would be written twice).
+        ++lastReadIndex;
+        if (rowCount == tabletRowLimit) {
+          break;
+        }
+      }
+      return lastReadIndex;
+    }
+
+    public boolean isFull() {
+      return rowCount == tabletRowLimit;
+    }
+
+    public boolean isEmpty() {
+      return rowCount == 0;
+    }
+
+    /**
+     * Builds an {@link InsertTabletStatement} from the buffered rows. When the tablet is not
+     * full, the time and value arrays handed to the statement are truncated copies of length
+     * {@code rowCount}; the generator's own buffers are left untouched.
+     */
+    public InsertTabletStatement constructInsertTabletStatement() {
+      InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
+      insertTabletStatement.setDevicePath(devicePath);
+      insertTabletStatement.setAligned(isAligned);
+      insertTabletStatement.setMeasurements(measurements);
+      insertTabletStatement.setDataTypes(dataTypes);
+      insertTabletStatement.setRowCount(rowCount);
+
+      long[] statementTimes = times;
+      Object[] statementColumns = columns;
+      if (rowCount != tabletRowLimit) {
+        statementTimes = Arrays.copyOf(times, rowCount);
+        statementColumns = new Object[columns.length];
+        for (int i = 0; i < columns.length; i++) {
+          statementColumns[i] = trimColumn(columns[i], dataTypes[i], rowCount);
+        }
+      }
+
+      insertTabletStatement.setTimes(statementTimes);
+      insertTabletStatement.setColumns(statementColumns);
+      insertTabletStatement.setBitMaps(bitMaps);
+
+      return insertTabletStatement;
+    }
+
+    /** Returns a copy of {@code column}, interpreted as {@code dataType}, cut to {@code length}. */
+    private static Object trimColumn(Object column, TSDataType dataType, int length) {
+      switch (dataType) {
+        case BOOLEAN:
+          return Arrays.copyOf((boolean[]) column, length);
+        case INT32:
+          return Arrays.copyOf((int[]) column, length);
+        case INT64:
+          return Arrays.copyOf((long[]) column, length);
+        case FLOAT:
+          return Arrays.copyOf((float[]) column, length);
+        case DOUBLE:
+          return Arrays.copyOf((double[]) column, length);
+        case TEXT:
+          return Arrays.copyOf((Binary[]) column, length);
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
index e23a1232c7..30488c18d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java
@@ -53,7 +53,7 @@ public class RawDataAggregationOperator extends SingleInputAggregationOperator {
Operator child,
boolean ascending,
long maxReturnSize) {
- super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+ super(operatorContext, aggregators, child, ascending, maxReturnSize);
this.windowManager = new TimeWindowManager(timeRangeIterator);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
index 676ccd823e..16071aea1e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
-import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -56,7 +55,6 @@ public abstract class SingleInputAggregationOperator implements ProcessOperator
List<Aggregator> aggregators,
Operator child,
boolean ascending,
- ITimeRangeIterator timeRangeIterator,
long maxReturnSize) {
this.operatorContext = operatorContext;
this.ascending = ascending;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
index a1fc271b1a..76499849cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SlidingWindowAggregationOperator.java
@@ -50,7 +50,7 @@ public class SlidingWindowAggregationOperator extends SingleInputAggregationOper
boolean ascending,
GroupByTimeParameter groupByTimeParameter,
long maxReturnSize) {
- super(operatorContext, aggregators, child, ascending, timeRangeIterator, maxReturnSize);
+ super(operatorContext, aggregators, child, ascending, maxReturnSize);
checkArgument(
groupByTimeParameter != null,
"GroupByTimeParameter cannot be null in SlidingWindowAggregationOperator");