You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/09/06 09:09:51 UTC

[iotdb] 01/04: SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch nested-operations
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 025fd00aa3766e50e0baf0717b53893abf105375
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Sep 6 10:37:12 2021 +0800

    SingleInputSingleOutputIntermediateLayer: constructPointReader & constructRowReader & constructRowSlidingSizeWindowReader
---
 ...SerializableTVListBackedSingleColumnWindow.java |   4 +-
 .../db/query/udf/core/layer/IntermediateLayer.java |   8 +-
 .../MultiInputMultiOutputIntermediateLayer.java    |  27 +++-
 .../MultiInputSingleOutputIntermediateLayer.java   |  27 +++-
 .../SingleInputMultiOutputIntermediateLayer.java   |  29 +++-
 .../SingleInputSingleOutputIntermediateLayer.java  | 166 ++++++++++++++++++++-
 .../iotdb/db/query/udf/core/layer/UDFLayer.java    |  28 ++--
 .../db/query/udf/core/reader/LayerRowReader.java   |   2 +-
 8 files changed, 262 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
index f91dd35..4157f90 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/access/ElasticSerializableTVListBackedSingleColumnWindow.java
@@ -76,14 +76,12 @@ public class ElasticSerializableTVListBackedSingleColumnWindow implements RowWin
     return rowIterator;
   }
 
