You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 08:51:33 UTC

[iotdb] branch xingtanzjr/config-exe-result created (now 7de6a157ba)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a change to branch xingtanzjr/config-exe-result
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 7de6a157ba add TsBlock as the returned result of IConfigTask

This branch includes the following new commits:

     new 7de6a157ba add TsBlock as the returned result of IConfigTask

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add TsBlock as the returned result of IConfigTask

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/config-exe-result
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7de6a157ba19203d6715c4885b8bcc184571b960
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 20 16:51:18 2022 +0800

    add TsBlock as the returned result of IConfigTask
---
 .../mpp/execution/config/AuthorizerConfigTask.java |  6 ++--
 .../db/mpp/execution/config/ConfigExecution.java   | 33 +++++++++---------
 ...SampleConfigTask.java => ConfigTaskResult.java} | 38 ++++++++++++---------
 .../iotdb/db/mpp/execution/config/IConfigTask.java |  2 +-
 .../mpp/execution/config/SetStorageGroupTask.java  |  6 ++--
 .../db/mpp/execution/ConfigExecutionTest.java      | 39 ++++++++++++++++++----
 6 files changed, 79 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
index 9b27ad3b9d..8cc891bf38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
@@ -48,8 +48,8 @@ public class AuthorizerConfigTask implements IConfigTask {
   }
 
   @Override
