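You are viewing a plain-text version of this content. The canonical version is in the commits@iotdb.apache.org mailing-list archive; its hyperlink was not preserved in this plain-text copy.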
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/25 02:47:09 UTC

[iotdb] 01/03: implement TimeJoinOperator

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c922cfe35f137270bcaa46d1ab5fcb05637fa0e3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 23 09:10:05 2022 +0800

    implement TimeJoinOperator
---
 .../TimeJoinOperator.java => common/Column.java}   |  35 +------
 .../TimeColumn.java}                               |  36 +------
 .../org/apache/iotdb/db/mpp/common/TsBlock.java    |  53 ++++++++++
 .../iotdb/db/mpp/common/TsBlockMetadata.java       |   6 +-
 .../db/mpp/operator/process/LimitOperator.java     |   2 +-
 .../db/mpp/operator/process/TimeJoinOperator.java  | 107 ++++++++++++++++++++-
 6 files changed, 166 insertions(+), 73 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
index 11cee59..edad3c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,37 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.common;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
+public interface Column {
 
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class TimeJoinOperator implements ProcessOperator {
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
-
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
index 11cee59..3b05e1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,37 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.common;
 
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class TimeJoinOperator implements ProcessOperator {
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return null;
-  }
-
-  @Override
-  public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
-  }
-
-  @Override
-  public TsBlock next() {
-    return null;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return false;
-  }
-
-  @Override
-  public void close() throws Exception {
-    ProcessOperator.super.close();
-  }
+public class TimeColumn implements Column {
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
index 7ea49ca..6429c45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
@@ -35,8 +35,22 @@ public class TsBlock {
   // Describe the column info
   private TsBlockMetadata metadata;
 
+  private TimeColumn timeColumn;
+
+  private Column[] valueColumns;
+
   private int count;
 
+  public TsBlock() {
+  }
+
+  public TsBlock(int columnCount) {
+    timeColumn = new TimeColumn();
+    valueColumns = new Column[columnCount];
+  }
+
+
+
   public boolean hasNext() {
     return false;
   }
@@ -55,6 +69,17 @@ public class TsBlock {
   }
 
   /**
+   * TODO need to be implemented after the data structure being defined
+   */
+  public long getEndTime() {
+    return -1;
+  }
+
+  public boolean isEmpty() {
+    return count == 0;
+  }
+
+  /**
    * TODO has not been implemented yet
    *
    * @param positionOffset start offset
@@ -70,4 +95,32 @@ public class TsBlock {
     }
     return this;
   }
+
+  /**
+   * TODO need to be implemented after the data structure being defined
+   */
+  public long getTimeByIndex(int index) {
+    return -1;
+  }
+
+  public void addTime(long time) {
+
+  }
+
+  public int getValueColumnCount() {
+    return valueColumns.length;
+  }
+
+  public TimeColumn getTimeColumn() {
+    return timeColumn;
+  }
+
+  public Column getColumn(int columnIndex) {
+    return valueColumns[columnIndex];
+  }
+
+  public int addValues(int columnIndex, TimeColumn timeColumn, Column valueColumn, int rowIndex, long endTime) {
+
+    return rowIndex;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
index d8e480c..443c236 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.common;
 import java.util.List;
 
 public class TsBlockMetadata {
-  // list of all columns in current Tablet
+  // list of all columns in current TsBlock
   // The column list not only contains the series column, but also contains other column to
   // construct the final result
   // set such as timestamp and deviceName
@@ -29,9 +29,9 @@ public class TsBlockMetadata {
 
   // Indicate whether the result set should be aligned by device. This parameter can be used for
   // downstream operators
-  // when processing data from current Tablet. The RowRecord produced by Tablet with
+  // when processing data from current TsBlock. The RowRecord produced by TsBlock with
   // `alignedByDevice = true` will contain
-  // n + 1 fields which are n series field and 1 deviceName field.
+  // n + 2 fields which are n series field, 1 deviceName field and 1 timestamp.
   // For example, when the FilterOperator execute the filter operation, it may need the deviceName
   // field when matching
   // the series with corresponding column in Tablet
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 1ebca2c..e6e9e61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -70,6 +70,6 @@ public class LimitOperator implements ProcessOperator {
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    child.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 11cee59..0f306b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -18,35 +18,134 @@
  */
 package org.apache.iotdb.db.mpp.operator.process;
 
+import org.apache.iotdb.db.mpp.common.TimeColumn;
 import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.OperatorContext;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+
+import java.util.List;
 
 public class TimeJoinOperator implements ProcessOperator {
 
+  private final OperatorContext operatorContext;
+
+  private final List<Operator> children;
+
+  private final int inputCount;
+
+  private final TsBlock[] inputTsBlocks;
+
+  private final int[] inputIndex;
+
+  private final boolean[] noMoreTsBlocks;
+
+  private final TimeSelector timeSelector;
+
+  private final int columnCount;
+
+
+  public TimeJoinOperator(OperatorContext operatorContext, List<Operator> children, OrderBy mergeOrder, int columnCount) {
+    this.operatorContext = operatorContext;
+    this.children = children;
+    this.inputCount = children.size();
+    this.inputTsBlocks = new TsBlock[this.inputCount];
+    this.inputIndex = new int[this.inputCount];
+    this.noMoreTsBlocks = new boolean[this.inputCount];
+    this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
+    this.columnCount = columnCount;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
-    return null;
+    return operatorContext;
   }
 
   @Override
   public ListenableFuture<Void> isBlocked() {
-    return ProcessOperator.super.isBlocked();
+    for (int i = 0; i < inputCount; i++) {
+      if (!noMoreTsBlocks[i] && empty(i)) {
+        ListenableFuture<Void> blocked = children.get(i).isBlocked();
+        if (!blocked.isDone()) {
+          return blocked;
+        }
+      }
+    }
+    return NOT_BLOCKED;
   }
 
   @Override
   public TsBlock next() {
-    return null;
+    // end time for returned TsBlock this time, it's the min end time among all the children TsBlocks
+    long currentEndTime = 0;
+    boolean init = false;
+    for (int i = 0; i < inputCount; i++) {
+      if (!noMoreTsBlocks[i] && empty(i)) {
+        inputIndex[i] = 0;
+        inputTsBlocks[i] = children.get(i).next();
+        if (!empty(i)) {
+          int rowSize = inputTsBlocks[i].getCount();
+          for (int row = 0; row < rowSize; row++) {
+            timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+          }
+        }
+      }
+      // update the currentEndTime if the TsBlock is not empty
+      if (!empty(i)) {
+        currentEndTime = init ? Math.min(currentEndTime, inputTsBlocks[i].getEndTime()) : inputTsBlocks[i].getEndTime();
+        init = true;
+      }
+    }
+
+    if (timeSelector.isEmpty()) {
+      // TODO need to discuss whether to return null or return an empty TSBlock with TsBlockMetadata
+      return null;
+    }
+
+    TsBlock res = new TsBlock(columnCount);
+    while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) {
+      res.addTime(timeSelector.pollFirst());
+    }
+
+    for (int i = 0, column = 0; i < inputCount; i++) {
+      TsBlock block = inputTsBlocks[i];
+      TimeColumn timeColumn = block.getTimeColumn();
+      int valueColumnCount = block.getValueColumnCount();
+      int startIndex = inputIndex[i];
+      for (int j = 0; j < valueColumnCount; j++) {
+        inputIndex[i] = res.addValues(column++, timeColumn, block.getColumn(j), startIndex, currentEndTime);
+      }
+    }
+    return res;
   }
 
   @Override
   public boolean hasNext() {
+    for (int i = 0; i < inputCount; i++) {
+      if (!empty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (children.get(i).hasNext()) {
+          return true;
+        } else {
+          noMoreTsBlocks[i] = true;
+        }
+      }
+    }
     return false;
   }
 
   @Override
   public void close() throws Exception {
-    ProcessOperator.super.close();
+    for (Operator child : children) {
+      child.close();
+    }
+  }
+
+  private boolean empty(int columnIndex) {
+    return inputTsBlocks[columnIndex] == null || inputTsBlocks[columnIndex].getCount() == inputIndex[columnIndex];
   }
 }