You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:02 UTC
[iotdb] 01/12: finish IntoOperator v1
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2293ea4ca46e4ec95ff65f7950f69b1b5ca0429c
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Sun Oct 16 19:01:35 2022 +0800
finish IntoOperator v1
---
.../execution/operator/process/IntoOperator.java | 65 +++++++++++++++++++---
1 file changed, 58 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index c5ca4cc091..1251bb3e50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,15 +33,21 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
+import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public class IntoOperator implements ProcessOperator {
@@ -47,18 +55,24 @@ public class IntoOperator implements ProcessOperator {
private final Operator child;
private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
+ private final List<Pair<String, String>> sourceTargetPathPairList;
+ private final Map<String, InputLocation> sourceColumnToInputLocationMap;
public IntoOperator(
OperatorContext operatorContext,
Operator child,
Map<PartialPath, Map<String, InputLocation>> targetPathToSourceMap,
Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<PartialPath, Boolean> targetDeviceToAlignedMap) {
+ Map<PartialPath, Boolean> targetDeviceToAlignedMap,
+ List<Pair<String, String>> sourceTargetPathPairList,
+ Map<String, InputLocation> sourceColumnToInputLocationMap) {
this.operatorContext = operatorContext;
this.child = child;
this.insertTabletStatementGenerators =
constructInsertTabletStatementGenerators(
targetPathToSourceMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+ this.sourceTargetPathPairList = sourceTargetPathPairList; // iterated by constructResultTsBlock: one result row per pair
+ this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap; // read by findWritten to map a source column to its InputLocation
}
private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
@@ -129,11 +143,34 @@ public class IntoOperator implements ProcessOperator {
}
private TsBlock constructResultTsBlock() {
- List<TSDataType> dataTypes = new ArrayList<>();
- TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(dataTypes);
+ List<TSDataType> outputDataTypes = // column types are taken from ColumnHeaderConstant.selectIntoColumnHeaders
+ ColumnHeaderConstant.selectIntoColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
+ TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+ for (Pair<String, String> sourceTargetPathPair : sourceTargetPathPairList) { // one result row per pair
+ timeColumnBuilder.writeLong(0); // every row is written with time 0
+ columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left)); // column 0 <- pair.left (the value findWritten receives as the source column)
+ columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right)); // column 1 <- pair.right (presumably the target path, per the field name -- confirm)
+ columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left)); // column 2 <- count reported by findWritten for pair.left
+ resultTsBlockBuilder.declarePosition();
+ }
return resultTsBlockBuilder.build();
}
+ private int findWritten(String sourceColumn) { // returns the written count of the first generator that tracks sourceColumn's InputLocation, else 0
+ InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn); // NOTE(review): null if sourceColumn has no mapping -- confirm callers never pass one
+ for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ int written = generator.getWrittenCount(inputLocation);
+ if (written != -1) { // getWrittenCount returns -1 when the generator has no counter for this location
+ return written; // first matching generator wins; later generators are not consulted
+ }
+ }
+ return 0; // no generator has a counter for this location
+ }
+
@Override
public boolean hasNext() {
return child.hasNext();
@@ -151,20 +188,20 @@ public class IntoOperator implements ProcessOperator {
@Override
public long calculateMaxPeekMemory() {
- return 0;
+ return child.calculateMaxPeekMemory(); // reports exactly the child's value; NOTE(review): nothing is added for this operator's own state -- confirm that is intended
}
@Override
public long calculateMaxReturnSize() {
- return 0;
+ return child.calculateMaxReturnSize(); // reports exactly the child's value rather than a size derived from this operator's own result block
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- return 0;
+ return child.calculateRetainedSizeAfterCallingNext(); // reports exactly the child's value; nothing is added for this operator
}
- private static class InsertTabletStatementGenerator {
+ public static class InsertTabletStatementGenerator {
private final int TABLET_ROW_LIMIT =
IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
@@ -181,6 +218,8 @@ public class IntoOperator implements ProcessOperator {
private Object[] columns;
private BitMap[] bitMaps;
+ private final Map<InputLocation, AtomicInteger> writtenCounter;
+
public InsertTabletStatementGenerator(
PartialPath devicePath,
Map<String, InputLocation> measurementToInputLocationMap,
@@ -191,6 +230,10 @@ public class IntoOperator implements ProcessOperator {
this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
+ this.writtenCounter = new HashMap<>();
+ for (InputLocation inputLocation : inputLocations) {
+ writtenCounter.put(inputLocation, new AtomicInteger(0));
+ }
}
public void reset() {
@@ -219,6 +262,7 @@ public class IntoOperator implements ProcessOperator {
}
bitMaps[i].unmark(rowCount);
+ writtenCounter.get(inputLocations[i]).getAndIncrement();
switch (valueColumn.getDataType()) {
case INT32:
((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
@@ -305,5 +349,12 @@ public class IntoOperator implements ProcessOperator {
return insertTabletStatement;
}
+
+ public int getWrittenCount(InputLocation inputLocation) { // returns the counter value for inputLocation, or -1 if this generator has no counter for it
+ if (!writtenCounter.containsKey(inputLocation)) {
+ return -1; // sentinel: IntoOperator.findWritten treats -1 as "try the next generator"
+ }
+ return writtenCounter.get(inputLocation).get(); // counter is incremented once per value written for this location
+ }
}
}