You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/12/01 02:01:25 UTC
[iotdb] 05/09: fix
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/FixIntoOperator1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 25bf877dbb03751c792772f1c56488b855578d32
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Nov 29 22:09:37 2022 +0800
fix
---
.../operator/process/AbstractIntoOperator.java | 62 ++++++++++------------
.../operator/process/DeviceViewIntoOperator.java | 7 ++-
.../execution/operator/process/IntoOperator.java | 7 ++-
3 files changed, 37 insertions(+), 39 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 9ec9b0d5bc..6181dab228 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -146,48 +146,40 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
isBlocked = SettableFuture.create();
writeOperationFuture =
- writeOperationExecutor.submit(
- () -> {
- LOGGER.info("");
- return client.insertTablets(insertMultiTabletsStatement);
- });
-
+ writeOperationExecutor.submit(() -> client.insertTablets(insertMultiTabletsStatement));
writeOperationFuture.addListener(
- () -> {
- LOGGER.info("");
- ((SettableFuture<Void>) isBlocked).set(null);
- },
- writeOperationExecutor);
+ () -> ((SettableFuture<Void>) isBlocked).set(null), writeOperationExecutor);
}
- protected boolean handleFuture() {
- if (writeOperationFuture != null) {
- if (writeOperationFuture.isDone()) {
- try {
- TSStatus executionStatus = writeOperationFuture.get();
- if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
- String message =
- String.format(
- "Error occurred while inserting tablets in SELECT INTO: %s",
- executionStatus.getMessage());
- throw new IntoProcessException(message);
- }
-
- for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
- generator.reset();
- }
+ protected boolean writeOperationDone() {
+ if (writeOperationFuture == null) {
+ return true;
+ }
- writeOperationFuture = null;
- return true;
- } catch (ExecutionException | InterruptedException e) {
- throw new IntoProcessException(e.getMessage());
- }
- } else {
+ try {
+ if (!writeOperationFuture.isDone()) {
return false;
}
+
+ TSStatus executionStatus = writeOperationFuture.get();
+ if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && executionStatus.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
+ String message =
+ String.format(
+ "Error occurred while inserting tablets in SELECT INTO: %s",
+ executionStatus.getMessage());
+ throw new IntoProcessException(message);
+ }
+
+ for (InsertTabletStatementGenerator generator : insertTabletStatementGenerators) {
+ generator.reset();
+ }
+
+ writeOperationFuture = null;
+ return true;
+ } catch (ExecutionException | InterruptedException e) {
+ throw new IntoProcessException(e.getMessage());
}
- return true;
}
private boolean existFullStatement(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
index 7ef65c9e16..e36bb49168 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java
@@ -76,7 +76,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
@Override
public TsBlock next() {
- if (!handleFuture()) {
+ if (!writeOperationDone()) {
return null;
}
@@ -86,7 +86,10 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator {
cachedTsBlock = null;
if (child.hasNext()) {
- processTsBlock(child.next());
+ TsBlock inputTsBlock = child.next();
+ processTsBlock(inputTsBlock);
+
+ // call child.next only once
return null;
} else {
InsertMultiTabletsStatement insertMultiTabletsStatement =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
index 9e5c46e9fa..67a4fb0b0e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/IntoOperator.java
@@ -60,7 +60,7 @@ public class IntoOperator extends AbstractIntoOperator {
@Override
public TsBlock next() {
- if (!handleFuture()) {
+ if (!writeOperationDone()) {
return null;
}
@@ -70,7 +70,10 @@ public class IntoOperator extends AbstractIntoOperator {
cachedTsBlock = null;
if (child.hasNext()) {
- processTsBlock(child.next());
+ TsBlock inputTsBlock = child.next();
+ processTsBlock(inputTsBlock);
+
+ // call child.next only once
return null;
} else {
if (insertMultiTabletsInternally(false)) {