You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/17 06:36:50 UTC
[iotdb] 01/01: Make select into error more user-friendly
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch SelectIntoException
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a9f5c85e7457ce336fec8548a2c54b62e37fac4b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Nov 17 14:35:58 2022 +0800
Make select into error more user-friendly
---
.../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java | 10 +++++---
.../schemaregion/rocksdb/RSchemaRegion.java | 3 +--
.../metadata/tagSchemaRegion/TagSchemaRegion.java | 2 +-
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 2 +-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 2 +-
.../iotdb/db/mpp/execution/QueryStateMachine.java | 4 +++
.../db/mpp/execution/exchange/ISinkHandle.java | 8 ++++++
.../db/mpp/execution/exchange/LocalSinkHandle.java | 16 ++++++++++++
.../mpp/execution/exchange/SharedTsBlockQueue.java | 23 +++++++++++++++--
.../db/mpp/execution/exchange/SinkHandle.java | 5 ++++
.../fragment/FragmentInstanceExecution.java | 7 ++++-
.../operator/process/AbstractIntoOperator.java | 1 -
.../execution/schedule/AbstractDriverThread.java | 2 +-
.../db/mpp/plan/analyze/ClusterSchemaFetcher.java | 11 +++-----
.../db/mpp/plan/execution/QueryExecution.java | 30 +++++++++++++---------
.../scheduler/FragmentInstanceDispatcherImpl.java | 2 +-
.../db/mpp/execution/exchange/StubSinkHandle.java | 5 ++++
17 files changed, 99 insertions(+), 34 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index fd159a3bed..8f36bd1023 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -469,7 +468,7 @@ public class IoTDBSelectIntoIT {
executeNonQuery("CREATE TIMESERIES root.sg_error_bk1.new_d.t1 TEXT;");
assertTestFail(
"select s1, s2 into root.sg_error_bk1.new_d(t1, t2, t3, t4) from root.sg.*;",
- "Task was cancelled.");
+ "Fail to insert measurements [t1] caused by [data type of root.sg_error_bk1.new_d.t1 is not consistent, registered type TEXT, inserting type INT32, timestamp 1, value 1]");
}
@Test
@@ -503,7 +502,6 @@ public class IoTDBSelectIntoIT {
}
@Test
- @Ignore // TODO remove @Ignore after fix error message inconsistent
public void testPermission2() throws SQLException {
try (Connection adminCon = EnvFactory.getEnv().getConnection();
Statement adminStmt = adminCon.createStatement()) {
@@ -516,7 +514,11 @@ public class IoTDBSelectIntoIT {
"select s1, s2 into root.sg_bk.new_d(t1, t2, t3, t4) from root.sg.*;");
fail("No exception!");
} catch (SQLException e) {
- Assert.assertTrue(e.getMessage(), e.getMessage().contains("Task was cancelled."));
+ Assert.assertTrue(
+ e.getMessage(),
+ e.getMessage()
+ .contains(
+ "No permissions for this operation, please add privilege INSERT_TIMESERIES"));
}
}
}
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index e18a0e5ace..d327b6d2da 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -367,8 +367,7 @@ public class RSchemaRegion implements ISchemaRegion {
} else if (checkResult.getResult(RMNodeType.ENTITY)) {
if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use createAlignedTimeseries"
- + " or change entity.",
+ "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
RSchemaUtils.getPathByLevelPath(levelPath));
}
} else {
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 174f989e86..df3d187d10 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -278,7 +278,7 @@ public class TagSchemaRegion implements ISchemaRegion {
if (deviceEntry != null) {
if (!deviceEntry.isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
devicePath.getFullPath());
} else {
filterExistingMeasurements(plan, deviceEntry.getMeasurementMap().keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 4b1e8b94ad..84f488a987 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -267,7 +267,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
device.getFullPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index bb1e1d1ef1..28c039c997 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -246,7 +246,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+ "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
device.getFullPath());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d5aaa093fb..fcc630e8bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -142,6 +142,10 @@ public class QueryStateMachine {
return "no detailed failure reason in QueryStateMachine";
}
+ public Throwable getFailureException() {
+ return failureException;
+ }
+
public TSStatus getFailureStatus() {
return failureStatus;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 406ea5d625..50e342621f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -73,6 +73,14 @@ public interface ISinkHandle {
*/
void abort();
+ /**
+ * Abort the sink handle due to {@code throwable}. Discard all tsblocks which may still be in the
+ * memory buffer and cancel the future returned by {@link #isFull()}.
+ *
+ * <p>Should only be called in abnormal case
+ */
+ void abort(Throwable throwable);
+
/**
* Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
* complete the future returned by {@link #isFull()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index bcf469b38d..aa3aeb1d2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -157,6 +157,22 @@ public class LocalSinkHandle implements ISinkHandle {
logger.debug("[EndAbortLocalSinkHandle]");
}
+ @Override
+ public void abort(Throwable throwable) {
+ logger.debug("[StartAbortLocalSinkHandle]");
+ synchronized (queue) {
+ synchronized (this) {
+ if (aborted || closed) {
+ return;
+ }
+ aborted = true;
+ queue.abort(throwable);
+ sinkHandleListener.onAborted(this);
+ }
+ }
+ logger.debug("[EndAbortLocalSinkHandle]");
+ }
+
@Override
public void close() {
logger.debug("[StartCloseLocalSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b42a370780..a593bf5c91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,6 @@ public class SharedTsBlockQueue {
}
}
- // TODO add Throwable t as a parameter of this method, and then call blocked.setException(t);
- // instead of blocked.cancel(true);
/** Destroy the queue and cancel the future. Should only be called in abnormal case */
public void abort() {
if (closed) {
@@ -218,4 +216,25 @@ public class SharedTsBlockQueue {
bufferRetainedSizeInBytes = 0;
}
}
+
+ /** Destroy the queue and, if it is not done yet, fail the {@code blocked} future with {@code t}. Should only be called in abnormal case */
+ public void abort(Throwable t) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ if (!blocked.isDone()) {
+ blocked.setException(t);
+ }
+ if (blockedOnMemory != null) {
+ bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+ }
+ queue.clear();
+ if (bufferRetainedSizeInBytes > 0L) {
+ localMemoryManager
+ .getQueryPool()
+ .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+ bufferRetainedSizeInBytes = 0;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 5c37286c78..cf8e26be56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -195,6 +195,11 @@ public class SinkHandle implements ISinkHandle {
logger.debug("[EndAbortSinkHandle]");
}
+ @Override
+ public void abort(Throwable throwable) {
+ abort();
+ }
+
@Override
public synchronized void close() {
logger.debug("[StartCloseSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index a71ebd0dab..a271f05a91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -116,7 +116,12 @@ public class FragmentInstanceExecution {
}
if (newState.isFailed()) {
- sinkHandle.abort();
+ Throwable throwable = stateMachine.getFailureCauses().peek();
+ if (throwable == null) {
+ sinkHandle.abort();
+ } else {
+ sinkHandle.abort(throwable);
+ }
} else {
sinkHandle.close();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 4b1b57dfa3..7b0fa693fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -120,7 +120,6 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
String.format(
"Error occurred while inserting tablets in SELECT INTO: %s",
executionStatus.getMessage());
- LOGGER.error(message);
throw new IntoProcessException(message);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index ae4e997a7a..4a4101da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -77,7 +77,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
// reset the thread name here
try (SetThreadName fragmentInstanceName =
new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
- logger.error("[ExecuteFailed]", t);
+ logger.warn("[ExecuteFailed]", t);
next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
scheduler.toAborted(next);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 6a683fc4ce..7eb8b57d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -490,7 +492,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
new IoTDBException(executionResult.status.getMessage(), statusCode));
}
- List<String> failedCreationList = new ArrayList<>();
+ Set<String> failedCreationList = new HashSet<>();
List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
for (TSStatus subStatus : executionResult.status.subStatus) {
if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
@@ -502,12 +504,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
}
if (!failedCreationList.isEmpty()) {
- StringBuilder stringBuilder = new StringBuilder();
- for (String message : failedCreationList) {
- stringBuilder.append(message).append("\n");
- }
- throw new RuntimeException(
- new MetadataException(String.format("Failed to auto create schema\n %s", stringBuilder)));
+ throw new SemanticException(new MetadataException(String.join(";, ", failedCreationList)));
}
return alreadyExistingMeasurements;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8c48c6a574..61296aec64 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -388,25 +388,31 @@ public class QueryExecution implements IQueryExecution {
return Optional.empty();
}
} catch (ExecutionException | CancellationException e) {
- stateMachine.transitionToFailed(e.getCause() != null ? e.getCause() : e);
- if (stateMachine.getFailureStatus() != null) {
- throw new IoTDBException(
- stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
- }
- Throwable t = e.getCause() == null ? e : e.getCause();
- throwIfUnchecked(t);
- throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ dealWithException(e.getCause() != null ? e.getCause() : e);
} catch (InterruptedException e) {
- stateMachine.transitionToFailed(e);
Thread.currentThread().interrupt();
- throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ dealWithException(e);
} catch (Throwable t) {
- stateMachine.transitionToFailed(t);
- throw t;
+ dealWithException(t);
}
}
}
+ private void dealWithException(Throwable t) throws IoTDBException {
+ stateMachine.transitionToFailed(t);
+ if (stateMachine.getFailureStatus() != null) {
+ throw new IoTDBException(
+ stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+ } else if (stateMachine.getFailureException() != null) {
+ Throwable rootCause = stateMachine.getFailureException();
+ throwIfUnchecked(rootCause);
+ throw new IoTDBException(rootCause, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ } else {
+ throwIfUnchecked(t);
+ throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ }
+ }
+
@Override
public Optional<TsBlock> getBatchResult() throws IoTDBException {
return getResult(this::getDeserializedTsBlock);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6c8bca7a51..af1a2fec89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
logger.error("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
- status.setMessage("can't connect to node {}" + endPoint);
+ status.setMessage("can't connect to node " + endPoint);
// If the DataNode cannot be connected, its endPoint will be put into black list
// so that the following retry will avoid dispatching instance towards this DataNode.
queryContext.addFailedEndPoint(endPoint);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 2930abe48b..a2094a5127 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -93,6 +93,11 @@ public class StubSinkHandle implements ISinkHandle {
tsBlocks.clear();
}
+ @Override
+ public void abort(Throwable throwable) {
+ abort();
+ }
+
@Override
public void close() {
closed = true;