You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/19 11:13:09 UTC
[iotdb] 08/12: implement DeviceViewIntoOperator
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/intoOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de98a1a31da1e5b20a458fa5b6602108be374aed
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 18 16:57:37 2022 +0800
implement DeviceViewIntoOperator
---
...IntoOperator.java => AbstractIntoOperator.java} | 115 +++-----
.../operator/process/DeviceViewIntoOperator.java | 128 ++++++---
.../execution/operator/process/IntoOperator.java | 302 +--------------------
3 files changed, 126 insertions(+), 419 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
similarity index 74%
copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 1491623e77..d038874d0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
@@ -31,13 +29,9 @@ import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.Pair;
import com.google.common.util.concurrent.ListenableFuture;
@@ -47,43 +41,37 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-public class IntoOperator implements ProcessOperator {
+public abstract class AbstractIntoOperator implements ProcessOperator {
- private final OperatorContext operatorContext;
- private final Operator child;
+ protected final OperatorContext operatorContext;
+ protected final Operator child;
- private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
- private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
- private final Map<String, InputLocation> sourceColumnToInputLocationMap;
+ protected List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators;
- public IntoOperator(
+ protected final Map<String, InputLocation> sourceColumnToInputLocationMap;
+
+ public AbstractIntoOperator(
OperatorContext operatorContext,
Operator child,
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
- Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<String, Boolean> targetDeviceToAlignedMap,
- List<Pair<String, PartialPath>> sourceTargetPathPairList,
+ List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators,
Map<String, InputLocation> sourceColumnToInputLocationMap) {
this.operatorContext = operatorContext;
this.child = child;
- this.insertTabletStatementGenerators =
- constructInsertTabletStatementGenerators(
- targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
- this.sourceTargetPathPairList = sourceTargetPathPairList;
+ this.insertTabletStatementGenerators = insertTabletStatementGenerators;
this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
}
- private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
- Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<String, Boolean> targetDeviceToAlignedMap) {
- List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
+ protected static List<IntoOperator.InsertTabletStatementGenerator>
+ constructInsertTabletStatementGenerators(
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
+ Map<String, Boolean> targetDeviceToAlignedMap) {
+ List<IntoOperator.InsertTabletStatementGenerator> insertTabletStatementGenerators =
new ArrayList<>(targetPathToSourceInputLocationMap.size());
for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
- InsertTabletStatementGenerator generator =
- new InsertTabletStatementGenerator(
+ IntoOperator.InsertTabletStatementGenerator generator =
+ new IntoOperator.InsertTabletStatementGenerator(
targetDevice,
targetPathToSourceInputLocationMap.get(targetDevice),
targetPathToDataTypeMap.get(targetDevice),
@@ -93,45 +81,14 @@ public class IntoOperator implements ProcessOperator {
return insertTabletStatementGenerators;
}
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
- }
-
- @Override
- public TsBlock next() {
- TsBlock inputTsBlock = child.next();
- if (inputTsBlock != null) {
- int lastReadIndex = 0;
- while (lastReadIndex < inputTsBlock.getPositionCount()) {
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
- }
- insertMultiTabletsInternally(true);
- }
- }
-
- if (child.hasNext()) {
- return null;
- } else {
- insertMultiTabletsInternally(false);
- return constructResultTsBlock();
- }
- }
-
- private void insertMultiTabletsInternally(boolean needCheck) {
+ protected void insertMultiTabletsInternally(boolean needCheck) {
if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
|| insertTabletStatementGenerators.get(0).isEmpty()) {
return;
}
List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
insertTabletStatementList.add(generator.constructInsertTabletStatement());
}
@@ -139,32 +96,14 @@ public class IntoOperator implements ProcessOperator {
insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
// TODO: execute insertMultiTabletsStatement
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
generator.reset();
}
}
- private TsBlock constructResultTsBlock() {
- List<TSDataType> outputDataTypes =
- ColumnHeaderConstant.selectIntoColumnHeaders.stream()
- .map(ColumnHeader::getColumnType)
- .collect(Collectors.toList());
- TsBlockBuilder resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
- TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
- ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
- for (Pair<String, PartialPath> sourceTargetPathPair : sourceTargetPathPairList) {
- timeColumnBuilder.writeLong(0);
- columnBuilders[0].writeBinary(new Binary(sourceTargetPathPair.left));
- columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
- columnBuilders[2].writeInt(findWritten(sourceTargetPathPair.left));
- resultTsBlockBuilder.declarePosition();
- }
- return resultTsBlockBuilder.build();
- }
-
- private int findWritten(String sourceColumn) {
+ protected int findWritten(String sourceColumn) {
InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
int written = generator.getWrittenCount(inputLocation);
if (written != -1) {
return written;
@@ -173,6 +112,16 @@ public class IntoOperator implements ProcessOperator {
return 0;
}
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ return child.isBlocked();
+ }
+
@Override
public boolean hasNext() {
return child.hasNext();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index e1c38e662c..bf1a0920fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -19,64 +19,112 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
-import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
-public class DeviceViewIntoOperator implements ProcessOperator {
+public class DeviceViewIntoOperator extends AbstractIntoOperator {
- private final OperatorContext operatorContext;
- private final Operator child;
+ private final Map<String, Map<PartialPath, Map<String, InputLocation>>>
+ deviceToTargetPathSourceInputLocationMap;
+ private final Map<String, Map<PartialPath, Map<String, TSDataType>>>
+ deviceToTargetPathDataTypeMap;
+ private final Map<String, Boolean> targetDeviceToAlignedMap;
+ private final Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap;
- public DeviceViewIntoOperator(OperatorContext operatorContext, Operator child) {
- this.operatorContext = operatorContext;
- this.child = child;
- }
+ private String currentDevice;
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
+ private final TsBlockBuilder resultTsBlockBuilder;
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
- }
+ public DeviceViewIntoOperator(
+ OperatorContext operatorContext,
+ Operator child,
+ Map<String, Map<PartialPath, Map<String, InputLocation>>>
+ deviceToTargetPathSourceInputLocationMap,
+ Map<String, Map<PartialPath, Map<String, TSDataType>>> deviceToTargetPathDataTypeMap,
+ Map<String, Boolean> targetDeviceToAlignedMap,
+ Map<String, List<Pair<String, PartialPath>>> deviceToSourceTargetPathPairListMap,
+ Map<String, InputLocation> sourceColumnToInputLocationMap) {
+ super(operatorContext, child, null, sourceColumnToInputLocationMap);
+ this.deviceToTargetPathSourceInputLocationMap = deviceToTargetPathSourceInputLocationMap;
+ this.deviceToTargetPathDataTypeMap = deviceToTargetPathDataTypeMap;
+ this.targetDeviceToAlignedMap = targetDeviceToAlignedMap;
+ this.deviceToSourceTargetPathPairListMap = deviceToSourceTargetPathPairListMap;
- @Override
- public TsBlock next() {
- return null;
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes);
}
@Override
- public boolean hasNext() {
- return child.hasNext();
- }
-
- @Override
- public void close() throws Exception {
- child.close();
- }
+ public TsBlock next() {
+ TsBlock inputTsBlock = child.next();
+ if (inputTsBlock != null) {
+ String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0));
+ if (!Objects.equals(device, currentDevice)) {
+ insertMultiTabletsInternally(false);
+ updateResultTsBlock();
- @Override
- public boolean isFinished() {
- return child.isFinished();
- }
+ insertTabletStatementGenerators = constructInsertTabletStatementGeneratorsByDevice(device);
+ currentDevice = device;
+ }
+ int lastReadIndex = 0;
+ while (lastReadIndex < inputTsBlock.getPositionCount()) {
+ for (IntoOperator.InsertTabletStatementGenerator generator :
+ insertTabletStatementGenerators) {
+ lastReadIndex = generator.processTsBlock(inputTsBlock, lastReadIndex);
+ }
+ insertMultiTabletsInternally(true);
+ }
+ }
- @Override
- public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemory();
+ if (child.hasNext()) {
+ return null;
+ } else {
+ insertMultiTabletsInternally(false);
+ updateResultTsBlock();
+ return resultTsBlockBuilder.build();
+ }
}
- @Override
- public long calculateMaxReturnSize() {
- return child.calculateMaxReturnSize();
+ private void updateResultTsBlock() {
+ TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
+ for (Pair<String, PartialPath> sourceTargetPathPair :
+ deviceToSourceTargetPathPairListMap.get(currentDevice)) {
+ timeColumnBuilder.writeLong(0);
+ columnBuilders[0].writeBinary(new Binary(currentDevice));
+ columnBuilders[1].writeBinary(new Binary(sourceTargetPathPair.left));
+ columnBuilders[2].writeBinary(new Binary(sourceTargetPathPair.right.toString()));
+ columnBuilders[3].writeInt(findWritten(sourceTargetPathPair.left));
+ resultTsBlockBuilder.declarePosition();
+ }
}
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return child.calculateRetainedSizeAfterCallingNext();
+ private List<IntoOperator.InsertTabletStatementGenerator>
+ constructInsertTabletStatementGeneratorsByDevice(String currentDevice) {
+ Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap =
+ deviceToTargetPathSourceInputLocationMap.get(currentDevice);
+ Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap =
+ deviceToTargetPathDataTypeMap.get(currentDevice);
+ return constructInsertTabletStatementGenerators(
+ targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 1491623e77..6a6c2045d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -20,43 +20,26 @@
package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
-import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public class IntoOperator implements ProcessOperator {
-
- private final OperatorContext operatorContext;
- private final Operator child;
+public class IntoOperator extends AbstractIntoOperator {
- private final List<InsertTabletStatementGenerator> insertTabletStatementGenerators;
private final List<Pair<String, PartialPath>> sourceTargetPathPairList;
- private final Map<String, InputLocation> sourceColumnToInputLocationMap;
public IntoOperator(
OperatorContext operatorContext,
@@ -66,41 +49,13 @@ public class IntoOperator implements ProcessOperator {
Map<String, Boolean> targetDeviceToAlignedMap,
List<Pair<String, PartialPath>> sourceTargetPathPairList,
Map<String, InputLocation> sourceColumnToInputLocationMap) {
- this.operatorContext = operatorContext;
- this.child = child;
- this.insertTabletStatementGenerators =
+ super(
+ operatorContext,
+ child,
constructInsertTabletStatementGenerators(
- targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap);
+ targetPathToSourceInputLocationMap, targetPathToDataTypeMap, targetDeviceToAlignedMap),
+ sourceColumnToInputLocationMap);
this.sourceTargetPathPairList = sourceTargetPathPairList;
- this.sourceColumnToInputLocationMap = sourceColumnToInputLocationMap;
- }
-
- private List<InsertTabletStatementGenerator> constructInsertTabletStatementGenerators(
- Map<PartialPath, Map<String, InputLocation>> targetPathToSourceInputLocationMap,
- Map<PartialPath, Map<String, TSDataType>> targetPathToDataTypeMap,
- Map<String, Boolean> targetDeviceToAlignedMap) {
- List<InsertTabletStatementGenerator> insertTabletStatementGenerators =
- new ArrayList<>(targetPathToSourceInputLocationMap.size());
- for (PartialPath targetDevice : targetPathToSourceInputLocationMap.keySet()) {
- InsertTabletStatementGenerator generator =
- new InsertTabletStatementGenerator(
- targetDevice,
- targetPathToSourceInputLocationMap.get(targetDevice),
- targetPathToDataTypeMap.get(targetDevice),
- targetDeviceToAlignedMap.get(targetDevice.toString()));
- insertTabletStatementGenerators.add(generator);
- }
- return insertTabletStatementGenerators;
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
- }
-
- @Override
- public ListenableFuture<?> isBlocked() {
- return child.isBlocked();
}
@Override
@@ -124,26 +79,6 @@ public class IntoOperator implements ProcessOperator {
}
}
- private void insertMultiTabletsInternally(boolean needCheck) {
- if ((needCheck && !insertTabletStatementGenerators.get(0).isFull())
- || insertTabletStatementGenerators.get(0).isEmpty()) {
- return;
- }
-
- List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- insertTabletStatementList.add(generator.constructInsertTabletStatement());
- }
-
- InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement();
- insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList);
- // TODO: execute insertMultiTabletsStatement
-
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- generator.reset();
- }
- }
-
private TsBlock constructResultTsBlock() {
List<TSDataType> outputDataTypes =
ColumnHeaderConstant.selectIntoColumnHeaders.stream()
@@ -161,229 +96,4 @@ public class IntoOperator implements ProcessOperator {
}
return resultTsBlockBuilder.build();
}
-
- private int findWritten(String sourceColumn) {
- InputLocation inputLocation = sourceColumnToInputLocationMap.get(sourceColumn);
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- int written = generator.getWrittenCount(inputLocation);
- if (written != -1) {
- return written;
- }
- }
- return 0;
- }
-
- @Override
- public boolean hasNext() {
- return child.hasNext();
- }
-
- @Override
- public void close() throws Exception {
- child.close();
- }
-
- @Override
- public boolean isFinished() {
- return child.isFinished();
- }
-
- @Override
- public long calculateMaxPeekMemory() {
- return child.calculateMaxPeekMemory();
- }
-
- @Override
- public long calculateMaxReturnSize() {
- return child.calculateMaxReturnSize();
- }
-
- @Override
- public long calculateRetainedSizeAfterCallingNext() {
- return child.calculateRetainedSizeAfterCallingNext();
- }
-
- public static class InsertTabletStatementGenerator {
-
- private final int TABLET_ROW_LIMIT =
- IoTDBDescriptor.getInstance().getConfig().getSelectIntoInsertTabletPlanRowLimit();
-
- private final PartialPath devicePath;
- private final boolean isAligned;
- private final String[] measurements;
- private final TSDataType[] dataTypes;
- private final InputLocation[] inputLocations;
-
- private int rowCount = 0;
-
- private long[] times;
- private Object[] columns;
- private BitMap[] bitMaps;
-
- private final Map<InputLocation, AtomicInteger> writtenCounter;
-
- public InsertTabletStatementGenerator(
- PartialPath devicePath,
- Map<String, InputLocation> measurementToInputLocationMap,
- Map<String, TSDataType> measurementToDataTypeMap,
- Boolean isAligned) {
- this.devicePath = devicePath;
- this.isAligned = isAligned;
- this.measurements = measurementToInputLocationMap.keySet().toArray(new String[0]);
- this.dataTypes = measurementToDataTypeMap.values().toArray(new TSDataType[0]);
- this.inputLocations = measurementToInputLocationMap.values().toArray(new InputLocation[0]);
- this.writtenCounter = new HashMap<>();
- for (InputLocation inputLocation : inputLocations) {
- writtenCounter.put(inputLocation, new AtomicInteger(0));
- }
- this.reset();
- }
-
- public void reset() {
- this.rowCount = 0;
- this.times = new long[TABLET_ROW_LIMIT];
- this.columns = new Object[this.measurements.length];
- for (int i = 0; i < this.measurements.length; i++) {
- switch (dataTypes[i]) {
- case BOOLEAN:
- columns[i] = new boolean[TABLET_ROW_LIMIT];
- break;
- case INT32:
- columns[i] = new int[TABLET_ROW_LIMIT];
- break;
- case INT64:
- columns[i] = new long[TABLET_ROW_LIMIT];
- break;
- case FLOAT:
- columns[i] = new float[TABLET_ROW_LIMIT];
- break;
- case DOUBLE:
- columns[i] = new double[TABLET_ROW_LIMIT];
- break;
- case TEXT:
- columns[i] = new Binary[TABLET_ROW_LIMIT];
- Arrays.fill((Binary[]) columns[i], Binary.EMPTY_VALUE);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataTypes[i]));
- }
- }
- this.bitMaps = new BitMap[this.measurements.length];
- for (int i = 0; i < this.bitMaps.length; ++i) {
- this.bitMaps[i] = new BitMap(TABLET_ROW_LIMIT);
- this.bitMaps[i].markAll();
- }
- }
-
- public int processTsBlock(TsBlock tsBlock, int lastReadIndex) {
- for (; lastReadIndex < tsBlock.getPositionCount(); lastReadIndex++) {
-
- times[rowCount] = tsBlock.getTimeByIndex(lastReadIndex);
-
- for (int i = 0; i < measurements.length; ++i) {
- Column valueColumn = tsBlock.getValueColumns()[inputLocations[i].getValueColumnIndex()];
-
- // if the value is NULL
- if (valueColumn.isNull(lastReadIndex)) {
- // bit in bitMaps are marked as 1 (NULL) by default
- continue;
- }
-
- bitMaps[i].unmark(rowCount);
- writtenCounter.get(inputLocations[i]).getAndIncrement();
- switch (valueColumn.getDataType()) {
- case INT32:
- ((int[]) columns[i])[rowCount] = valueColumn.getInt(lastReadIndex);
- break;
- case INT64:
- ((long[]) columns[i])[rowCount] = valueColumn.getLong(lastReadIndex);
- break;
- case FLOAT:
- ((float[]) columns[i])[rowCount] = valueColumn.getFloat(lastReadIndex);
- break;
- case DOUBLE:
- ((double[]) columns[i])[rowCount] = valueColumn.getDouble(lastReadIndex);
- break;
- case BOOLEAN:
- ((boolean[]) columns[i])[rowCount] = valueColumn.getBoolean(lastReadIndex);
- break;
- case TEXT:
- ((Binary[]) columns[i])[rowCount] = valueColumn.getBinary(lastReadIndex);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format(
- "data type %s is not supported when convert data at client",
- valueColumn.getDataType()));
- }
- }
-
- ++rowCount;
- if (rowCount == TABLET_ROW_LIMIT) {
- break;
- }
- }
- return lastReadIndex;
- }
-
- public boolean isFull() {
- return rowCount == TABLET_ROW_LIMIT;
- }
-
- public boolean isEmpty() {
- return rowCount == 0;
- }
-
- public InsertTabletStatement constructInsertTabletStatement() {
- InsertTabletStatement insertTabletStatement = new InsertTabletStatement();
- insertTabletStatement.setDevicePath(devicePath);
- insertTabletStatement.setAligned(isAligned);
- insertTabletStatement.setMeasurements(measurements);
- insertTabletStatement.setDataTypes(dataTypes);
- insertTabletStatement.setRowCount(rowCount);
-
- if (rowCount != TABLET_ROW_LIMIT) {
- times = Arrays.copyOf(times, rowCount);
- for (int i = 0; i < columns.length; i++) {
- switch (dataTypes[i]) {
- case BOOLEAN:
- columns[i] = Arrays.copyOf((boolean[]) columns[i], rowCount);
- break;
- case INT32:
- columns[i] = Arrays.copyOf((int[]) columns[i], rowCount);
- break;
- case INT64:
- columns[i] = Arrays.copyOf((long[]) columns[i], rowCount);
- break;
- case FLOAT:
- columns[i] = Arrays.copyOf((float[]) columns[i], rowCount);
- break;
- case DOUBLE:
- columns[i] = Arrays.copyOf((double[]) columns[i], rowCount);
- break;
- case TEXT:
- columns[i] = Arrays.copyOf((Binary[]) columns[i], rowCount);
- break;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataTypes[i]));
- }
- }
- }
-
- insertTabletStatement.setTimes(times);
- insertTabletStatement.setColumns(columns);
- insertTabletStatement.setBitMaps(bitMaps);
-
- return insertTabletStatement;
- }
-
- public int getWrittenCount(InputLocation inputLocation) {
- if (!writtenCounter.containsKey(inputLocation)) {
- return -1;
- }
- return writtenCounter.get(inputLocation).get();
- }
- }
}