You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/03/25 02:47:09 UTC
[iotdb] 01/03: implement TimeJoinOperator
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c922cfe35f137270bcaa46d1ab5fcb05637fa0e3
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Wed Mar 23 09:10:05 2022 +0800
implement TimeJoinOperator
---
.../TimeJoinOperator.java => common/Column.java} | 35 +------
.../TimeColumn.java} | 36 +------
.../org/apache/iotdb/db/mpp/common/TsBlock.java | 53 ++++++++++
.../iotdb/db/mpp/common/TsBlockMetadata.java | 6 +-
.../db/mpp/operator/process/LimitOperator.java | 2 +-
.../db/mpp/operator/process/TimeJoinOperator.java | 107 ++++++++++++++++++++-
6 files changed, 166 insertions(+), 73 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
index 11cee59..edad3c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/Column.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,37 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.common;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
+public interface Column {
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class TimeJoinOperator implements ProcessOperator {
-
- @Override
- public OperatorContext getOperatorContext() {
- return null;
- }
-
- @Override
- public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
- }
-
- @Override
- public TsBlock next() {
- return null;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public void close() throws Exception {
- ProcessOperator.super.close();
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
similarity index 51%
copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
index 11cee59..3b05e1c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TimeColumn.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -16,37 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.mpp.operator.process;
+package org.apache.iotdb.db.mpp.common;
-import org.apache.iotdb.db.mpp.common.TsBlock;
-import org.apache.iotdb.db.mpp.operator.OperatorContext;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public class TimeJoinOperator implements ProcessOperator {
-
- @Override
- public OperatorContext getOperatorContext() {
- return null;
- }
-
- @Override
- public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
- }
-
- @Override
- public TsBlock next() {
- return null;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public void close() throws Exception {
- ProcessOperator.super.close();
- }
+public class TimeColumn implements Column {
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
index 7ea49ca..6429c45 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlock.java
@@ -35,8 +35,22 @@ public class TsBlock {
// Describe the column info
private TsBlockMetadata metadata;
+ private TimeColumn timeColumn;
+
+ private Column[] valueColumns;
+
private int count;
+ public TsBlock() {
+ }
+
+ public TsBlock(int columnCount) {
+ timeColumn = new TimeColumn();
+ valueColumns = new Column[columnCount];
+ }
+
+
+
public boolean hasNext() {
return false;
}
@@ -55,6 +69,17 @@ public class TsBlock {
}
/**
+ * TODO need to be implemented after the data structure being defined
+ */
+ public long getEndTime() {
+ return -1;
+ }
+
+ public boolean isEmpty() {
+ return count == 0;
+ }
+
+ /**
* TODO has not been implemented yet
*
* @param positionOffset start offset
@@ -70,4 +95,32 @@ public class TsBlock {
}
return this;
}
+
+ /**
+ * TODO need to be implemented after the data structure being defined
+ */
+ public long getTimeByIndex(int index) {
+ return -1;
+ }
+
+ public void addTime(long time) {
+
+ }
+
+ public int getValueColumnCount() {
+ return valueColumns.length;
+ }
+
+ public TimeColumn getTimeColumn() {
+ return timeColumn;
+ }
+
+ public Column getColumn(int columnIndex) {
+ return valueColumns[columnIndex];
+ }
+
+ public int addValues(int columnIndex, TimeColumn timeColumn, Column valueColumn, int rowIndex, long endTime) {
+
+ return rowIndex;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
index d8e480c..443c236 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/TsBlockMetadata.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.mpp.common;
import java.util.List;
public class TsBlockMetadata {
- // list of all columns in current Tablet
+ // list of all columns in current TsBlock
// The column list not only contains the series column, but also contains other column to
// construct the final result
// set such as timestamp and deviceName
@@ -29,9 +29,9 @@ public class TsBlockMetadata {
// Indicate whether the result set should be aligned by device. This parameter can be used for
// downstream operators
- // when processing data from current Tablet. The RowRecord produced by Tablet with
+ // when processing data from current TsBlock. The RowRecord produced by TsBlock with
// `alignedByDevice = true` will contain
- // n + 1 fields which are n series field and 1 deviceName field.
+ // n + 2 fields which are n series field, 1 deviceName field and 1 timestamp.
// For example, when the FilterOperator execute the filter operation, it may need the deviceName
// field when matching
// the series with corresponding column in Tablet
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
index 1ebca2c..e6e9e61 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/LimitOperator.java
@@ -70,6 +70,6 @@ public class LimitOperator implements ProcessOperator {
@Override
public void close() throws Exception {
- ProcessOperator.super.close();
+ child.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
index 11cee59..0f306b5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/process/TimeJoinOperator.java
@@ -18,35 +18,134 @@
*/
package org.apache.iotdb.db.mpp.operator.process;
+import org.apache.iotdb.db.mpp.common.TimeColumn;
import org.apache.iotdb.db.mpp.common.TsBlock;
+import org.apache.iotdb.db.mpp.operator.Operator;
import org.apache.iotdb.db.mpp.operator.OperatorContext;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.db.utils.datastructure.TimeSelector;
+
+import java.util.List;
public class TimeJoinOperator implements ProcessOperator {
+ private final OperatorContext operatorContext;
+
+ private final List<Operator> children;
+
+ private final int inputCount;
+
+ private final TsBlock[] inputTsBlocks;
+
+ private final int[] inputIndex;
+
+ private final boolean[] noMoreTsBlocks;
+
+ private final TimeSelector timeSelector;
+
+ private final int columnCount;
+
+
+ public TimeJoinOperator(OperatorContext operatorContext, List<Operator> children, OrderBy mergeOrder, int columnCount) {
+ this.operatorContext = operatorContext;
+ this.children = children;
+ this.inputCount = children.size();
+ this.inputTsBlocks = new TsBlock[this.inputCount];
+ this.inputIndex = new int[this.inputCount];
+ this.noMoreTsBlocks = new boolean[this.inputCount];
+ this.timeSelector = new TimeSelector(this.inputCount << 1, OrderBy.TIMESTAMP_ASC == mergeOrder);
+ this.columnCount = columnCount;
+ }
+
@Override
public OperatorContext getOperatorContext() {
- return null;
+ return operatorContext;
}
@Override
public ListenableFuture<Void> isBlocked() {
- return ProcessOperator.super.isBlocked();
+ for (int i = 0; i < inputCount; i++) {
+ if (!noMoreTsBlocks[i] && empty(i)) {
+ ListenableFuture<Void> blocked = children.get(i).isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ }
+ }
+ return NOT_BLOCKED;
}
@Override
public TsBlock next() {
- return null;
+ // end time for returned TsBlock this time, it's the min end time among all the children TsBlocks
+ long currentEndTime = 0;
+ boolean init = false;
+ for (int i = 0; i < inputCount; i++) {
+ if (!noMoreTsBlocks[i] && empty(i)) {
+ inputIndex[i] = 0;
+ inputTsBlocks[i] = children.get(i).next();
+ if (!empty(i)) {
+ int rowSize = inputTsBlocks[i].getCount();
+ for (int row = 0; row < rowSize; row++) {
+ timeSelector.add(inputTsBlocks[i].getTimeByIndex(row));
+ }
+ }
+ }
+ // update the currentEndTime if the TsBlock is not empty
+ if (!empty(i)) {
+ currentEndTime = init ? Math.min(currentEndTime, inputTsBlocks[i].getEndTime()) : inputTsBlocks[i].getEndTime();
+ init = true;
+ }
+ }
+
+ if (timeSelector.isEmpty()) {
+ // TODO need to discuss whether to return null or return an empty TSBlock with TsBlockMetadata
+ return null;
+ }
+
+ TsBlock res = new TsBlock(columnCount);
+ while (!timeSelector.isEmpty() && timeSelector.first() <= currentEndTime) {
+ res.addTime(timeSelector.pollFirst());
+ }
+
+ for (int i = 0, column = 0; i < inputCount; i++) {
+ TsBlock block = inputTsBlocks[i];
+ TimeColumn timeColumn = block.getTimeColumn();
+ int valueColumnCount = block.getValueColumnCount();
+ int startIndex = inputIndex[i];
+ for (int j = 0; j < valueColumnCount; j++) {
+ inputIndex[i] = res.addValues(column++, timeColumn, block.getColumn(j), startIndex, currentEndTime);
+ }
+ }
+ return res;
}
@Override
public boolean hasNext() {
+ for (int i = 0; i < inputCount; i++) {
+ if (!empty(i)) {
+ return true;
+ } else if (!noMoreTsBlocks[i]) {
+ if (children.get(i).hasNext()) {
+ return true;
+ } else {
+ noMoreTsBlocks[i] = true;
+ }
+ }
+ }
return false;
}
@Override
public void close() throws Exception {
- ProcessOperator.super.close();
+ for (Operator child : children) {
+ child.close();
+ }
+ }
+
+ private boolean empty(int columnIndex) {
+ return inputTsBlocks[columnIndex] == null || inputTsBlocks[columnIndex].getCount() == inputIndex[columnIndex];
}
}