You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/02 04:18:28 UTC

[iotdb] 01/06: fix bug when splitting

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4f9d53acd8dbba0e2364ef9377e11fb872a18f58
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Wed Dec 1 16:52:29 2021 +0800

    fix bug when splitting
---
 .../query/dataset/udf/UDTFAlignByTimeDataSet.java  |  6 ++--
 .../iotdb/db/query/dataset/udf/UDTFDataSet.java    |  6 ++--
 .../db/query/dataset/udf/UDTFFragmentDataSet.java  | 11 ++++---
 .../query/dataset/udf/UDTFFragmentDataSetTask.java | 17 +++++-----
 .../db/query/dataset/udf/UDTFJoinDataSet.java      |  3 +-
 .../iotdb/db/query/expression/Expression.java      |  3 ++
 .../query/expression/binary/BinaryExpression.java  | 14 +++++++++
 .../db/query/expression/unary/ConstantOperand.java |  7 +++++
 .../query/expression/unary/FunctionExpression.java | 18 +++++++++++
 .../query/expression/unary/NegationExpression.java |  9 ++++++
 .../query/expression/unary/TimeSeriesOperand.java  |  7 +++++
 .../pool/DataSetFragmentExecutionPoolManager.java  |  8 +++--
 .../db/query/udf/core/layer/LayerBuilder.java      | 10 +++---
 .../query/udf/core/layer/RawQueryInputLayer.java   | 36 ++++++++++++----------
 14 files changed, 111 insertions(+), 44 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
index d3bc678..b0b6a3e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFAlignByTimeDataSet.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
 import org.apache.iotdb.db.query.reader.series.ManagedSeriesReader;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -80,9 +81,10 @@ public class UDTFAlignByTimeDataSet extends UDTFDataSet implements DirectAlignBy
   }
 
   /** for data set fragment */
