You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/17 06:36:50 UTC

[iotdb] 01/01: Make select into error more user-friendly

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SelectIntoException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9f5c85e7457ce336fec8548a2c54b62e37fac4b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Nov 17 14:35:58 2022 +0800

    Make select into error more user-friendly
---
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 10 +++++---
 .../schemaregion/rocksdb/RSchemaRegion.java        |  3 +--
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  2 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  2 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  4 +++
 .../db/mpp/execution/exchange/ISinkHandle.java     |  8 ++++++
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 16 ++++++++++++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 +++++++++++++++--
 .../db/mpp/execution/exchange/SinkHandle.java      |  5 ++++
 .../fragment/FragmentInstanceExecution.java        |  7 ++++-
 .../operator/process/AbstractIntoOperator.java     |  1 -
 .../execution/schedule/AbstractDriverThread.java   |  2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 11 +++-----
 .../db/mpp/plan/execution/QueryExecution.java      | 30 +++++++++++++---------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  2 +-
 .../db/mpp/execution/exchange/StubSinkHandle.java  |  5 ++++
 17 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index fd159a3bed..8f36bd1023 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -469,7 +468,7 @@ public class IoTDBSelectIntoIT {
     executeNonQuery("CREATE TIMESERIES root.sg_error_bk1.new_d.t1 TEXT;");
     assertTestFail(
         "select s1, s2 into root.sg_error_bk1.new_d(t1, t2, t3, t4) from root.sg.*;",
-        "Task was cancelled.");
+        "Fail to insert measurements [t1] caused by [data type of root.sg_error_bk1.new_d.t1 is not consistent, registered type TEXT, inserting type INT32, timestamp 1, value 1]");
   }
 
   @Test
@@ -503,7 +502,6 @@ public class IoTDBSelectIntoIT {
   }
 
   @Test
