You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/05/03 07:33:31 UTC

[iotdb] 01/01: Return Optional<TsBlock> instead of TsBlock so that callers are clearly aware that the method may return no result (previously null)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch stable-mpp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cdc3846852cca337ed8918aeb85e07fcca1a37e1
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Tue May 3 15:33:16 2022 +0800

    Return Optional<TsBlock> instead of TsBlock so that callers are clearly aware that the method may return no result (previously null)
---
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |  7 ++++---
 .../db/mpp/plan/execution/IQueryExecution.java     |  4 +++-
 .../db/mpp/plan/execution/QueryExecution.java      |  9 +++++----
 .../mpp/plan/execution/config/ConfigExecution.java |  7 ++++---
 .../apache/iotdb/db/utils/QueryDataSetUtils.java   |  6 ++++--
 .../db/mpp/execution/ConfigExecutionTest.java      | 23 +++++++++++-----------
 6 files changed, 32 insertions(+), 24 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 30cba02443..781e01eb50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public class ClusterSchemaFetcher implements ISchemaFetcher {
 
@@ -79,14 +80,14 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
     SchemaTree result = new SchemaTree();
     while (coordinator.getQueryExecution(queryId).hasNextResult()) {
-      TsBlock tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
-      if (tsBlock == null) {
+      Optional<TsBlock> tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
+      if (!tsBlock.isPresent()) {
         break;
       }
       result.setStorageGroups(storageGroups);
       Binary binary;
       SchemaTree fetchedSchemaTree;
-      Column column = tsBlock.getColumn(0);
+      Column column = tsBlock.get().getColumn(0);
       for (int i = 0; i < column.getPositionCount(); i++) {
         binary = column.getBinary(i);
         fetchedSchemaTree = SchemaTree.deserialize(ByteBuffer.wrap(binary.getValues()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
index ad21e71531..49434afec5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/IQueryExecution.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.mpp.plan.execution;
 import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
+import java.util.Optional;
+
 public interface IQueryExecution {
 
   void start();
@@ -32,7 +34,7 @@ public interface IQueryExecution {
 
   ExecutionResult getStatus();
 
-  TsBlock getBatchResult();
+  Optional<TsBlock> getBatchResult();
 
   boolean hasNextResult();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 776a75d58e..4ba48805bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -221,18 +222,18 @@ public class QueryExecution implements IQueryExecution {
    * implemented with DataStreamManager)
    */
   @Override
-  public TsBlock getBatchResult() {
+  public Optional<TsBlock> getBatchResult() {
     try {
       if (resultHandle.isAborted() || resultHandle.isFinished()) {
-        return null;
+        return Optional.empty();
       }
       ListenableFuture<Void> blocked = resultHandle.isBlocked();
       blocked.get();
       if (resultHandle.isFinished()) {
         releaseResource();
-        return null;
+        return Optional.empty();
       }
-      return resultHandle.receive();
+      return Optional.of(resultHandle.receive());
     } catch (ExecutionException | CancellationException e) {
       stateMachine.transitionToFailed(e);
       throwIfUnchecked(e.getCause());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
index eccb8e19d6..907b7ff122 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigExecution.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import jersey.repackaged.com.google.common.util.concurrent.SettableFuture;
 import org.jetbrains.annotations.NotNull;
 
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
@@ -129,12 +130,12 @@ public class ConfigExecution implements IQueryExecution {
   }
 
   @Override
-  public TsBlock getBatchResult() {
+  public Optional<TsBlock> getBatchResult() {
     if (!resultSetConsumed) {
       resultSetConsumed = true;
-      return resultSet;
+      return Optional.of(resultSet);
     }
-    return null;
+    return Optional.empty();
   }
 
   // According to the execution process of ConfigExecution, there is only one TsBlock for
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index a50bc92564..3b8d2bd608 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 
 /** TimeValuePairUtils to convert between thrift format and TsFile format. */
 public class QueryDataSetUtils {
@@ -196,10 +197,11 @@ public class QueryDataSetUtils {
     // used to record a bitmap for every 8 points
     int[] bitmaps = new int[columnNum];
     while (rowCount < fetchSize) {
-      TsBlock tsBlock = queryExecution.getBatchResult();
-      if (tsBlock == null) {
+      Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
+      if (!optionalTsBlock.isPresent()) {
         break;
       }
+      TsBlock tsBlock = optionalTsBlock.get();
       int currentCount = tsBlock.getPositionCount();
       // serialize time column
       for (int i = 0; i < currentCount; i++) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 306bb21d07..e51e324290 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -45,6 +44,8 @@ import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class ConfigExecutionTest {
 
@@ -55,7 +56,7 @@ public class ConfigExecutionTest {
         new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
   }
 
   @Test
@@ -77,10 +78,12 @@ public class ConfigExecutionTest {
     ExecutionResult result = execution.getStatus();
     TsBlock tsBlockFromExecution = null;
     if (execution.hasNextResult()) {
-      tsBlockFromExecution = execution.getBatchResult();
+      Optional<TsBlock> optionalTsBlock = execution.getBatchResult();
+      assertTrue(optionalTsBlock.isPresent());
+      tsBlockFromExecution = optionalTsBlock.get();
     }
-    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
-    Assert.assertEquals(tsBlock, tsBlockFromExecution);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+    assertEquals(tsBlock, tsBlockFromExecution);
   }
 
   @Test
@@ -93,7 +96,7 @@ public class ConfigExecutionTest {
         new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
     execution.start();
     ExecutionResult result = execution.getStatus();
-    Assert.assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+    assertEquals(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
   }
 
   @Test
@@ -120,8 +123,7 @@ public class ConfigExecutionTest {
         new Thread(
             () -> {
               ExecutionResult result = execution.getStatus();
-              Assert.assertEquals(
-                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
+              assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     taskResult.cancel(true);
@@ -140,8 +142,7 @@ public class ConfigExecutionTest {
         new Thread(
             () -> {
               ExecutionResult result = execution.getStatus();
-              Assert.assertEquals(
-                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
+              assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     execution.start();
@@ -154,7 +155,7 @@ public class ConfigExecutionTest {
       // Assert.fail("InterruptedException should be threw here");
     } catch (InterruptedException e) {
       ExecutionResult result = execution.getStatus();
-      Assert.assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
+      assertEquals(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
       execution.stop();
     }
   }