You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 08:51:34 UTC

[iotdb] 01/01: add TsBlock as the returned result of IConfigTask

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/config-exe-result
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7de6a157ba19203d6715c4885b8bcc184571b960
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 20 16:51:18 2022 +0800

    add TsBlock as the returned result of IConfigTask
---
 .../mpp/execution/config/AuthorizerConfigTask.java |  6 ++--
 .../db/mpp/execution/config/ConfigExecution.java   | 33 +++++++++---------
 ...SampleConfigTask.java => ConfigTaskResult.java} | 38 ++++++++++++---------
 .../iotdb/db/mpp/execution/config/IConfigTask.java |  2 +-
 .../mpp/execution/config/SetStorageGroupTask.java  |  6 ++--
 .../db/mpp/execution/ConfigExecutionTest.java      | 39 ++++++++++++++++++----
 6 files changed, 79 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
index 9b27ad3b9d..8cc891bf38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
@@ -48,8 +48,8 @@ public class AuthorizerConfigTask implements IConfigTask {
   }
 
   @Override
-  public ListenableFuture<Void> execute() {
-    SettableFuture<Void> future = SettableFuture.create();
+  public ListenableFuture<ConfigTaskResult> execute() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     ConfigNodeClient configNodeClient = null;
     try {
       // Construct request using statement
@@ -75,7 +75,7 @@ public class AuthorizerConfigTask implements IConfigTask {
             tsStatus);
         future.setException(new StatementExecutionException(tsStatus));
       } else {
-        future.set(null);
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
     } catch (IoTDBConnectionException | BadNodeUrlException e) {
       LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index fe63991dfc..b13d3006c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -47,7 +47,8 @@ public class ConfigExecution implements IQueryExecution {
   private final ExecutorService executor;
 
   private final QueryStateMachine stateMachine;
-  private final SettableFuture<Boolean> result;
+  private final SettableFuture<ConfigTaskResult> taskFuture;
+  private TsBlock resultSet;
 
   private final IConfigTask task;
 
@@ -56,7 +57,7 @@ public class ConfigExecution implements IQueryExecution {
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.result = SettableFuture.create();
+    this.taskFuture = SettableFuture.create();
     this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
   }
 
@@ -67,21 +68,21 @@ public class ConfigExecution implements IQueryExecution {
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.result = SettableFuture.create();
+    this.taskFuture = SettableFuture.create();
     this.task = task;
   }
 
   @Override
   public void start() {
     try {
-      ListenableFuture<Void> future = task.execute();
+      ListenableFuture<ConfigTaskResult> future = task.execute();
       Futures.addCallback(
           future,
-          new FutureCallback<Void>() {
+          new FutureCallback<ConfigTaskResult>() {
             @Override
-            public void onSuccess(Void success) {
+            public void onSuccess(ConfigTaskResult taskRet) {
               stateMachine.transitionToFinished();
-              result.set(true);
+              taskFuture.set(taskRet);
             }
 
             @Override
@@ -98,7 +99,7 @@ public class ConfigExecution implements IQueryExecution {
 
   public void fail(Throwable cause) {
     stateMachine.transitionToFailed(cause);
-    result.set(false);
+    taskFuture.set(new ConfigTaskResult(TSStatusCode.INTERNAL_SERVER_ERROR));
   }
 
   @Override
@@ -110,10 +111,11 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public ExecutionResult getStatus() {
     try {
-      Boolean success = result.get();
-      TSStatusCode statusCode =
-          success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
-      String message = success ? "" : stateMachine.getFailureMessage();
+      ConfigTaskResult taskResult = taskFuture.get();
+      TSStatusCode statusCode = taskResult.getStatusCode();
+      resultSet = taskResult.getResultSet();
+      String message =
+          statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage();
       return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
     } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
@@ -125,13 +127,14 @@ public class ConfigExecution implements IQueryExecution {
 
   @Override
   public TsBlock getBatchResult() {
-    // TODO
-    return null;
+    return resultSet;
   }
 
+  // According to the execution process of ConfigExecution, getStatus() has already been invoked
+  // by the time hasNextResult() is invoked, so we always return true here.
   @Override
   public boolean hasNextResult() {
-    return false;
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
similarity index 54%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
index 408b8cd448..7a8248b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
@@ -19,29 +19,35 @@
 
 package org.apache.iotdb.db.mpp.execution.config;
 
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+public class ConfigTaskResult {
+  private TSStatusCode statusCode;
+  private TsBlock resultSet;
 
-public class SampleConfigTask implements IConfigTask {
-
-  private Statement statement;
+  public ConfigTaskResult(TSStatusCode statusCode) {
+    this.statusCode = statusCode;
+  }
 
-  public SampleConfigTask(Statement statement) {
-    this.statement = statement;
+  public ConfigTaskResult(TSStatusCode statusCode, TsBlock resultSet) {
+    this.statusCode = statusCode;
+    this.resultSet = resultSet;
   }
 
-  @Override
-  public ListenableFuture<Void> execute() {
-    // Construct request using statement
+  public TSStatusCode getStatusCode() {
+    return statusCode;
+  }
 
-    // Send request to some API server
+  public void setStatusCode(TSStatusCode statusCode) {
+    this.statusCode = statusCode;
+  }
 
-    // Get response or throw exception
+  public TsBlock getResultSet() {
+    return resultSet;
+  }
 
-    // If the action is executed successfully, return the Future.
-    // If your operation is async, you can return the corresponding future directly.
-    return Futures.immediateVoidFuture();
+  public void setResultSet(TsBlock resultSet) {
+    this.resultSet = resultSet;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index fac0575fc3..a9c506d256 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public interface IConfigTask {
-  ListenableFuture<Void> execute() throws InterruptedException;
+  ListenableFuture<ConfigTaskResult> execute() throws InterruptedException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index cdfd6376cd..934d671e27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -42,8 +42,8 @@ public class SetStorageGroupTask implements IConfigTask {
   }
 
   @Override
-  public ListenableFuture<Void> execute() {
-    SettableFuture<Void> future = SettableFuture.create();
+  public ListenableFuture<ConfigTaskResult> execute() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     // Construct request using statement
     TSetStorageGroupReq req =
         new TSetStorageGroupReq(setStorageGroupStatement.getStorageGroupPath().getFullPath());
@@ -61,7 +61,7 @@ public class SetStorageGroupTask implements IConfigTask {
             tsStatus);
         future.setException(new StatementExecutionException(tsStatus));
       } else {
-        future.set(null);
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
     } catch (IoTDBConnectionException | BadNodeUrlException e) {
       LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 7ee608d7a0..9a1c36beb5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -23,15 +23,20 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -40,7 +45,7 @@ public class ConfigExecutionTest {
 
   @Test
   public void normalConfigTaskTest() {
-    IConfigTask task = () -> immediateFuture(null);
+    IConfigTask task = () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     ConfigExecution execution =
         new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
     execution.start();
@@ -48,6 +53,26 @@ public class ConfigExecutionTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
   }
 
+  @Test
+  public void normalConfigTaskWithResultTest() {
+    TsBlock tsBlock =
+        new TsBlock(
+            new TimeColumn(1, new long[] {0}),
+            new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1}));
+    IConfigTask task =
+        () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock));
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    TsBlock tsBlockFromExecution = null;
+    if (execution.hasNextResult()) {
+      tsBlockFromExecution = execution.getBatchResult();
+    }
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+    Assert.assertEquals(tsBlock, tsBlockFromExecution);
+  }
+
   @Test
   public void exceptionConfigTaskTest() {
     IConfigTask task =
@@ -63,16 +88,16 @@ public class ConfigExecutionTest {
 
   @Test
   public void configTaskCancelledTest() throws InterruptedException {
-    SettableFuture<Void> taskResult = SettableFuture.create();
+    SettableFuture<ConfigTaskResult> taskResult = SettableFuture.create();
     class SimpleTask implements IConfigTask {
-      private final ListenableFuture<Void> result;
+      private final ListenableFuture<ConfigTaskResult> result;
 
-      public SimpleTask(ListenableFuture<Void> future) {
+      public SimpleTask(ListenableFuture<ConfigTaskResult> future) {
         this.result = future;
       }
 
       @Override
-      public ListenableFuture<Void> execute() throws InterruptedException {
+      public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException {
         return result;
       }
     }
@@ -86,7 +111,7 @@ public class ConfigExecutionTest {
             () -> {
               ExecutionResult result = execution.getStatus();
               Assert.assertEquals(
-                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     taskResult.cancel(true);
@@ -106,7 +131,7 @@ public class ConfigExecutionTest {
             () -> {
               ExecutionResult result = execution.getStatus();
               Assert.assertEquals(
-                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     execution.start();