You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 11:08:29 UTC
[iotdb] 01/01: implement device merge operator
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch deviceMergeOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 94202c8a5d0950d9f1da31e56199bdee08c271c9
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Apr 20 19:08:10 2022 +0800
implement device merge operator
---
.../mpp/operator/process/DeviceMergeOperator.java | 89 ++++++++++++++++++++--
.../planner/plan/node/process/DeviceMergeNode.java | 11 ++-
.../iotdb/tsfile/read/common/block/TsBlock.java | 31 ++++++--
.../common/block/column/ColumnBuilderStatus.java | 4 +
4 files changed, 119 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index b2439b1752..ea8e11540f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -19,38 +19,115 @@
package org.apache.iotdb.db.mpp.operator.process;
+import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilderStatus;
+import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Merges the per-device results produced by its child operators into a single result set.
+ *
+ * <p>Since childDeviceOperatorMap should already be sorted by the expected merge order, all that
+ * DeviceMergeOperator needs to do is traverse the device child operators: get all tsBlocks of one
+ * device, transform them into the output layout (device name as the first value column, followed by
+ * the queried columns), then move on to the next device operator until there is none left.
+ *
+ * <p>Attention! If some columns do not exist in a device, those columns will be null. e.g. if
+ * [s1,s2,s3] is queried but only [s1, s3] exist in device1, then the column of s2 will be null for
+ * the rows of device1.
+ */
public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // <deviceName, corresponding query result operator responsible for that device>
+  private final Map<String, Operator> childDeviceOperatorMap;
+  // Column dataTypes that includes device column
+  private final List<TSDataType> dataTypes;
+  // Used to fill columns and leave null columns which don't exist in some devices.
+  // e.g. [s1,s2,s3] is queried, but only [s1, s3] exist in device1, then device1 -> [1, 3]; s1 is
+  // 1 but not 0 because the device is the first column
+  private final Map<String, List<Integer>> deviceToColumnIndexMap;
+
+  private final Iterator<Entry<String, Operator>> deviceIterator;
+  // The device whose operator is currently being drained; null if there are no child operators.
+  private Entry<String, Operator> curDeviceEntry;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      Map<String, Operator> childDeviceOperatorMap,
+      List<TSDataType> dataTypes,
+      Map<String, List<Integer>> deviceToColumnIndexMap) {
+    this.operatorContext = operatorContext;
+    this.childDeviceOperatorMap = childDeviceOperatorMap;
+    this.dataTypes = dataTypes;
+    this.deviceToColumnIndexMap = deviceToColumnIndexMap;
+
+    this.deviceIterator = childDeviceOperatorMap.entrySet().iterator();
+    if (deviceIterator.hasNext()) {
+      curDeviceEntry = deviceIterator.next();
+    }
+  }
+
  @Override
  public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
  }
  @Override
  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    if (curDeviceEntry == null) {
+      // no child operator at all, so there is nothing to wait for
+      return NOT_BLOCKED;
+    }
+    ListenableFuture<Void> blocked = curDeviceEntry.getValue().isBlocked();
+    return blocked.isDone() ? NOT_BLOCKED : blocked;
  }
  @Override
  public TsBlock next() {
-    return null;
+    String deviceName = curDeviceEntry.getKey();
+    TsBlock tsBlock = curDeviceEntry.getValue().next();
+    List<Integer> indexes = deviceToColumnIndexMap.get(deviceName);
+    int positionCount = tsBlock.getPositionCount();
+
+    // fill existing columns (index 0 is reserved for the device column)
+    Column[] newValueColumns = new Column[dataTypes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+    }
+    // construct the device column: the same device name for every row of this block.
+    // The name is wrapped as Binary because that is the value type of a binary column
+    // (NOTE(review): confirm BinaryColumnBuilder.writeObject rejects plain Strings).
+    ColumnBuilder deviceColumnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(), positionCount);
+    Binary deviceNameBinary = new Binary(deviceName);
+    for (int i = 0; i < positionCount; i++) {
+      deviceColumnBuilder.writeObject(deviceNameBinary);
+    }
+    newValueColumns[0] = deviceColumnBuilder.build();
+    // columns that don't exist in this device are left null
+    return new TsBlock(positionCount, tsBlock.getTimeColumn(), newValueColumns);
  }
  @Override
  public boolean hasNext() {
-    return false;
+    if (curDeviceEntry == null) {
+      return false;
+    }
+    // skip over devices whose operators are exhausted
+    while (!curDeviceEntry.getValue().hasNext()) {
+      if (!deviceIterator.hasNext()) {
+        return false;
+      }
+      curDeviceEntry = deviceIterator.next();
+    }
+    return true;
  }
  @Override
  public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator child : childDeviceOperatorMap.values()) {
+      child.close();
+    }
  }
  @Override
  public boolean isFinished() {
-    return false;
+    return !this.hasNext();
  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 83514408ac..9d66c3c126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -33,7 +33,11 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -50,10 +54,9 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
// The result output order that this operator
private OrderBy mergeOrder;
- // The policy to decide whether a row should be discarded
+ // The policy to decide whether a row should be discarded.
// The without policy is able to be push down to the DeviceMergeNode because we can know whether a
- // row contains
- // null or not.
+ // row contains null or not.
private FilterNullComponent filterNullComponent;
// The map from deviceName to corresponding query result node responsible for that device.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 256b95498d..41686b0b06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -67,14 +67,14 @@ public class TsBlock {
private volatile long retainedSizeInBytes = -1;
-  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
-    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
-  }
-
  public TsBlock(int positionCount) {
    this(false, positionCount, null, EMPTY_COLUMNS);
  }