-  @Ignore // TODO remove @Ignore after fix error message inconsistent
   public void testPermission2() throws SQLException {
     try (Connection adminCon = EnvFactory.getEnv().getConnection();
         Statement adminStmt = adminCon.createStatement()) {
@@ -516,7 +514,11 @@ public class IoTDBSelectIntoIT {
             "select s1, s2 into root.sg_bk.new_d(t1, t2, t3, t4) from root.sg.*;");
         fail("No exception!");
       } catch (SQLException e) {
-        Assert.assertTrue(e.getMessage(), e.getMessage().contains("Task was cancelled."));
+        Assert.assertTrue(
+            e.getMessage(),
+            e.getMessage()
+                .contains(
+                    "No permissions for this operation, please add privilege INSERT_TIMESERIES"));
       }
     }
   }
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index e18a0e5ace..d327b6d2da 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -367,8 +367,7 @@ public class RSchemaRegion implements ISchemaRegion {
             } else if (checkResult.getResult(RMNodeType.ENTITY)) {
               if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
                 throw new AlignedTimeseriesException(
-                    "Timeseries under this entity is aligned, please use createAlignedTimeseries"
-                        + " or change entity.",
+                    "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                     RSchemaUtils.getPathByLevelPath(levelPath));
               }
             } else {
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 174f989e86..df3d187d10 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -278,7 +278,7 @@ public class TagSchemaRegion implements ISchemaRegion {
     if (deviceEntry != null) {
       if (!deviceEntry.isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
             devicePath.getFullPath());
       } else {
         filterExistingMeasurements(plan, deviceEntry.getMeasurementMap().keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 4b1e8b94ad..84f488a987 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -267,7 +267,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
 
           if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
             throw new AlignedTimeseriesException(
-                "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+                "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                 device.getFullPath());
           }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index bb1e1d1ef1..28c039c997 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -246,7 +246,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
 
       if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
             device.getFullPath());
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d5aaa093fb..fcc630e8bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -142,6 +142,10 @@ public class QueryStateMachine {
     return "no detailed failure reason in QueryStateMachine";
   }
 
+  public Throwable getFailureException() {
+    return failureException;
+  }
+
   public TSStatus getFailureStatus() {
     return failureStatus;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 406ea5d625..50e342621f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -73,6 +73,14 @@ public interface ISinkHandle {
    */
   void abort();
 
+  /**
+   * Abort the sink handle with the given failure cause. Discard all tsblocks which may still be in
+   * the memory buffer and cancel the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in abnormal case
+   */
+  void abort(Throwable throwable);
+
   /**
    * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
    * complete the future returned by {@link #isFull()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index bcf469b38d..aa3aeb1d2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -157,6 +157,22 @@ public class LocalSinkHandle implements ISinkHandle {
     logger.debug("[EndAbortLocalSinkHandle]");
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    logger.debug("[StartAbortLocalSinkHandle]");
+    synchronized (queue) {
+      synchronized (this) {
+        if (aborted || closed) {
+          return;
+        }
+        aborted = true;
+        queue.abort(throwable);
+        sinkHandleListener.onAborted(this);
+      }
+    }
+    logger.debug("[EndAbortLocalSinkHandle]");
+  }
+
   @Override
   public void close() {
     logger.debug("[StartCloseLocalSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b42a370780..a593bf5c91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,6 @@ public class SharedTsBlockQueue {
     }
   }
 
-  // TODO add Throwable t as a parameter of this method, and then call blocked.setException(t);
-  // instead of blocked.cancel(true);
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
   public void abort() {
     if (closed) {
@@ -218,4 +216,25 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  /** Destroy the queue and fail the future with t. Should only be called in abnormal case */
+  public void abort(Throwable t) {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (!blocked.isDone()) {
+      blocked.setException(t);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 5c37286c78..cf8e26be56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -195,6 +195,11 @@ public class SinkHandle implements ISinkHandle {
     logger.debug("[EndAbortSinkHandle]");
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    abort();
+  }
+
   @Override
   public synchronized void close() {
     logger.debug("[StartCloseSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index a71ebd0dab..a271f05a91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -116,7 +116,12 @@ public class FragmentInstanceExecution {
             }
 
             if (newState.isFailed()) {
-              sinkHandle.abort();
+              Throwable throwable = stateMachine.getFailureCauses().peek();
+              if (throwable == null) {
+                sinkHandle.abort();
+              } else {
+                sinkHandle.abort(throwable);
+              }
             } else {
               sinkHandle.close();
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 4b1b57dfa3..7b0fa693fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -120,7 +120,6 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
           String.format(
               "Error occurred while inserting tablets in SELECT INTO: %s",
               executionStatus.getMessage());
-      LOGGER.error(message);
       throw new IntoProcessException(message);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index ae4e997a7a..4a4101da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -77,7 +77,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
           // reset the thread name here
           try (SetThreadName fragmentInstanceName =
               new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
-            logger.error("[ExecuteFailed]", t);
+            logger.warn("[ExecuteFailed]", t);
             next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
             scheduler.toAborted(next);
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 6a683fc4ce..7eb8b57d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -490,7 +492,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           new IoTDBException(executionResult.status.getMessage(), statusCode));
     }
 
-    List<String> failedCreationList = new ArrayList<>();
+    Set<String> failedCreationList = new HashSet<>();
     List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
     for (TSStatus subStatus : executionResult.status.subStatus) {
       if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
@@ -502,12 +504,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (!failedCreationList.isEmpty()) {
-      StringBuilder stringBuilder = new StringBuilder();
-      for (String message : failedCreationList) {
-        stringBuilder.append(message).append("\n");
-      }
-      throw new RuntimeException(
-          new MetadataException(String.format("Failed to auto create schema\n %s", stringBuilder)));
+      throw new SemanticException(new MetadataException(String.join(";, ", failedCreationList)));
     }
 
     return alreadyExistingMeasurements;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8c48c6a574..61296aec64 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -388,25 +388,31 @@ public class QueryExecution implements IQueryExecution {
           return Optional.empty();
         }
       } catch (ExecutionException | CancellationException e) {
-        stateMachine.transitionToFailed(e.getCause() != null ? e.getCause() : e);
-        if (stateMachine.getFailureStatus() != null) {
-          throw new IoTDBException(
-              stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
-        }
-        Throwable t = e.getCause() == null ? e : e.getCause();
-        throwIfUnchecked(t);
-        throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e.getCause() != null ? e.getCause() : e);
       } catch (InterruptedException e) {
-        stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
-        throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e);
       } catch (Throwable t) {
-        stateMachine.transitionToFailed(t);
-        throw t;
+        dealWithException(t);
       }
     }
   }
 
+  private void dealWithException(Throwable t) throws IoTDBException {
+    stateMachine.transitionToFailed(t);
+    if (stateMachine.getFailureStatus() != null) {
+      throw new IoTDBException(
+          stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+    } else if (stateMachine.getFailureException() != null) {
+      Throwable rootCause = stateMachine.getFailureException();
+      throwIfUnchecked(rootCause);
+      throw new IoTDBException(rootCause, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    } else {
+      throwIfUnchecked(t);
+      throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    }
+  }
+
   @Override
   public Optional<TsBlock> getBatchResult() throws IoTDBException {
     return getResult(this::getDeserializedTsBlock);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6c8bca7a51..af1a2fec89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       logger.error("can't connect to node {}", endPoint, e);
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
-      status.setMessage("can't connect to node {}" + endPoint);
+      status.setMessage("can't connect to node " + endPoint);
       // If the DataNode cannot be connected, its endPoint will be put into black list
       // so that the following retry will avoid dispatching instance towards this DataNode.
       queryContext.addFailedEndPoint(endPoint);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 2930abe48b..a2094a5127 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -93,6 +93,11 @@ public class StubSinkHandle implements ISinkHandle {
     tsBlocks.clear();
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    abort();
+  }
+
   @Override
   public void close() {
     closed = true;