-  protected UDTFAlignByTimeDataSet(LayerPointReader[] transformers)
+  protected UDTFAlignByTimeDataSet(
+      RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
-    super(transformers);
+    super(rawQueryInputLayer, transformers);
     initTimeHeap();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
index 5700336..3e523b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFDataSet.java
@@ -128,12 +128,12 @@ public abstract class UDTFDataSet extends QueryDataSet {
   }
 
   /** for data set fragment */
-  protected UDTFDataSet(LayerPointReader[] transformers) {
-    // The following 3 fields are useless because they are recorded in their parent data set.
+  protected UDTFDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers) {
+    // The following 2 fields are useless.
     queryId = -1;
     udtfPlan = null;
-    rawQueryInputLayer = null;
 
+    this.rawQueryInputLayer = rawQueryInputLayer;
     this.transformers = transformers;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
index 50c646a..8d1cbbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSet.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.pool.DataSetFragmentExecutionPoolManager;
+import org.apache.iotdb.db.query.udf.core.layer.RawQueryInputLayer;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +33,7 @@ import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
+public class UDTFFragmentDataSet extends QueryDataSet {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSet.class);
 
@@ -39,6 +41,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
   private static final DataSetFragmentExecutionPoolManager
       DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER = DataSetFragmentExecutionPoolManager.getInstance();
 
+  private final QueryDataSet fragmentDataSet;
   private final BlockingQueue<Object[]> productionBlockingQueue;
 
   private RowRecord[] rowRecords = null;
@@ -47,9 +50,9 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
 
   private boolean hasNextRowRecords = true;
 
-  public UDTFFragmentDataSet(LayerPointReader[] transformers)
+  public UDTFFragmentDataSet(RawQueryInputLayer rawQueryInputLayer, LayerPointReader[] transformers)
       throws QueryProcessException, IOException {
-    super(transformers);
+    fragmentDataSet = new UDTFAlignByTimeDataSet(rawQueryInputLayer, transformers);
     productionBlockingQueue = new LinkedBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
     submitTask();
   }
@@ -105,7 +108,7 @@ public class UDTFFragmentDataSet extends UDTFAlignByTimeDataSet {
   private void submitTask() {
     if (productionBlockingQueue.remainingCapacity() > 0) {
       DATA_SET_FRAGMENT_EXECUTION_POOL_MANAGER.submit(
-          new UDTFFragmentDataSetTask(fetchSize, this, productionBlockingQueue));
+          new UDTFFragmentDataSetTask(fetchSize, fragmentDataSet, productionBlockingQueue));
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
index fb50dea..62452dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFFragmentDataSetTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.query.dataset.udf;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,22 +33,20 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
   private static final Logger LOGGER = LoggerFactory.getLogger(UDTFFragmentDataSetTask.class);
 
   private final int fetchSize;
-  private final UDTFFragmentDataSet fragmentDataSet;
+  private final QueryDataSet queryDataSet;
 
   // there are 3 elements in Object[].
   // [0]: RowRecord[] or Throwable.
   // [1]: Integer. actual length of produced row records in [0]. note that the element is -1 when
   // the [0] element is a Throwable.
-  // [2]: Boolean. true if the fragmentDataSet still has next RowRecord to be consumed, otherwise
+  // [2]: Boolean. true if the queryDataSet still has next RowRecord to be consumed, otherwise
   // false. note that the element is false when the [0] element is a Throwable.
   private final BlockingQueue<Object[]> productionBlockingQueue;
 
   public UDTFFragmentDataSetTask(
-      int fetchSize,
-      UDTFFragmentDataSet fragmentDataSet,
-      BlockingQueue<Object[]> productionBlockingQueue) {
+      int fetchSize, QueryDataSet queryDataSet, BlockingQueue<Object[]> productionBlockingQueue) {
     this.fetchSize = fetchSize;
-    this.fragmentDataSet = fragmentDataSet;
+    this.queryDataSet = queryDataSet;
     this.productionBlockingQueue = productionBlockingQueue;
   }
 
@@ -56,13 +55,13 @@ public class UDTFFragmentDataSetTask extends WrappedRunnable {
     try {
       int rowRecordCount = 0;
       RowRecord[] rowRecords = new RowRecord[fetchSize];
-      while (rowRecordCount < fetchSize && fragmentDataSet.hasNextWithoutConstraint()) {
-        rowRecords[rowRecordCount++] = fragmentDataSet.nextWithoutConstraint();
+      while (rowRecordCount < fetchSize && queryDataSet.hasNextWithoutConstraint()) {
+        rowRecords[rowRecordCount++] = queryDataSet.nextWithoutConstraint();
       }
 
       // if a task is submitted, there must be free space in the queue
       productionBlockingQueue.put(
-          new Object[] {rowRecords, rowRecordCount, fragmentDataSet.hasNextWithoutConstraint()});
+          new Object[] {rowRecords, rowRecordCount, queryDataSet.hasNextWithoutConstraint()});
     } catch (Throwable e) {
       onThrowable(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index ca2ef3a..323e1c2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
 
-// TODO: performances joining in pool, packing row records while calculating
 public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDataSet {
 
   private final UDTFFragmentDataSet[] fragmentDataSets;
@@ -68,7 +67,7 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
     timeHeap = new TimeSelector(resultColumnsLength << 1, true);
 
     for (int i = 0; i < resultColumnsLength; ++i) {
-      UDTFDataSet fragmentDataSet = fragmentDataSets[i];
+      QueryDataSet fragmentDataSet = fragmentDataSets[i];
       if (fragmentDataSet.hasNextWithoutConstraint()) {
         rowRecordsCache[i] = fragmentDataSet.nextWithoutConstraint();
         timeHeap.add(rowRecordsCache[i].getTimestamp());
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 27df69b..6284c02 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -96,6 +96,9 @@ public abstract class Expression {
     return expressionIntermediateLayerMap.get(this);
   }
 
+  public abstract Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap);
+
   /** Sub-classes should override this method indicating if the expression is a constant operand */
   protected abstract boolean isConstantOperandInternal();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 176b746..9fc05dd 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -188,6 +188,20 @@ public abstract class BinaryExpression extends Expression {
       LayerPointReader leftParentLayerPointReader, LayerPointReader rightParentLayerPointReader);
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    if (intermediateLayer != null) {
+      return intermediateLayer.getFragmentDataSetIndex();
+    }
+
+    Integer index = leftExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+    return index != null
+        ? index
+        : rightExpression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+  }
+
+  @Override
   public final String getExpressionStringInternal() {
     StringBuilder builder = new StringBuilder();
     if (leftExpression instanceof BinaryExpression) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0a58321..f55c82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -102,6 +102,13 @@ public class ConstantOperand extends Expression {
   }
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+  }
+
+  @Override
   public String getExpressionStringInternal() {
     return valueString;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index 2217fe6..b9e8d7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -293,6 +293,24 @@ public class FunctionExpression extends Expression {
     }
   }
 
+  @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    if (intermediateLayer != null) {
+      return intermediateLayer.getFragmentDataSetIndex();
+    }
+
+    for (Expression expression : expressions) {
+      Integer index = expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+      if (index != null) {
+        return index;
+      }
+    }
+
+    return null;
+  }
+
   public List<PartialPath> getPaths() {
     if (paths == null) {
       paths = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 39f4bab..c57d8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -138,6 +138,15 @@ public class NegationExpression extends Expression {
   }
 
   @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer != null
+        ? intermediateLayer.getFragmentDataSetIndex()
+        : expression.tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+  }
+
+  @Override
   public String getExpressionStringInternal() {
     return "-" + expression.toString();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index 9c65d48..ec2a4de 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -116,6 +116,13 @@ public class TimeSeriesOperand extends Expression {
                 this, queryId, memoryBudgetInMB, fragmentDataSetIndex, parentLayerPointReader));
   }
 
+  @Override
+  public Integer tryToGetFragmentDataSetIndex(
+      Map<Expression, IntermediateLayer> expressionIntermediateLayerMap) {
+    IntermediateLayer intermediateLayer = expressionIntermediateLayerMap.get(this);
+    return intermediateLayer == null ? null : intermediateLayer.getFragmentDataSetIndex();
+  }
+
   public String getExpressionStringInternal() {
     return path.isMeasurementAliasExists() ? path.getFullPathWithAlias() : path.getFullPath();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
index 0705380..c62e415 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
@@ -35,7 +35,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
   private DataSetFragmentExecutionPoolManager() {
     pool =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+            Math.min(
+                Runtime.getRuntime().availableProcessors(),
+                IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
             ThreadName.QUERY_FRAGMENT_SERVICE.getName());
   }
 
@@ -58,7 +60,9 @@ public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
     if (pool == null) {
       pool =
           IoTDBThreadPoolFactory.newFixedThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+              Math.min(
+                  Runtime.getRuntime().availableProcessors(),
+                  IoTDBDescriptor.getInstance().getConfig().getMaxConcurrentSubQueryThread()),
               ThreadName.QUERY_FRAGMENT_SERVICE.getName());
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
index 1541829..a7b0582 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/LayerBuilder.java
@@ -93,12 +93,9 @@ public class LayerBuilder {
   public LayerBuilder buildResultColumnPointReaders() throws QueryProcessException, IOException {
     for (int i = 0, n = resultColumnExpressions.length; i < n; ++i) {
       // resultColumnExpressions[i] -> the index of the fragment it belongs to
-      int fragmentDataSetIndex;
-      IntermediateLayer intermediateLayer =
-          expressionIntermediateLayerMap.get(resultColumnExpressions[i]);
-      if (intermediateLayer != null) {
-        fragmentDataSetIndex = intermediateLayer.getFragmentDataSetIndex();
-      } else {
+      Integer fragmentDataSetIndex =
+          resultColumnExpressions[i].tryToGetFragmentDataSetIndex(expressionIntermediateLayerMap);
+      if (fragmentDataSetIndex == null) {
         fragmentDataSetIndex = fragmentDataSetIndexToLayerPointReaders.size();
         fragmentDataSetIndexToLayerPointReaders.add(new ArrayList<>());
       }
@@ -150,6 +147,7 @@ public class LayerBuilder {
     for (int i = 0; i < n; ++i) {
       fragmentDataSets[i] =
           new UDTFFragmentDataSet(
+              rawTimeSeriesInputLayer,
               fragmentDataSetIndexToLayerPointReaders.get(i).toArray(new LayerPointReader[0]));
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
index 55d8aca..5efef6a 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/layer/RawQueryInputLayer.java
@@ -87,7 +87,9 @@ public class RawQueryInputLayer {
   }
 
   public void updateRowRecordListEvictionUpperBound() {
-    rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+    synchronized (rowRecordList) {
+      rowRecordList.setEvictionUpperBound(safetyLine.getSafetyLine());
+    }
   }
 
   public LayerPointReader constructPointReader(int columnIndex) {
@@ -125,27 +127,29 @@ public class RawQueryInputLayer {
         return true;
       }
 
-      for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
-        Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
-        if (rowRecordCandidate[columnIndex] != null) {
-          hasCachedRowRecord = true;
-          cachedRowRecord = rowRecordCandidate;
-          currentRowIndex = i;
-          break;
-        }
-      }
-
-      if (!hasCachedRowRecord) {
-        while (queryDataSet.hasNextRowInObjects()) {
-          Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
-          rowRecordList.put(rowRecordCandidate);
+      synchronized (rowRecordList) {
+        for (int i = currentRowIndex + 1; i < rowRecordList.size(); ++i) {
+          Object[] rowRecordCandidate = rowRecordList.getRowRecord(i);
           if (rowRecordCandidate[columnIndex] != null) {
             hasCachedRowRecord = true;
             cachedRowRecord = rowRecordCandidate;
-            currentRowIndex = rowRecordList.size() - 1;
+            currentRowIndex = i;
             break;
           }
         }
+
+        if (!hasCachedRowRecord) {
+          while (queryDataSet.hasNextRowInObjects()) {
+            Object[] rowRecordCandidate = queryDataSet.nextRowInObjects();
+            rowRecordList.put(rowRecordCandidate);
+            if (rowRecordCandidate[columnIndex] != null) {
+              hasCachedRowRecord = true;
+              cachedRowRecord = rowRecordCandidate;
+              currentRowIndex = rowRecordList.size() - 1;
+              break;
+            }
+          }
+        }
       }
 
       return hasCachedRowRecord;