You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink on the original archive page and is not preserved in this export.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/04/12 13:01:18 UTC

[iotdb] 02/03: TsBlockInputDataSet

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch udf-operator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e7d195563dde8bbffe68bd44e654cf603958cef
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Apr 12 19:58:59 2022 +0800

    TsBlockInputDataSet
---
 .../db/mpp/operator/source/SeriesScanUtil.java     | 14 ++--
 .../query/udf/core/layer/TsBlockInputDataSet.java  | 64 ++++++++++++++++++
 .../iotdb/tsfile/read/common/block/TsBlock.java    | 78 +++++++++++++++++-----
 .../read/common/block/column/BinaryColumn.java     |  5 ++
 .../read/common/block/column/BooleanColumn.java    |  5 ++
 .../tsfile/read/common/block/column/Column.java    |  5 ++
 .../read/common/block/column/DoubleColumn.java     |  5 ++
 .../read/common/block/column/FloatColumn.java      |  5 ++
 .../tsfile/read/common/block/column/IntColumn.java |  5 ++
 .../read/common/block/column/LongColumn.java       |  5 ++
 .../block/column/RunLengthEncodedColumn.java       |  6 ++
 .../read/common/block/column/TimeColumn.java       |  5 ++
 12 files changed, 182 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
index 80e0a67771..f65947a45d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesScanUtil.java
@@ -47,7 +47,13 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 
@@ -705,7 +711,7 @@ public class SeriesScanUtil {
               mergeReader.addReader(
                   firstPageReader
                       .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockIterator(),
+                      .getTsBlockColumnIterator(),
                   firstPageReader.version,
                   orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()),
                   context);
