You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:01:18 UTC
[iotdb] 02/03: TsBlockInputDataSet
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e7d195563dde8bbffe68bd44e654cf603958cef
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 19:58:59 2022 +0800
TsBlockInputDataSet
---
.../db/mpp/operator/source/SeriesScanUtil.java | 14 ++--
.../query/udf/core/layer/TsBlockInputDataSet.java | 64 ++++++++++++++++++
.../iotdb/tsfile/read/common/block/TsBlock.java | 78 +++++++++++++++++-----
.../read/common/block/column/BinaryColumn.java | 5 ++
.../read/common/block/column/BooleanColumn.java | 5 ++
.../tsfile/read/common/block/column/Column.java | 5 ++
.../read/common/block/column/DoubleColumn.java | 5 ++
.../read/common/block/column/FloatColumn.java | 5 ++
.../tsfile/read/common/block/column/IntColumn.java | 5 ++
.../read/common/block/column/LongColumn.java | 5 ++
.../block/column/RunLengthEncodedColumn.java | 6 ++
.../read/common/block/column/TimeColumn.java | 5 ++
12 files changed, 182 insertions(+), 20 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 80e0a67771..f65947a45d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -47,7 +47,13 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
@@ -705,7 +711,7 @@ public class SeriesScanUtil {
mergeReader.addReader(
firstPageReader
.getAllSatisfiedPageData(orderUtils.getAscending())
- .getTsBlockIterator(),
+ .getTsBlockColumnIterator(),
firstPageReader.version,
orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
context);
@@ -732,7 +738,7 @@ public class SeriesScanUtil {
mergeReader.addReader(
pageReader
.getAllSatisfiedPageData(orderUtils.getAscending())
- .getTsBlockIterator(),
+ .getTsBlockColumnIterator(),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
context);
@@ -913,7 +919,7 @@ public class SeriesScanUtil {
private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
mergeReader.addReader(
- pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockIterator(),
+ pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockColumnIterator(),
pageReader.version,
orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
context);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
new file mode 100644
index 0000000000..a12e5a0cab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockRowIterator;
+
+import java.util.List;
+
+/**
+ * Adapts an {@link Operator} to the row-oriented {@link IUDFInputDataSet} interface: blocks are
+ * pulled from the operator on demand and walked row by row with a {@link TsBlockRowIterator}.
+ */
+public class TsBlockInputDataSet implements IUDFInputDataSet {
+
+  private final Operator operator;
+  private final List<TSDataType> dataTypes;
+
+  /** Row iterator over the most recently fetched block; {@code null} until the first fetch. */
+  private TsBlockRowIterator tsBlockRowIterator;
+
+  /**
+   * @param operator source of blocks, consumed lazily through {@code hasNext()} / {@code next()}
+   * @param dataTypes the data types reported by {@link #getDataTypes()}
+   */
+  public TsBlockInputDataSet(Operator operator, List<TSDataType> dataTypes) {
+    this.operator = operator;
+    this.dataTypes = dataTypes;
+  }
+
+  @Override
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  /**
+   * Reports whether another row is available, fetching further blocks from the operator as needed.
+   *
+   * <p>Blocks that contain no rows are skipped, so a {@code true} result guarantees that the
+   * current row iterator actually has a row to hand out.
+   *
+   * @return {@code true} if {@link #nextRowInObjects()} can return a row, {@code false} once the
+   *     current block and the operator are both exhausted
+   */
+  @Override
+  public boolean hasNextRowInObjects() {
+    // Keep pulling blocks until one has rows: returning true for an empty block would make the
+    // following nextRowInObjects() read past the end of that block.
+    while (tsBlockRowIterator == null || !tsBlockRowIterator.hasNext()) {
+      if (!operator.hasNext()) {
+        return false;
+      }
+      tsBlockRowIterator = operator.next().getTsBlockRowIterator();
+    }
+    return true;
+  }
+
+  /**
+   * Returns the next row of the current block. Callers must first obtain {@code true} from {@link
+   * #hasNextRowInObjects()}, which is what positions the row iterator.
+   */
+  @Override
+  public Object[] nextRowInObjects() {
+    return tsBlockRowIterator.next();
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 29a17a1a79..261c6337d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.openjdk.jol.info.ClassLayout;
import java.util.Arrays;
+import java.util.Iterator;
import static io.airlift.slice.SizeOf.sizeOf;
import static java.lang.String.format;
@@ -173,8 +174,16 @@ public class TsBlock {
return valueColumns[columnIndex];
}
- public TsBlockIterator getTsBlockIterator() {
- return new TsBlockIterator(0);
+ public TsBlockColumnIterator getTsBlockColumnIterator() {
+ return new TsBlockColumnIterator(0);
+ }
+
+ public TsBlockColumnIterator getTsBlockColumnIterator(int columnIndex) {
+ return new TsBlockColumnIterator(0, columnIndex);
+ }
+
+ public TsBlockRowIterator getTsBlockRowIterator() {
+ return new TsBlockRowIterator(0);
}
/** Only used for the batch data of vector time series. */
@@ -182,17 +191,24 @@ public class TsBlock {
return new AlignedTsBlockIterator(0, subIndex);
}
- private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+ private class TsBlockColumnIterator implements IPointReader, IBatchDataIterator {
+
+ protected int rowIndex;
+ protected int columnIndex;
- protected int index;
+ public TsBlockColumnIterator(int rowIndex) {
+ this.rowIndex = rowIndex;
+ this.columnIndex = 0;
+ }
- public TsBlockIterator(int index) {
- this.index = index;
+ public TsBlockColumnIterator(int rowIndex, int columnIndex) {
+ this.rowIndex = rowIndex;
+ this.columnIndex = columnIndex;
}
@Override
public boolean hasNext() {
- return index < positionCount;
+ return rowIndex < positionCount;
}
@Override
@@ -202,22 +218,22 @@ public class TsBlock {
@Override
public void next() {
- index++;
+ rowIndex++;
}
@Override
public long currentTime() {
- return timeColumn.getLong(index);
+ return timeColumn.getLong(rowIndex);
}
@Override
public Object currentValue() {
- return valueColumns[0].getTsPrimitiveType(index).getValue();
+ return valueColumns[columnIndex].getTsPrimitiveType(rowIndex).getValue();
}
@Override
public void reset() {
- index = 0;
+ rowIndex = 0;
}
@Override
@@ -240,14 +256,44 @@ public class TsBlock {
@Override
public TimeValuePair currentTimeValuePair() {
return new TimeValuePair(
- timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+ timeColumn.getLong(rowIndex), valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
}
@Override
public void close() {}
}
- private class AlignedTsBlockIterator extends TsBlockIterator {
+  /**
+   * Iterates over the rows of the enclosing block. Every row is an {@code Object[]} holding the
+   * value of each value column in column order, followed by the timestamp in the last slot.
+   */
+  public class TsBlockRowIterator implements Iterator<Object[]> {
+
+    /** Index of the row that the next call to {@link #next()} will return. */
+    protected int rowIndex;
+    /** Number of value columns, captured once when the iterator is created. */
+    protected int columnCount;
+
+    /** @param rowIndex index of the first row to return */
+    public TsBlockRowIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
+      columnCount = getValueColumnCount();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return rowIndex < positionCount;
+    }
+
+    /**
+     * Materializes the current row and advances to the next one.
+     *
+     * @return an array of length {@code columnCount + 1}: the value columns, then the timestamp
+     * @throws java.util.NoSuchElementException if the block has no more rows
+     */
+    @Override
+    public Object[] next() {
+      if (!hasNext()) {
+        // Honour the Iterator contract instead of failing inside a column accessor.
+        throw new java.util.NoSuchElementException(
+            "row " + rowIndex + " is out of range, positionCount = " + positionCount);
+      }
+
+      // Use the count cached by the constructor rather than a shadowing local.
+      Object[] row = new Object[columnCount + 1];
+      for (int i = 0; i < columnCount; ++i) {
+        row[i] = valueColumns[i].getObject(rowIndex);
+      }
+      row[columnCount] = timeColumn.getObject(rowIndex);
+
+      rowIndex++;
+
+      return row;
+    }
+  }
+
+ private class AlignedTsBlockIterator extends TsBlockColumnIterator {
private final int subIndex;
@@ -277,7 +323,7 @@ public class TsBlock {
@Override
public Object currentValue() {
- TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+ TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
return v == null ? null : v.getValue();
}
@@ -286,12 +332,12 @@ public class TsBlock {
// aligned timeseries' BatchData length() may return the length of time column
// we need traverse to VectorBatchDataIterator calculate the actual value column's length
int cnt = 0;
- int indexSave = index;
+ int indexSave = rowIndex;
while (hasNext()) {
cnt++;
next();
}
- index = indexSave;
+ rowIndex = indexSave;
return cnt;
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 8828393a68..9d95c4edcc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -74,6 +74,11 @@ public class BinaryColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getBinary}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getBinary(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 54544d3650..66ef186690 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -73,6 +73,11 @@ public class BooleanColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getBoolean}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getBoolean(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index adc06a1f52..67e5d2f601 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -53,6 +53,11 @@ public interface Column {
throw new UnsupportedOperationException(getClass().getName());
}
+ /** Gets an Object at {@code position}. */
+ default Object getObject(int position) {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
/** Gets a TsPrimitiveType at {@code position}. */
default TsPrimitiveType getTsPrimitiveType(int position) {
throw new UnsupportedOperationException(getClass().getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 32809b02f6..7abfa05a8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -73,6 +73,11 @@ public class DoubleColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getDouble}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getDouble(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 51a2675dae..25bbe44fdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -72,6 +72,11 @@ public class FloatColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getFloat}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getFloat(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 0d48dd2133..49d3357c1f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -72,6 +72,11 @@ public class IntColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getInt}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getInt(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 345e71d5bc..e3838d77e7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -72,6 +72,11 @@ public class LongColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the element at {@code position} as an {@code Object}, read via {@code getLong}. */
+  @Override
+  public Object getObject(int position) {
+    final Object element = getLong(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 39775002dd..e60d7ebc66 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -97,6 +97,12 @@ public class RunLengthEncodedColumn implements Column {
return value.getBinary(position);
}
+  /**
+   * Returns the element at {@code position} as an {@code Object}, obtained from the wrapped
+   * {@code value} column after {@code checkReadablePosition} has accepted the position.
+   */
+  @Override
+  public Object getObject(int position) {
+    checkReadablePosition(position);
+    // NOTE(review): `position` is forwarded unchanged to the wrapped column, like the neighbouring
+    // value.getBinary(position) call; confirm the wrapped column is addressable at that index.
+    final Object element = value.getObject(position);
+    return element;
+  }
+
@Override
public TsPrimitiveType getTsPrimitiveType(int position) {
checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index c46fbd2ea3..e80b670f84 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -62,6 +62,11 @@ public class TimeColumn implements Column {
return values[position + arrayOffset];
}
+  /** Returns the timestamp at {@code position} as an {@code Object}, read via {@code getLong}. */
+  @Override
+  public Object getObject(int position) {
+    final Object timestamp = getLong(position);
+    return timestamp;
+  }
+
@Override
public boolean isNull(int position) {
return false;