You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/04 07:13:44 UTC

[iotdb] 02/04: part implementation

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch deviceMergeOperator1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 537dfc32ce3ff27e72dc9c17ffdbad6e22caf5b5
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue May 3 20:23:20 2022 +0800

    part implementation
---
 .../operator/process/DeviceMergeOperator.java      | 46 +++++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 4122015cb5..91fea13a9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.List;
 
@@ -34,17 +37,58 @@ import java.util.List;
  * <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
  * DeviceViewOperator.
  */
-public class DeviceMergeOperator {
+public class DeviceMergeOperator implements ProcessOperator {
 
   private final OperatorContext operatorContext;
   // The size devices and deviceOperators should be the same.
   private final List<String> devices;
   private final List<Operator> deviceOperators;
 
+  private final TsBlock[] inputTsBlocks;
+  private final boolean[] noMoreTsBlocks;
+
   public DeviceMergeOperator(
       OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) {
     this.operatorContext = operatorContext;
     this.devices = devices;
     this.deviceOperators = deviceOperators;
   }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<Void> isBlocked() {
+    for (int i = 0; i < inputCount; i++) {
+      if (!noMoreTsBlocks[i] && empty(i)) {
+        ListenableFuture<Void> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+      }
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() {
+    return null;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public void close() throws Exception {
+    ProcessOperator.super.close();
+  }
+
+  @Override
+  public boolean isFinished() {
+    return false;
+  }
 }