+  // The position count is derived from the value columns via determinePositionCount.
+  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+  }
+
  public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
    this(true, positionCount, timeColumn, valueColumns);
  }
@@ -152,7 +152,7 @@ public class TsBlock {
}
public TsBlock appendValueColumn(Column column) {
- requireNonNull(column, "column is null");
+ requireNonNull(column, "Column is null");
if (positionCount != column.getPositionCount()) {
throw new IllegalArgumentException("Block does not have same position count");
}
@@ -162,6 +162,18 @@ public class TsBlock {
return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
}
+  /**
+   * Returns a block whose value columns are this block's value columns with {@code column}
+   * inserted at {@code index}; columns previously at or after {@code index} shift one position to
+   * the right. This block's own column array is not modified.
+   *
+   * @param index position of the new column, in {@code [0, valueColumns.length]}
+   * @param column the column to insert; must have the same position count as this block
+   * @throws NullPointerException if {@code column} is null
+   * @throws IllegalArgumentException if the position counts differ
+   * @throws IndexOutOfBoundsException if {@code index} is out of range
+   */
+  public TsBlock insertValueColumn(int index, Column column) {
+    requireNonNull(column, "Column is null");
+    if (positionCount != column.getPositionCount()) {
+      throw new IllegalArgumentException("Block does not have same position count");
+    }
+    if (index < 0 || index > valueColumns.length) {
+      throw new IndexOutOfBoundsException(
+          "Index " + index + " is out of range [0, " + valueColumns.length + "]");
+    }
+
+    Column[] newBlocks = new Column[valueColumns.length + 1];
+    System.arraycopy(valueColumns, 0, newBlocks, 0, index);
+    newBlocks[index] = column;
+    System.arraycopy(valueColumns, index, newBlocks, index + 1, valueColumns.length - index);
+    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+  }
+
public long getTimeByIndex(int index) {
return timeColumn.getLong(index);
}
@@ -183,6 +195,9 @@ public class TsBlock {
}
  public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator(int columnIndex) {
+    // Value columns may be null (DeviceMergeOperator leaves columns that are missing from a device
+    // null), and such a column has nothing to iterate over.
+    if (valueColumns[columnIndex] == null) {
+      throw new UnsupportedOperationException("Can not get the iterator of null columns");
+    }
    return new TsBlockSingleColumnIterator(0, columnIndex);
  }
@@ -305,7 +320,11 @@ public class TsBlock {
int columnCount = getValueColumnCount();
Object[] row = new Object[columnCount + 1];
for (int i = 0; i < columnCount; ++i) {
- row[i] = valueColumns[i].getObject(rowIndex);
+ if (valueColumns[i] == null) {
+ row[i] = null;
+ } else {
+ row[i] = valueColumns[i].getObject(rowIndex);
+ }
}
row[columnCount] = timeColumn.getObject(rowIndex);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
index 86dca2ee50..b1cf4ab750 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
@@ -41,6 +41,10 @@ public class ColumnBuilderStatus {
requireNonNull(tsBlockBuilderStatus, "tsBlockBuilderStatus must not be null");
}
+  /**
+   * Creates a status backed by a newly constructed {@link TsBlockBuilderStatus}. This is useful
+   * when a column builder is created on its own rather than by a TsBlock builder, e.g. the device
+   * column builder in DeviceMergeOperator.
+   */
+  public ColumnBuilderStatus() {
+    this.tsBlockBuilderStatus = new TsBlockBuilderStatus();
+  }
+
public int getMaxTsBlockSizeInBytes() {
return tsBlockBuilderStatus.getMaxTsBlockSizeInBytes();
}