@@ -732,7 +738,7 @@ public class SeriesScanUtil {
               mergeReader.addReader(
                   pageReader
                       .getAllSatisfiedPageData(orderUtils.getAscending())
-                      .getTsBlockIterator(),
+                      .getTsBlockColumnIterator(),
                   pageReader.version,
                   orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
                   context);
@@ -913,7 +919,7 @@ public class SeriesScanUtil {
 
   private void putPageReaderToMergeReader(VersionPageReader pageReader) throws IOException {
     mergeReader.addReader(
-        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockIterator(),
+        pageReader.getAllSatisfiedPageData(orderUtils.getAscending()).getTsBlockColumnIterator(),
         pageReader.version,
         orderUtils.getOverlapCheckTime(pageReader.getStatistics()),
         context);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
new file mode 100644
index 0000000000..a12e5a0cab
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/TsBlockInputDataSet.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.core.layer;
+
+import org.apache.iotdb.db.mpp.operator.Operator;
+import org.apache.iotdb.db.query.dataset.IUDFInputDataSet;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock.TsBlockRowIterator;
+
+import java.util.List;
+
+/** Adapts a TsBlock-producing {@link Operator} to the row-based {@link IUDFInputDataSet}. */
+public class TsBlockInputDataSet implements IUDFInputDataSet {
+
+  private final Operator operator;
+  private final List<TSDataType> dataTypes;
+
+  /** Row cursor over the current TsBlock; null until the first block has been fetched. */
+  private TsBlockRowIterator tsBlockRowIterator;
+
+  public TsBlockInputDataSet(Operator operator, List<TSDataType> dataTypes) {
+    this.operator = operator;
+    this.dataTypes = dataTypes;
+  }
+
+  @Override
+  public List<TSDataType> getDataTypes() {
+    return dataTypes;
+  }
+
+  @Override
+  public boolean hasNextRowInObjects() {
+    // Loop instead of fetching once: an upstream TsBlock may hold zero rows.
+    while (tsBlockRowIterator == null || !tsBlockRowIterator.hasNext()) {
+      if (!operator.hasNext()) {
+        return false;
+      }
+      tsBlockRowIterator = operator.next().getTsBlockRowIterator();
+    }
+    return true;
+  }
+
+  @Override
+  public Object[] nextRowInObjects() {
+    return tsBlockRowIterator.next(); // caller must check hasNextRowInObjects() first
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 29a17a1a79..261c6337d7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.openjdk.jol.info.ClassLayout;
 
 import java.util.Arrays;
+import java.util.Iterator;
 
 import static io.airlift.slice.SizeOf.sizeOf;
 import static java.lang.String.format;
@@ -173,8 +174,16 @@ public class TsBlock {
     return valueColumns[columnIndex];
   }
 
-  public TsBlockIterator getTsBlockIterator() {
-    return new TsBlockIterator(0);
+  public TsBlockColumnIterator getTsBlockColumnIterator() {
+    return getTsBlockColumnIterator(0); // defaults to the first value column
+  }
+
+  public TsBlockColumnIterator getTsBlockColumnIterator(int columnIndex) {
+    return new TsBlockColumnIterator(0, columnIndex); // cursor starts at row 0
+  }
+
+  public TsBlockRowIterator getTsBlockRowIterator() {
+    return new TsBlockRowIterator(0); // cursor starts at row 0
   }
 
   /** Only used for the batch data of vector time series. */
@@ -182,17 +191,24 @@ public class TsBlock {
     return new AlignedTsBlockIterator(0, subIndex);
   }
 
-  private class TsBlockIterator implements IPointReader, IBatchDataIterator {
+  private class TsBlockColumnIterator implements IPointReader, IBatchDataIterator {
+
+    protected int rowIndex;
+    protected int columnIndex;
 
-    protected int index;
+    public TsBlockColumnIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
+      this.columnIndex = 0;
+    }
 
-    public TsBlockIterator(int index) {
-      this.index = index;
+    public TsBlockColumnIterator(int rowIndex, int columnIndex) {
+      this.rowIndex = rowIndex;
+      this.columnIndex = columnIndex;
     }
 
     @Override
     public boolean hasNext() {
-      return index < positionCount;
+      return rowIndex < positionCount;
     }
 
     @Override
@@ -202,22 +218,22 @@ public class TsBlock {
 
     @Override
     public void next() {
-      index++;
+      rowIndex++;
     }
 
     @Override
     public long currentTime() {
-      return timeColumn.getLong(index);
+      return timeColumn.getLong(rowIndex);
     }
 
     @Override
     public Object currentValue() {
-      return valueColumns[0].getTsPrimitiveType(index).getValue();
+      return valueColumns[columnIndex].getTsPrimitiveType(rowIndex).getValue();
     }
 
     @Override
     public void reset() {
-      index = 0;
+      rowIndex = 0;
     }
 
     @Override
@@ -240,14 +256,44 @@ public class TsBlock {
     @Override
     public TimeValuePair currentTimeValuePair() {
       return new TimeValuePair(
-          timeColumn.getLong(index), valueColumns[0].getTsPrimitiveType(index));
+          timeColumn.getLong(rowIndex), valueColumns[columnIndex].getTsPrimitiveType(rowIndex));
     }
 
     @Override
     public void close() {}
   }
 
-  private class AlignedTsBlockIterator extends TsBlockIterator {
+  /** Iterates rows as {@code [value_0, ..., value_(n-1), time]}; null cells map to null. */
+  public class TsBlockRowIterator implements Iterator<Object[]> {
+    protected int rowIndex;
+    protected int columnCount;
+
+    public TsBlockRowIterator(int rowIndex) {
+      this.rowIndex = rowIndex;
+      this.columnCount = getValueColumnCount();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return rowIndex < positionCount;
+    }
+
+    @Override
+    public Object[] next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException("TsBlock row index " + rowIndex);
+      }
+      Object[] row = new Object[columnCount + 1];
+      for (int i = 0; i < columnCount; ++i) {
+        row[i] = valueColumns[i].isNull(rowIndex) ? null : valueColumns[i].getObject(rowIndex);
+      }
+      row[columnCount] = timeColumn.getObject(rowIndex); // time is always the last slot
+      rowIndex++;
+      return row;
+    }
+  }
+
+  private class AlignedTsBlockIterator extends TsBlockColumnIterator {
 
     private final int subIndex;
 
@@ -277,7 +323,7 @@ public class TsBlock {
 
     @Override
     public Object currentValue() {
-      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(index);
+      TsPrimitiveType v = valueColumns[subIndex].getTsPrimitiveType(rowIndex);
       return v == null ? null : v.getValue();
     }
 
@@ -286,12 +332,12 @@ public class TsBlock {
       // aligned timeseries' BatchData length() may return the length of time column
       // we need traverse to VectorBatchDataIterator calculate the actual value column's length
       int cnt = 0;
-      int indexSave = index;
+      int indexSave = rowIndex;
       while (hasNext()) {
         cnt++;
         next();
       }
-      index = indexSave;
+      rowIndex = indexSave;
       return cnt;
     }
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
index 8828393a68..9d95c4edcc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BinaryColumn.java
@@ -74,6 +74,11 @@ public class BinaryColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) { // same reference that getBinary(position) returns
+    return this.getBinary(position);
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
index 54544d3650..66ef186690 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/BooleanColumn.java
@@ -73,6 +73,11 @@ public class BooleanColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Boolean.valueOf(getBoolean(position)); // boxed form of getBoolean(position)
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
index adc06a1f52..67e5d2f601 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/Column.java
@@ -53,6 +53,11 @@ public interface Column {
     throw new UnsupportedOperationException(getClass().getName());
   }
 
+  /** Gets the value at {@code position} as an Object; unsupported unless overridden. */
+  default Object getObject(int position) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
   /** Gets a TsPrimitiveType at {@code position}. */
   default TsPrimitiveType getTsPrimitiveType(int position) {
     throw new UnsupportedOperationException(getClass().getName());
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
index 32809b02f6..7abfa05a8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/DoubleColumn.java
@@ -73,6 +73,11 @@ public class DoubleColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Double.valueOf(getDouble(position)); // boxed form of getDouble(position)
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
index 51a2675dae..25bbe44fdf 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/FloatColumn.java
@@ -72,6 +72,11 @@ public class FloatColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Float.valueOf(getFloat(position)); // boxed form of getFloat(position)
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
index 0d48dd2133..49d3357c1f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/IntColumn.java
@@ -72,6 +72,11 @@ public class IntColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Integer.valueOf(getInt(position)); // boxed form of getInt(position)
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
index 345e71d5bc..e3838d77e7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/LongColumn.java
@@ -72,6 +72,11 @@ public class LongColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Long.valueOf(getLong(position)); // boxed form of getLong(position)
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index 39775002dd..e60d7ebc66 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -97,6 +97,12 @@ public class RunLengthEncodedColumn implements Column {
     return value.getBinary(position);
   }
 
+  @Override
+  public Object getObject(int position) { // checks position, then delegates to value
+    checkReadablePosition(position);
+    return value.getObject(position); // NOTE(review): index 0 if value is single-entry? confirm
+  }
+
   @Override
   public TsPrimitiveType getTsPrimitiveType(int position) {
     checkReadablePosition(position);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
index c46fbd2ea3..e80b670f84 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/TimeColumn.java
@@ -62,6 +62,11 @@ public class TimeColumn implements Column {
     return values[position + arrayOffset];
   }
 
+  @Override
+  public Object getObject(int position) {
+    return Long.valueOf(getLong(position)); // boxed timestamp from getLong(position)
+  }
+
   @Override
   public boolean isNull(int position) {
     return false;