-  private RowWindow seek(int beginIndex, int endIndex) {
+  public void seek(int beginIndex, int endIndex) {
     this.beginIndex = beginIndex;
     this.endIndex = endIndex;
     size = endIndex - beginIndex;
 
     row.seek(beginIndex);
     rowIterator = null;
-
-    return this;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
index 069af89..49aae1a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/IntermediateLayer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
@@ -41,7 +42,7 @@ public abstract class IntermediateLayer {
   public abstract LayerRowReader constructRowReader();
 
   public final LayerRowWindowReader constructRowWindowReader(
-      AccessStrategy strategy, float memoryBudgetInMB) {
+      AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException {
     switch (strategy.getAccessStrategyType()) {
       case SLIDING_TIME_WINDOW:
         return constructRowSlidingTimeWindowReader(
@@ -56,10 +57,9 @@ public abstract class IntermediateLayer {
   }
 
   protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader(
-      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB);
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException;
 
   protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
       SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB);
-
-  public abstract void updateEvictionUpperBound();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
index 9cd5422..2a70a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputMultiOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputMultiOutputIntermediateLayer implements IntermediateLayer {
+public class MultiInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
   public MultiInputMultiOutputIntermediateLayer(
-      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+    super(queryId, memoryBudgetInMB);
+  }
 
   @Override
   public LayerPointReader constructPointReader() {
     return null;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
index 71a4f06..c63b1be 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/MultiInputSingleOutputIntermediateLayer.java
@@ -19,17 +19,40 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 
 import java.util.List;
 
-public class MultiInputSingleOutputIntermediateLayer implements IntermediateLayer {
+public class MultiInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   public MultiInputSingleOutputIntermediateLayer(
-      List<LayerPointReader> parentLayerPointReaders, long queryId, float memoryBudgetInMB) {}
+      long queryId, float memoryBudgetInMB, List<LayerPointReader> parentLayerPointReaders) {
+    super(queryId, memoryBudgetInMB);
+  }
 
   @Override
   public LayerPointReader constructPointReader() {
     return null;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
index a711ced..17467b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputMultiOutputIntermediateLayer.java
@@ -20,27 +20,33 @@
 package org.apache.iotdb.db.query.udf.core.layer;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
 import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import java.io.IOException;
 
-public class SingleInputMultiOutputIntermediateLayer implements IntermediateLayer {
+public class SingleInputMultiOutputIntermediateLayer extends IntermediateLayer {
 
   private static final int CACHE_BLOCK_SIZE = 2;
 
-  private final TSDataType dataType;
   private final LayerPointReader parentLayerPointReader;
+  private final TSDataType dataType;
   private final ElasticSerializableTVList tvList;
   private final SafetyLine safetyLine;
 
   public SingleInputMultiOutputIntermediateLayer(
-      LayerPointReader parentLayerPointReader, long queryId, float memoryBudgetInMB)
+      long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader)
       throws QueryProcessException {
+    super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
+
     dataType = parentLayerPointReader.getDataType();
     tvList =
         ElasticSerializableTVList.newElasticSerializableTVList(
@@ -162,4 +168,21 @@ public class SingleInputMultiOutputIntermediateLayer implements IntermediateLaye
       }
     };
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
index 131472a..3c1c705 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/SingleInputSingleOutputIntermediateLayer.java
@@ -19,13 +19,28 @@
 
 package org.apache.iotdb.db.query.udf.core.layer;
 
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.access.RowWindow;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.db.query.udf.core.access.ElasticSerializableTVListBackedSingleColumnWindow;
+import org.apache.iotdb.db.query.udf.core.access.LayerPointReaderBackedSingleColumnRow;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
+import org.apache.iotdb.db.query.udf.core.reader.LayerRowWindowReader;
+import org.apache.iotdb.db.query.udf.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
-public class SingleInputSingleOutputIntermediateLayer implements IntermediateLayer {
+import java.io.IOException;
+
+public class SingleInputSingleOutputIntermediateLayer extends IntermediateLayer {
 
   private final LayerPointReader parentLayerPointReader;
 
-  public SingleInputSingleOutputIntermediateLayer(LayerPointReader parentLayerPointReader) {
+  public SingleInputSingleOutputIntermediateLayer(
+      long queryId, float memoryBudgetInMB, LayerPointReader parentLayerPointReader) {
+    super(queryId, memoryBudgetInMB);
     this.parentLayerPointReader = parentLayerPointReader;
   }
 
@@ -33,4 +48,151 @@ public class SingleInputSingleOutputIntermediateLayer implements IntermediateLay
   public LayerPointReader constructPointReader() {
     return parentLayerPointReader;
   }
+
+  @Override
+  public LayerRowReader constructRowReader() {
+
+    return new LayerRowReader() {
+
+      private final Row row = new LayerPointReaderBackedSingleColumnRow(parentLayerPointReader);
+      private final TSDataType[] dataTypes =
+          new TSDataType[] {parentLayerPointReader.getDataType()};
+      private boolean hasCached = false;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+        hasCached = parentLayerPointReader.next();
+        return hasCached;
+      }
+
+      @Override
+      public void readyForNext() {
+        parentLayerPointReader.readyForNext();
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return dataTypes;
+      }
+
+      @Override
+      public long currentTime() throws IOException {
+        return parentLayerPointReader.currentTime();
+      }
+
+      @Override
+      public Row currentRow() {
+        return row;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingSizeWindowReader(
+      SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException {
+
+    return new LayerRowWindowReader() {
+
+      private final int windowSize = strategy.getWindowSize();
+      private final int slidingStep = strategy.getSlidingStep();
+
+      private final TSDataType dataType = parentLayerPointReader.getDataType();
+      private final ElasticSerializableTVList tvList =
+          ElasticSerializableTVList.newElasticSerializableTVList(
+              dataType, queryId, memoryBudgetInMB, 2);
+      private final ElasticSerializableTVListBackedSingleColumnWindow window =
+          new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+      private boolean hasCached = false;
+      private int beginIndex = -slidingStep;
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        if (hasCached) {
+          return true;
+        }
+
+        beginIndex += slidingStep;
+        int endIndex = beginIndex + windowSize;
+
+        int pointsToBeCollected = endIndex - tvList.size();
+        if (0 < pointsToBeCollected) {
+          hasCached = collectPoints(pointsToBeCollected) != 0;
+          window.seek(beginIndex, tvList.size());
+        } else {
+          hasCached = true;
+          window.seek(beginIndex, endIndex);
+        }
+
+        return hasCached;
+      }
+
+      /** @return number of actually collected, which may be less than or equals to pointNumber */
+      private int collectPoints(int pointNumber) throws QueryProcessException, IOException {
+        int count = 0;
+
+        while (parentLayerPointReader.next() && count < pointNumber) {
+          ++count;
+
+          switch (dataType) {
+            case INT32:
+              tvList.putInt(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentInt());
+              break;
+            case INT64:
+              tvList.putLong(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentLong());
+              break;
+            case FLOAT:
+              tvList.putFloat(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentFloat());
+              break;
+            case DOUBLE:
+              tvList.putDouble(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentDouble());
+              break;
+            case BOOLEAN:
+              tvList.putBoolean(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBoolean());
+              break;
+            case TEXT:
+              tvList.putBinary(
+                  parentLayerPointReader.currentTime(), parentLayerPointReader.currentBinary());
+              break;
+            default:
+          }
+
+          parentLayerPointReader.readyForNext();
+        }
+
+        return count;
+      }
+
+      @Override
+      public void readyForNext() {
+        hasCached = false;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
+
+  @Override
+  protected LayerRowWindowReader constructRowSlidingTimeWindowReader(
+      SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    return null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
index a613c4c..3148dd4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/UDFLayer.java
@@ -31,8 +31,8 @@ import org.apache.iotdb.db.query.udf.api.access.RowWindow;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.db.query.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
-import org.apache.iotdb.db.query.udf.core.access.RowImpl;
-import org.apache.iotdb.db.query.udf.core.access.RowWindowImpl;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnRow;
+import org.apache.iotdb.db.query.udf.core.access.MultiColumnWindow;
 import org.apache.iotdb.db.query.udf.core.layer.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.query.udf.core.reader.LayerRowReader;
@@ -68,7 +68,7 @@ public class UDFLayer {
       List<TSDataType> dataTypes,
       List<ManagedSeriesReader> readers)
       throws QueryProcessException, IOException, InterruptedException {
-    constructInputLayer(
+    construct(
         queryId,
         memoryBudgetInMB,
         new RawQueryDataSetWithoutValueFilter(queryId, paths, dataTypes, readers, true));
@@ -84,14 +84,18 @@ public class UDFLayer {
       List<IReaderByTimestamp> readers,
       List<Boolean> cached)
       throws QueryProcessException {
-    constructInputLayer(
+    construct(
         queryId,
         memoryBudgetInMB,
         new RawQueryDataSetWithValueFilter(paths, dataTypes, timeGenerator, readers, cached, true));
   }
 
-  private void constructInputLayer(
-      long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+  public UDFLayer(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
+      throws QueryProcessException {
+    construct(queryId, memoryBudgetInMB, queryDataSet);
+  }
+
+  private void construct(long queryId, float memoryBudgetInMB, UDFInputDataSet queryDataSet)
       throws QueryProcessException {
     this.queryId = queryId;
     this.queryDataSet = queryDataSet;
@@ -242,7 +246,7 @@ public class UDFLayer {
     private boolean hasCachedRowRecord;
     private Object[] cachedRowRecord;
 
-    private final RowImpl row;
+    private final MultiColumnRow row;
 
     public InputLayerRowReader(int[] columnIndexes) {
       safetyPile = safetyLine.addSafetyPile();
@@ -253,7 +257,7 @@ public class UDFLayer {
       hasCachedRowRecord = false;
       cachedRowRecord = null;
 
-      row = new RowImpl(columnIndexes, dataTypes);
+      row = new MultiColumnRow(columnIndexes, dataTypes);
     }
 
     @Override
@@ -321,7 +325,7 @@ public class UDFLayer {
 
     private final int windowSize;
     private final IntList rowIndexes;
-    private final RowWindowImpl rowWindow;
+    private final MultiColumnWindow rowWindow;
 
     private final int slidingStep;
 
@@ -343,7 +347,7 @@ public class UDFLayer {
           windowSize < SerializableIntList.calculateCapacity(memoryBudgetInMB)
               ? new WrappedIntArray(windowSize)
               : new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
 
       slidingStep = accessStrategy.getSlidingStep();
 
@@ -452,7 +456,7 @@ public class UDFLayer {
     private final long displayWindowEnd;
 
     private final IntList rowIndexes;
-    private final RowWindowImpl rowWindow;
+    private final MultiColumnWindow rowWindow;
 
     private long nextWindowTimeBegin;
     private int nextIndexBegin;
@@ -475,7 +479,7 @@ public class UDFLayer {
       displayWindowEnd = accessStrategy.getDisplayWindowEnd();
 
       rowIndexes = new ElasticSerializableIntList(queryId, memoryBudgetInMB, 2);
-      rowWindow = new RowWindowImpl(rowRecordList, columnIndexes, dataTypes, rowIndexes);
+      rowWindow = new MultiColumnWindow(rowRecordList, columnIndexes, dataTypes, rowIndexes);
 
       nextWindowTimeBegin = accessStrategy.getDisplayWindowBegin();
       nextIndexBegin = 0;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
index 413a24d..0bc6521 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/reader/LayerRowReader.java
@@ -33,7 +33,7 @@ public interface LayerRowReader {
 
   TSDataType[] getDataTypes();
 
-  long currentTime();
+  long currentTime() throws IOException;
 
   Row currentRow();
 }