You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/31 01:07:27 UTC

[iotdb] branch master updated: [IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 88adfbea69 [IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)
88adfbea69 is described below

commit 88adfbea697931b1d03cd83394491fe77cf4f960
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue May 31 09:07:22 2022 +0800

    [IOTDB-3229] Field is 0 rather than null when query two series in one device (#6071)
---
 .../execution/operator/process/FilterOperator.java | 80 ++++++++++------------
 .../operator/process/TransformOperator.java        | 70 +++++++++++--------
 .../iotdb/tsfile/read/common/block/TsBlock.java    |  3 +-
 3 files changed, 79 insertions(+), 74 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index b9050c27ad..cf04e891ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -33,6 +33,9 @@ import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -41,6 +44,8 @@ import java.util.Map;
 
 public class FilterOperator extends TransformOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(FilterOperator.class);
+
   private LayerPointReader filterPointReader;
 
   public FilterOperator(
@@ -90,18 +95,6 @@ public class FilterOperator extends TransformOperator {
     }
   }
 
-  @Override
-  protected void readyForFirstIteration() throws QueryProcessException, IOException {
-    iterateFilterReaderToNextValid();
-  }
-
-  private void iterateFilterReaderToNextValid() throws QueryProcessException, IOException {
-    do {
-      filterPointReader.readyForNext();
-    } while (filterPointReader.next()
-        && (filterPointReader.isCurrentNull() || !filterPointReader.currentBoolean()));
-  }
-
   @Override
   public TsBlock next() {
     final TsBlockBuilder tsBlockBuilder = TsBlockBuilder.createWithOnlyTimeColumn();
@@ -121,56 +114,57 @@ public class FilterOperator extends TransformOperator {
 
     try {
       int rowCount = 0;
-      while (rowCount < FETCH_SIZE && filterPointReader.next()) {
-        final long currentTime = filterPointReader.currentTime();
 
-        boolean hasAtLeastOneValid = false;
-        for (int i = 0; i < outputColumnCount; ++i) {
-          if (currentTime == iterateValueReadersToNextValid(transformers[i], currentTime)) {
-            hasAtLeastOneValid = true;
+      while (rowCount < FETCH_SIZE && !timeHeap.isEmpty()) {
+        final long currentTime = timeHeap.pollFirst();
+
+        if (filterPointReader.next() && filterPointReader.currentTime() == currentTime) {
+          if (!filterPointReader.isCurrentNull() && filterPointReader.currentBoolean()) {
+            // time
+            timeBuilder.writeLong(currentTime);
+
+            // values
+            for (int i = 0; i < outputColumnCount; ++i) {
+              collectDataPointAndIterateToNextValid(
+                  transformers[i], columnBuilders[i], currentTime);
+            }
+
+            ++rowCount;
+          } else {
+            // values
+            for (int i = 0; i < outputColumnCount; ++i) {
+              skipDataPointAndIterateToNextValid(transformers[i], currentTime);
+            }
           }
-        }
 
-        if (hasAtLeastOneValid) {
-          timeBuilder.writeLong(currentTime);
+          filterPointReader.readyForNext();
+          iterateReaderToNextValid(filterPointReader);
+        } else {
+          // values
           for (int i = 0; i < outputColumnCount; ++i) {
-            collectDataPoint(transformers[i], columnBuilders[i], currentTime);
+            skipDataPointAndIterateToNextValid(transformers[i], currentTime);
           }
-          ++rowCount;
         }
 
-        iterateFilterReaderToNextValid();
-
         inputLayer.updateRowRecordListEvictionUpperBound();
       }
 
       tsBlockBuilder.declarePositions(rowCount);
     } catch (Exception e) {
+      LOGGER.error("FilterOperator#next()", e);
       throw new RuntimeException(e);
     }
 
     return tsBlockBuilder.build();
   }
 
-  private long iterateValueReadersToNextValid(LayerPointReader reader, long currentTime)
-      throws QueryProcessException, IOException {
-    while (reader.next() && (reader.isCurrentNull() || reader.currentTime() < currentTime)) {
-      reader.readyForNext();
+  private void skipDataPointAndIterateToNextValid(LayerPointReader reader, long currentTime)
+      throws IOException, QueryProcessException {
+    if (!reader.next() || reader.currentTime() != currentTime) {
+      return;
     }
-    return reader.currentTime();
-  }
 
-  @Override
-  public boolean hasNext() {
-    try {
-      if (isFirstIteration) {
-        readyForFirstIteration();
-        isFirstIteration = false;
-      }
-
-      return filterPointReader.next();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    reader.readyForNext();
+    iterateReaderToNextValid(reader);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index c66d3d3c98..b36c5cd308 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -42,6 +42,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZoneId;
@@ -51,6 +53,8 @@ import java.util.Map;
 
 public class TransformOperator implements ProcessOperator {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(TransformOperator.class);
+
   // TODO: make it configurable
   protected static final int FETCH_SIZE = 10000;
 
@@ -142,7 +146,7 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
-  private void iterateReaderToNextValid(LayerPointReader reader)
+  protected void iterateReaderToNextValid(LayerPointReader reader)
       throws QueryProcessException, IOException {
     // Since a constant operand is not allowed to be a result column, the reader will not be
     // a ConstantLayerPointReader.
@@ -158,11 +162,12 @@ public class TransformOperator implements ProcessOperator {
   }
 
   @Override
-  public boolean hasNext() {
+  public final boolean hasNext() {
     if (isFirstIteration) {
       try {
         readyForFirstIteration();
       } catch (Exception e) {
+        LOGGER.error("TransformOperator#hasNext()", e);
         throw new RuntimeException(e);
       }
       isFirstIteration = false;
@@ -197,9 +202,7 @@ public class TransformOperator implements ProcessOperator {
 
         // values
         for (int i = 0; i < columnCount; ++i) {
-          LayerPointReader reader = transformers[i];
-          collectDataPoint(reader, columnBuilders[i], currentTime);
-          iterateReaderToNextValid(reader);
+          collectDataPointAndIterateToNextValid(transformers[i], columnBuilders[i], currentTime);
         }
 
         ++rowCount;
@@ -209,45 +212,52 @@ public class TransformOperator implements ProcessOperator {
 
       tsBlockBuilder.declarePositions(rowCount);
     } catch (Exception e) {
+      LOGGER.error("TransformOperator#next()", e);
       throw new RuntimeException(e);
     }
 
     return tsBlockBuilder.build();
   }
 
-  protected void collectDataPoint(LayerPointReader reader, ColumnBuilder writer, long currentTime)
+  protected void collectDataPointAndIterateToNextValid(
+      LayerPointReader reader, ColumnBuilder writer, long currentTime)
       throws QueryProcessException, IOException {
-    if (!reader.next() || reader.currentTime() != currentTime || reader.isCurrentNull()) {
+    if (!reader.next() || reader.currentTime() != currentTime) {
       writer.appendNull();
       return;
     }
 
-    TSDataType type = reader.getDataType();
-    switch (type) {
-      case INT32:
-        writer.writeInt(reader.currentInt());
-        break;
-      case INT64:
-        writer.writeLong(reader.currentLong());
-        break;
-      case FLOAT:
-        writer.writeFloat(reader.currentFloat());
-        break;
-      case DOUBLE:
-        writer.writeDouble(reader.currentDouble());
-        break;
-      case BOOLEAN:
-        writer.writeBoolean(reader.currentBoolean());
-        break;
-      case TEXT:
-        writer.writeBinary(reader.currentBinary());
-        break;
-      default:
-        throw new UnSupportedDataTypeException(
-            String.format("Data type %s is not supported.", type));
+    if (reader.isCurrentNull()) {
+      writer.appendNull();
+    } else {
+      TSDataType type = reader.getDataType();
+      switch (type) {
+        case INT32:
+          writer.writeInt(reader.currentInt());
+          break;
+        case INT64:
+          writer.writeLong(reader.currentLong());
+          break;
+        case FLOAT:
+          writer.writeFloat(reader.currentFloat());
+          break;
+        case DOUBLE:
+          writer.writeDouble(reader.currentDouble());
+          break;
+        case BOOLEAN:
+          writer.writeBoolean(reader.currentBoolean());
+          break;
+        case TEXT:
+          writer.writeBinary(reader.currentBinary());
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", type));
+      }
     }
 
     reader.readyForNext();
+    iterateReaderToNextValid(reader);
   }
 
   @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index 18e61c2ea3..7e4d836bcd 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -347,7 +347,8 @@ public class TsBlock {
       int columnCount = getValueColumnCount();
       Object[] row = new Object[columnCount + 1];
       for (int i = 0; i < columnCount; ++i) {
-        row[i] = valueColumns[i].getObject(rowIndex);
+        final Column column = valueColumns[i];
+        row[i] = column.isNull(rowIndex) ? null : column.getObject(rowIndex);
       }
       row[columnCount] = timeColumn.getObject(rowIndex);