-  public ListenableFuture<Void> execute() {
-    SettableFuture<Void> future = SettableFuture.create();
+  public ListenableFuture<ConfigTaskResult> execute() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     ConfigNodeClient configNodeClient = null;
     try {
       // Construct request using statement
@@ -75,7 +75,7 @@ public class AuthorizerConfigTask implements IConfigTask {
             tsStatus);
         future.setException(new StatementExecutionException(tsStatus));
       } else {
-        future.set(null);
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
     } catch (IoTDBConnectionException | BadNodeUrlException e) {
       LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index fe63991dfc..b13d3006c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -47,7 +47,8 @@ public class ConfigExecution implements IQueryExecution {
   private final ExecutorService executor;
 
   private final QueryStateMachine stateMachine;
-  private final SettableFuture<Boolean> result;
+  private final SettableFuture<ConfigTaskResult> taskFuture;
+  private TsBlock resultSet;
 
   private final IConfigTask task;
 
@@ -56,7 +57,7 @@ public class ConfigExecution implements IQueryExecution {
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.result = SettableFuture.create();
+    this.taskFuture = SettableFuture.create();
     this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
   }
 
@@ -67,21 +68,21 @@ public class ConfigExecution implements IQueryExecution {
     this.statement = statement;
     this.executor = executor;
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
-    this.result = SettableFuture.create();
+    this.taskFuture = SettableFuture.create();
     this.task = task;
   }
 
   @Override
   public void start() {
     try {
-      ListenableFuture<Void> future = task.execute();
+      ListenableFuture<ConfigTaskResult> future = task.execute();
       Futures.addCallback(
           future,
-          new FutureCallback<Void>() {
+          new FutureCallback<ConfigTaskResult>() {
             @Override
-            public void onSuccess(Void success) {
+            public void onSuccess(ConfigTaskResult taskRet) {
               stateMachine.transitionToFinished();
-              result.set(true);
+              taskFuture.set(taskRet);
             }
 
             @Override
@@ -98,7 +99,7 @@ public class ConfigExecution implements IQueryExecution {
 
   public void fail(Throwable cause) {
     stateMachine.transitionToFailed(cause);
-    result.set(false);
+    taskFuture.set(new ConfigTaskResult(TSStatusCode.INTERNAL_SERVER_ERROR));
   }
 
   @Override
@@ -110,10 +111,11 @@ public class ConfigExecution implements IQueryExecution {
   @Override
   public ExecutionResult getStatus() {
     try {
-      Boolean success = result.get();
-      TSStatusCode statusCode =
-          success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
-      String message = success ? "" : stateMachine.getFailureMessage();
+      ConfigTaskResult taskResult = taskFuture.get();
+      TSStatusCode statusCode = taskResult.getStatusCode();
+      resultSet = taskResult.getResultSet();
+      String message =
+          statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage();
       return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
     } catch (InterruptedException | ExecutionException e) {
       Thread.currentThread().interrupt();
@@ -125,13 +127,14 @@ public class ConfigExecution implements IQueryExecution {
 
   @Override
   public TsBlock getBatchResult() {
-    // TODO
-    return null;
+    return resultSet;
   }
 
+  // In ConfigExecution's execution flow, getStatus() has already been invoked by the time
+  // hasNextResult() is called, so we always return true here.
   @Override
   public boolean hasNextResult() {
-    return false;
+    return true;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
similarity index 54%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
index 408b8cd448..7a8248b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
@@ -19,29 +19,35 @@
 
 package org.apache.iotdb.db.mpp.execution.config;
 
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+public class ConfigTaskResult {
+  private TSStatusCode statusCode;
+  private TsBlock resultSet;
 
-public class SampleConfigTask implements IConfigTask {
-
-  private Statement statement;
+  public ConfigTaskResult(TSStatusCode statusCode) {
+    this.statusCode = statusCode;
+  }
 
-  public SampleConfigTask(Statement statement) {
-    this.statement = statement;
+  public ConfigTaskResult(TSStatusCode statusCode, TsBlock resultSet) {
+    this.statusCode = statusCode;
+    this.resultSet = resultSet;
   }
 
-  @Override
-  public ListenableFuture<Void> execute() {
-    // Construct request using statement
+  public TSStatusCode getStatusCode() {
+    return statusCode;
+  }
 
-    // Send request to some API server
+  public void setStatusCode(TSStatusCode statusCode) {
+    this.statusCode = statusCode;
+  }
 
-    // Get response or throw exception
+  public TsBlock getResultSet() {
+    return resultSet;
+  }
 
-    // If the action is executed successfully, return the Future.
-    // If your operation is async, you can return the corresponding future directly.
-    return Futures.immediateVoidFuture();
+  public void setResultSet(TsBlock resultSet) {
+    this.resultSet = resultSet;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index fac0575fc3..a9c506d256 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public interface IConfigTask {
-  ListenableFuture<Void> execute() throws InterruptedException;
+  ListenableFuture<ConfigTaskResult> execute() throws InterruptedException;
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index cdfd6376cd..934d671e27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -42,8 +42,8 @@ public class SetStorageGroupTask implements IConfigTask {
   }
 
   @Override
-  public ListenableFuture<Void> execute() {
-    SettableFuture<Void> future = SettableFuture.create();
+  public ListenableFuture<ConfigTaskResult> execute() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     // Construct request using statement
     TSetStorageGroupReq req =
         new TSetStorageGroupReq(setStorageGroupStatement.getStorageGroupPath().getFullPath());
@@ -61,7 +61,7 @@ public class SetStorageGroupTask implements IConfigTask {
             tsStatus);
         future.setException(new StatementExecutionException(tsStatus));
       } else {
-        future.set(null);
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
       }
     } catch (IoTDBConnectionException | BadNodeUrlException e) {
       LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 7ee608d7a0..9a1c36beb5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -23,15 +23,20 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
 import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -40,7 +45,7 @@ public class ConfigExecutionTest {
 
   @Test
   public void normalConfigTaskTest() {
-    IConfigTask task = () -> immediateFuture(null);
+    IConfigTask task = () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
     ConfigExecution execution =
         new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
     execution.start();
@@ -48,6 +53,26 @@ public class ConfigExecutionTest {
     Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
   }
 
+  @Test
+  public void normalConfigTaskWithResultTest() {
+    TsBlock tsBlock =
+        new TsBlock(
+            new TimeColumn(1, new long[] {0}),
+            new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1}));
+    IConfigTask task =
+        () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock));
+    ConfigExecution execution =
+        new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+    execution.start();
+    ExecutionResult result = execution.getStatus();
+    TsBlock tsBlockFromExecution = null;
+    if (execution.hasNextResult()) {
+      tsBlockFromExecution = execution.getBatchResult();
+    }
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+    Assert.assertEquals(tsBlock, tsBlockFromExecution);
+  }
+
   @Test
   public void exceptionConfigTaskTest() {
     IConfigTask task =
@@ -63,16 +88,16 @@ public class ConfigExecutionTest {
 
   @Test
   public void configTaskCancelledTest() throws InterruptedException {
-    SettableFuture<Void> taskResult = SettableFuture.create();
+    SettableFuture<ConfigTaskResult> taskResult = SettableFuture.create();
     class SimpleTask implements IConfigTask {
-      private final ListenableFuture<Void> result;
+      private final ListenableFuture<ConfigTaskResult> result;
 
-      public SimpleTask(ListenableFuture<Void> future) {
+      public SimpleTask(ListenableFuture<ConfigTaskResult> future) {
         this.result = future;
       }
 
       @Override
-      public ListenableFuture<Void> execute() throws InterruptedException {
+      public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException {
         return result;
       }
     }
@@ -86,7 +111,7 @@ public class ConfigExecutionTest {
             () -> {
               ExecutionResult result = execution.getStatus();
               Assert.assertEquals(
-                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     taskResult.cancel(true);
@@ -106,7 +131,7 @@ public class ConfigExecutionTest {
             () -> {
               ExecutionResult result = execution.getStatus();
               Assert.assertEquals(
-                  TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+                  TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
             });
     resultThread.start();
     execution.start();