You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/05/04 07:13:44 UTC
[iotdb] 02/04: part implementation
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch deviceMergeOperator1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 537dfc32ce3ff27e72dc9c17ffdbad6e22caf5b5
Author: Alima777 <wx...@gmail.com>
AuthorDate: Tue May 3 20:23:20 2022 +0800
part implementation
---
.../operator/process/DeviceMergeOperator.java | 46 +++++++++++++++++++++-
1 file changed, 45 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
index 4122015cb5..91fea13a9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceMergeOperator.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
import org.apache.iotdb.db.mpp.execution.operator.Operator;
import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
@@ -34,17 +37,58 @@ import java.util.List;
* <p>The form of tsBlocks from input operators should be the same strictly, which is transferred by
* DeviceViewOperator.
*/
-public class DeviceMergeOperator {
+public class DeviceMergeOperator implements ProcessOperator {
private final OperatorContext operatorContext;
// The size devices and deviceOperators should be the same.
private final List<String> devices;
private final List<Operator> deviceOperators;
+ private final TsBlock[] inputTsBlocks;
+ private final boolean[] noMoreTsBlocks;
+
public DeviceMergeOperator(
OperatorContext operatorContext, List<String> devices, List<Operator> deviceOperators) {
this.operatorContext = operatorContext;
this.devices = devices;
this.deviceOperators = deviceOperators;
}
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<Void> isBlocked() {
+ for (int i = 0; i < inputCount; i++) {
+ if (!noMoreTsBlocks[i] && empty(i)) {
+ ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ }
+ }
+ return NOT_BLOCKED;
+ }
+
+ @Override
+ public TsBlock next() {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {
+ ProcessOperator.super.close();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return false;
+ }
}