You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 08:51:34 UTC
[iotdb] 01/01: add TsBlock as the returned result of IConfigTask
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch xingtanzjr/config-exe-result
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7de6a157ba19203d6715c4885b8bcc184571b960
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed Apr 20 16:51:18 2022 +0800
add TsBlock as the returned result of IConfigTask
---
.../mpp/execution/config/AuthorizerConfigTask.java | 6 ++--
.../db/mpp/execution/config/ConfigExecution.java | 33 +++++++++---------
...SampleConfigTask.java => ConfigTaskResult.java} | 38 ++++++++++++---------
.../iotdb/db/mpp/execution/config/IConfigTask.java | 2 +-
.../mpp/execution/config/SetStorageGroupTask.java | 6 ++--
.../db/mpp/execution/ConfigExecutionTest.java | 39 ++++++++++++++++++----
6 files changed, 79 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
index 9b27ad3b9d..8cc891bf38 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/AuthorizerConfigTask.java
@@ -48,8 +48,8 @@ public class AuthorizerConfigTask implements IConfigTask {
}
@Override
- public ListenableFuture<Void> execute() {
- SettableFuture<Void> future = SettableFuture.create();
+ public ListenableFuture<ConfigTaskResult> execute() {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
ConfigNodeClient configNodeClient = null;
try {
// Construct request using statement
@@ -75,7 +75,7 @@ public class AuthorizerConfigTask implements IConfigTask {
tsStatus);
future.setException(new StatementExecutionException(tsStatus));
} else {
- future.set(null);
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (IoTDBConnectionException | BadNodeUrlException e) {
LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index fe63991dfc..b13d3006c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -47,7 +47,8 @@ public class ConfigExecution implements IQueryExecution {
private final ExecutorService executor;
private final QueryStateMachine stateMachine;
- private final SettableFuture<Boolean> result;
+ private final SettableFuture<ConfigTaskResult> taskFuture;
+ private TsBlock resultSet;
private final IConfigTask task;
@@ -56,7 +57,7 @@ public class ConfigExecution implements IQueryExecution {
this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
- this.result = SettableFuture.create();
+ this.taskFuture = SettableFuture.create();
this.task = statement.accept(new ConfigTaskVisitor(), new ConfigTaskVisitor.TaskContext());
}
@@ -67,21 +68,21 @@ public class ConfigExecution implements IQueryExecution {
this.statement = statement;
this.executor = executor;
this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
- this.result = SettableFuture.create();
+ this.taskFuture = SettableFuture.create();
this.task = task;
}
@Override
public void start() {
try {
- ListenableFuture<Void> future = task.execute();
+ ListenableFuture<ConfigTaskResult> future = task.execute();
Futures.addCallback(
future,
- new FutureCallback<Void>() {
+ new FutureCallback<ConfigTaskResult>() {
@Override
- public void onSuccess(Void success) {
+ public void onSuccess(ConfigTaskResult taskRet) {
stateMachine.transitionToFinished();
- result.set(true);
+ taskFuture.set(taskRet);
}
@Override
@@ -98,7 +99,7 @@ public class ConfigExecution implements IQueryExecution {
public void fail(Throwable cause) {
stateMachine.transitionToFailed(cause);
- result.set(false);
+ taskFuture.set(new ConfigTaskResult(TSStatusCode.INTERNAL_SERVER_ERROR));
}
@Override
@@ -110,10 +111,11 @@ public class ConfigExecution implements IQueryExecution {
@Override
public ExecutionResult getStatus() {
try {
- Boolean success = result.get();
- TSStatusCode statusCode =
- success ? TSStatusCode.SUCCESS_STATUS : TSStatusCode.QUERY_PROCESS_ERROR;
- String message = success ? "" : stateMachine.getFailureMessage();
+ ConfigTaskResult taskResult = taskFuture.get();
+ TSStatusCode statusCode = taskResult.getStatusCode();
+ resultSet = taskResult.getResultSet();
+ String message =
+ statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage();
return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
@@ -125,13 +127,14 @@ public class ConfigExecution implements IQueryExecution {
@Override
public TsBlock getBatchResult() {
- // TODO
- return null;
+ return resultSet;
}
+ // Per the execution flow of ConfigExecution, getStatus() has already been invoked by the time
+ // hasNextResult() is called, so we always return true here.
@Override
public boolean hasNextResult() {
- return false;
+ return true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
similarity index 54%
rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
index 408b8cd448..7a8248b50e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SampleConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
@@ -19,29 +19,35 @@
package org.apache.iotdb.db.mpp.execution.config;
-import org.apache.iotdb.db.mpp.sql.statement.Statement;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+public class ConfigTaskResult {
+ private TSStatusCode statusCode;
+ private TsBlock resultSet;
-public class SampleConfigTask implements IConfigTask {
-
- private Statement statement;
+ public ConfigTaskResult(TSStatusCode statusCode) {
+ this.statusCode = statusCode;
+ }
- public SampleConfigTask(Statement statement) {
- this.statement = statement;
+ public ConfigTaskResult(TSStatusCode statusCode, TsBlock resultSet) {
+ this.statusCode = statusCode;
+ this.resultSet = resultSet;
}
- @Override
- public ListenableFuture<Void> execute() {
- // Construct request using statement
+ public TSStatusCode getStatusCode() {
+ return statusCode;
+ }
- // Send request to some API server
+ public void setStatusCode(TSStatusCode statusCode) {
+ this.statusCode = statusCode;
+ }
- // Get response or throw exception
+ public TsBlock getResultSet() {
+ return resultSet;
+ }
- // If the action is executed successfully, return the Future.
- // If your operation is async, you can return the corresponding future directly.
- return Futures.immediateVoidFuture();
+ public void setResultSet(TsBlock resultSet) {
+ this.resultSet = resultSet;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
index fac0575fc3..a9c506d256 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/IConfigTask.java
@@ -22,5 +22,5 @@ package org.apache.iotdb.db.mpp.execution.config;
import com.google.common.util.concurrent.ListenableFuture;
public interface IConfigTask {
- ListenableFuture<Void> execute() throws InterruptedException;
+ ListenableFuture<ConfigTaskResult> execute() throws InterruptedException;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index cdfd6376cd..934d671e27 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -42,8 +42,8 @@ public class SetStorageGroupTask implements IConfigTask {
}
@Override
- public ListenableFuture<Void> execute() {
- SettableFuture<Void> future = SettableFuture.create();
+ public ListenableFuture<ConfigTaskResult> execute() {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
// Construct request using statement
TSetStorageGroupReq req =
new TSetStorageGroupReq(setStorageGroupStatement.getStorageGroupPath().getFullPath());
@@ -61,7 +61,7 @@ public class SetStorageGroupTask implements IConfigTask {
tsStatus);
future.setException(new StatementExecutionException(tsStatus));
} else {
- future.set(null);
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
} catch (IoTDBConnectionException | BadNodeUrlException e) {
LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 7ee608d7a0..9a1c36beb5 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -23,15 +23,20 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
+import org.apache.iotdb.db.mpp.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -40,7 +45,7 @@ public class ConfigExecutionTest {
@Test
public void normalConfigTaskTest() {
- IConfigTask task = () -> immediateFuture(null);
+ IConfigTask task = () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
ConfigExecution execution =
new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
execution.start();
@@ -48,6 +53,26 @@ public class ConfigExecutionTest {
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
}
+ @Test
+ public void normalConfigTaskWithResultTest() {
+ TsBlock tsBlock =
+ new TsBlock(
+ new TimeColumn(1, new long[] {0}),
+ new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1}));
+ IConfigTask task =
+ () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock));
+ ConfigExecution execution =
+ new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
+ execution.start();
+ ExecutionResult result = execution.getStatus();
+ TsBlock tsBlockFromExecution = null;
+ if (execution.hasNextResult()) {
+ tsBlockFromExecution = execution.getBatchResult();
+ }
+ Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), result.status.code);
+ Assert.assertEquals(tsBlock, tsBlockFromExecution);
+ }
+
@Test
public void exceptionConfigTaskTest() {
IConfigTask task =
@@ -63,16 +88,16 @@ public class ConfigExecutionTest {
@Test
public void configTaskCancelledTest() throws InterruptedException {
- SettableFuture<Void> taskResult = SettableFuture.create();
+ SettableFuture<ConfigTaskResult> taskResult = SettableFuture.create();
class SimpleTask implements IConfigTask {
- private final ListenableFuture<Void> result;
+ private final ListenableFuture<ConfigTaskResult> result;
- public SimpleTask(ListenableFuture<Void> future) {
+ public SimpleTask(ListenableFuture<ConfigTaskResult> future) {
this.result = future;
}
@Override
- public ListenableFuture<Void> execute() throws InterruptedException {
+ public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException {
return result;
}
}
@@ -86,7 +111,7 @@ public class ConfigExecutionTest {
() -> {
ExecutionResult result = execution.getStatus();
Assert.assertEquals(
- TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
});
resultThread.start();
taskResult.cancel(true);
@@ -106,7 +131,7 @@ public class ConfigExecutionTest {
() -> {
ExecutionResult result = execution.getStatus();
Assert.assertEquals(
- TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode(), result.status.code);
+ TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), result.status.code);
});
resultThread.start();
execution.start();