You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 11:08:29 UTC

[iotdb] 01/01: implement device merge operator

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deviceMergeOperator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94202c8a5d0950d9f1da31e56199bdee08c271c9
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Apr 20 19:08:10 2022 +0800

    implement device merge operator
---
 .../mpp/operator/process/DeviceMergeOperator.java  | 89 ++++++++++++++++++++--
 .../planner/plan/node/process/DeviceMergeNode.java | 11 ++-
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 31 ++++++--
 .../common/block/column/ColumnBuilderStatus.java   |  4 +
 4 files changed, 119 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
index b2439b1752..ea8e11540f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/DeviceMergeOperator.java
@@ -19,38 +19,115 @@
 package org.apache.iotdb.db.mpp.operator.process;
 
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilderStatus;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Since childDeviceOperatorMap is expected to be sorted by the merge order, all
+ * DeviceMergeOperator needs to do is traverse the device child operators: get all tsBlocks of one
+ * device, transform them into the form we need, then move on to the next device operator until
+ * none is left.
+ *
+ * <p>Attention! If some columns do not exist in a device, those columns will be null. e.g. if
+ * [s1, s2, s3] is queried but only [s1, s3] exist in device1, then the column of s2 will be null.
+ */
 public class DeviceMergeOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // <deviceName, corresponding query result operator responsible for that device>
+  private Map<String, Operator> childDeviceOperatorMap;
+  // Column dataTypes, including that of the device column
+  private final List<TSDataType> dataTypes;
+  // Used to fill existing columns and leave null the columns which don't exist in some devices.
+  // e.g. if [s1, s2, s3] is queried but only [s1, s3] exist in device1, then device1 -> [1, 3];
+  // s1 maps to 1 rather than 0 because the device column is the first column.
+  private Map<String, List<Integer>> deviceToColumnIndexMap;
+
+  private Iterator<Entry<String, Operator>> deviceIterator;
+  private Entry<String, Operator> curDeviceEntry;
+
+  public DeviceMergeOperator(
+      OperatorContext operatorContext,
+      Map<String, Operator> childDeviceOperatorMap,
+      List<TSDataType> dataTypes,
+      Map<String, List<Integer>> deviceToColumnIndexMap) {
+    this.operatorContext = operatorContext;
+    this.childDeviceOperatorMap = childDeviceOperatorMap;
+    this.dataTypes = dataTypes;
+    this.deviceToColumnIndexMap = deviceToColumnIndexMap;
+
+    this.deviceIterator = childDeviceOperatorMap.entrySet().iterator();
+    if (deviceIterator.hasNext()) {
+      curDeviceEntry = deviceIterator.next();
+    }
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    ListenableFuture<Void> blocked = curDeviceEntry.getValue().isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    TsBlock tsBlock = curDeviceEntry.getValue().next();
+    List<Integer> indexes = deviceToColumnIndexMap.get(curDeviceEntry.getKey());
+
+    // fill existing columns
+    Column[] newValueColumns = new Column[dataTypes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+    }
+    // construct device column
+    ColumnBuilder deviceColumnBuilder =
+        new BinaryColumnBuilder(new ColumnBuilderStatus(), tsBlock.getPositionCount());
+    for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+      deviceColumnBuilder.writeObject(curDeviceEntry.getKey());
+    }
+    // leave other columns null
+    return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), newValueColumns);
   }
 
   @Override
   public boolean hasNext() {
-    return false;
+    while (!curDeviceEntry.getValue().hasNext()) {
+      if (deviceIterator.hasNext()) {
+        curDeviceEntry = deviceIterator.next();
+      } else {
+        return false;
+      }
+    }
+    return true;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator child : childDeviceOperatorMap.values()) {
+      child.close();
+    }
   }
 
   @Override
   public boolean isFinished() {
-    return false;
+    return !this.hasNext();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 83514408ac..9d66c3c126 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -33,7 +33,11 @@ import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -50,10 +54,9 @@ public class DeviceMergeNode extends ProcessNode implements IOutputPlanNode {
   // The result output order that this operator
   private OrderBy mergeOrder;
 
-  // The policy to decide whether a row should be discarded
+  // The policy to decide whether a row should be discarded.
   // The without policy is able to be push down to the DeviceMergeNode because we can know whether a
-  // row contains
-  // null or not.
+  // row contains null or not.
   private FilterNullComponent filterNullComponent;
 
   // The map from deviceName to corresponding query result node responsible for that device.
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 256b95498d..41686b0b06 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -67,14 +67,14 @@ public class TsBlock {
 
   private volatile long retainedSizeInBytes = -1;
 
-  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
-    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
-  }
-
   public TsBlock(int positionCount) {
     this(false, positionCount, null, EMPTY_COLUMNS);
   }
 
+  public TsBlock(TimeColumn timeColumn, Column... valueColumns) {
+    this(true, determinePositionCount(valueColumns), timeColumn, valueColumns);
+  }
+
   public TsBlock(int positionCount, TimeColumn timeColumn, Column... valueColumns) {
     this(true, positionCount, timeColumn, valueColumns);
   }
@@ -152,7 +152,7 @@ public class TsBlock {
   }
 
   public TsBlock appendValueColumn(Column column) {
-    requireNonNull(column, "column is null");
+    requireNonNull(column, "Column is null");
     if (positionCount != column.getPositionCount()) {
       throw new IllegalArgumentException("Block does not have same position count");
     }
@@ -162,6 +162,18 @@ public class TsBlock {
     return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
   }
 
+  public TsBlock insertValueColumn(int index, Column column) {
+    requireNonNull(column, "Column is null");
+    if (positionCount != column.getPositionCount()) {
+      throw new IllegalArgumentException("Block does not have same position count");
+    }
+
+    Column[] newBlocks = Arrays.copyOf(valueColumns, valueColumns.length + 1);
+    System.arraycopy(newBlocks, index, newBlocks, index + 1, valueColumns.length - index);
+    newBlocks[index] = column;
+    return wrapBlocksWithoutCopy(positionCount, timeColumn, newBlocks);
+  }
+
   public long getTimeByIndex(int index) {
     return timeColumn.getLong(index);
   }
@@ -183,6 +195,9 @@ public class TsBlock {
   }
 
   public TsBlockSingleColumnIterator getTsBlockSingleColumnIterator(int columnIndex) {
+    if (valueColumns[columnIndex] == null) {
+      throw new UnsupportedOperationException("Can not get the iterator of null columns");
+    }
     return new TsBlockSingleColumnIterator(0, columnIndex);
   }
 
@@ -305,7 +320,11 @@ public class TsBlock {
       int columnCount = getValueColumnCount();
       Object[] row = new Object[columnCount + 1];
       for (int i = 0; i < columnCount; ++i) {
-        row[i] = valueColumns[i].getObject(rowIndex);
+        if (valueColumns[i] == null) {
+          row[i] = null;
+        } else {
+          row[i] = valueColumns[i].getObject(rowIndex);
+        }
       }
       row[columnCount] = timeColumn.getObject(rowIndex);
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
index 86dca2ee50..b1cf4ab750 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilderStatus.java
@@ -41,6 +41,10 @@ public class ColumnBuilderStatus {
         requireNonNull(tsBlockBuilderStatus, "tsBlockBuilderStatus must not be null");
   }
 
+  public ColumnBuilderStatus() {
+    this.tsBlockBuilderStatus = new TsBlockBuilderStatus();
+  }
+
   public int getMaxTsBlockSizeInBytes() {
     return tsBlockBuilderStatus.getMaxTsBlockSizeInBytes();
   }