You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/17 06:36:49 UTC

[iotdb] branch SelectIntoException created (now a9f5c85e74)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch SelectIntoException
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at a9f5c85e74 Make select into error more user-friendly

This branch includes the following new commits:

     new a9f5c85e74 Make select into error more user-friendly

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: Make select into error more user-friendly

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SelectIntoException
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a9f5c85e7457ce336fec8548a2c54b62e37fac4b
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Nov 17 14:35:58 2022 +0800

    Make select into error more user-friendly
---
 .../iotdb/db/it/selectinto/IoTDBSelectIntoIT.java  | 10 +++++---
 .../schemaregion/rocksdb/RSchemaRegion.java        |  3 +--
 .../metadata/tagSchemaRegion/TagSchemaRegion.java  |  2 +-
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  2 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  2 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |  4 +++
 .../db/mpp/execution/exchange/ISinkHandle.java     |  8 ++++++
 .../db/mpp/execution/exchange/LocalSinkHandle.java | 16 ++++++++++++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 23 +++++++++++++++--
 .../db/mpp/execution/exchange/SinkHandle.java      |  5 ++++
 .../fragment/FragmentInstanceExecution.java        |  7 ++++-
 .../operator/process/AbstractIntoOperator.java     |  1 -
 .../execution/schedule/AbstractDriverThread.java   |  2 +-
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  | 11 +++-----
 .../db/mpp/plan/execution/QueryExecution.java      | 30 +++++++++++++---------
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  2 +-
 .../db/mpp/execution/exchange/StubSinkHandle.java  |  5 ++++
 17 files changed, 99 insertions(+), 34 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
index fd159a3bed..8f36bd1023 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -469,7 +468,7 @@ public class IoTDBSelectIntoIT {
     executeNonQuery("CREATE TIMESERIES root.sg_error_bk1.new_d.t1 TEXT;");
     assertTestFail(
         "select s1, s2 into root.sg_error_bk1.new_d(t1, t2, t3, t4) from root.sg.*;",
-        "Task was cancelled.");
+        "Fail to insert measurements [t1] caused by [data type of root.sg_error_bk1.new_d.t1 is not consistent, registered type TEXT, inserting type INT32, timestamp 1, value 1]");
   }
 
   @Test
@@ -503,7 +502,6 @@ public class IoTDBSelectIntoIT {
   }
 
   @Test
-  @Ignore // TODO remove @Ignore after fix error message inconsistent
   public void testPermission2() throws SQLException {
     try (Connection adminCon = EnvFactory.getEnv().getConnection();
         Statement adminStmt = adminCon.createStatement()) {
@@ -516,7 +514,11 @@ public class IoTDBSelectIntoIT {
             "select s1, s2 into root.sg_bk.new_d(t1, t2, t3, t4) from root.sg.*;");
         fail("No exception!");
       } catch (SQLException e) {
-        Assert.assertTrue(e.getMessage(), e.getMessage().contains("Task was cancelled."));
+        Assert.assertTrue(
+            e.getMessage(),
+            e.getMessage()
+                .contains(
+                    "No permissions for this operation, please add privilege INSERT_TIMESERIES"));
       }
     }
   }
diff --git a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
index e18a0e5ace..d327b6d2da 100644
--- a/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
+++ b/schema-engine-rocksdb/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/RSchemaRegion.java
@@ -367,8 +367,7 @@ public class RSchemaRegion implements ISchemaRegion {
             } else if (checkResult.getResult(RMNodeType.ENTITY)) {
               if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
                 throw new AlignedTimeseriesException(
-                    "Timeseries under this entity is aligned, please use createAlignedTimeseries"
-                        + " or change entity.",
+                    "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                     RSchemaUtils.getPathByLevelPath(levelPath));
               }
             } else {
diff --git a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
index 174f989e86..df3d187d10 100644
--- a/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
+++ b/schema-engine-tag/src/main/java/org/apache/iotdb/db/metadata/tagSchemaRegion/TagSchemaRegion.java
@@ -278,7 +278,7 @@ public class TagSchemaRegion implements ISchemaRegion {
     if (deviceEntry != null) {
       if (!deviceEntry.isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
             devicePath.getFullPath());
       } else {
         filterExistingMeasurements(plan, deviceEntry.getMeasurementMap().keySet());
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 4b1e8b94ad..84f488a987 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -267,7 +267,7 @@ public class MTreeBelowSGCachedImpl implements IMTreeBelowSG {
 
           if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
             throw new AlignedTimeseriesException(
-                "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+                "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
                 device.getFullPath());
           }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index bb1e1d1ef1..28c039c997 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -246,7 +246,7 @@ public class MTreeBelowSGMemoryImpl implements IMTreeBelowSG {
 
       if (device.isEntity() && device.getAsEntityMNode().isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
+            "timeseries under this entity is aligned, please use createAlignedTimeseries or change entity.",
             device.getFullPath());
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index d5aaa093fb..fcc630e8bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -142,6 +142,10 @@ public class QueryStateMachine {
     return "no detailed failure reason in QueryStateMachine";
   }
 
+  public Throwable getFailureException() {
+    return failureException;
+  }
+
   public TSStatus getFailureStatus() {
     return failureStatus;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
index 406ea5d625..50e342621f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/ISinkHandle.java
@@ -73,6 +73,14 @@ public interface ISinkHandle {
    */
   void abort();
 
+  /**
+   * Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
+   * the future returned by {@link #isFull()}.
+   *
+   * <p>Should only be called in abnormal case
+   */
+  void abort(Throwable throwable);
+
   /**
    * Close the sink handle. Discard all tsblocks which may still be in the memory buffer and
    * complete the future returned by {@link #isFull()}.
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
index bcf469b38d..aa3aeb1d2a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandle.java
@@ -157,6 +157,22 @@ public class LocalSinkHandle implements ISinkHandle {
     logger.debug("[EndAbortLocalSinkHandle]");
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    logger.debug("[StartAbortLocalSinkHandle]");
+    synchronized (queue) {
+      synchronized (this) {
+        if (aborted || closed) {
+          return;
+        }
+        aborted = true;
+        queue.abort(throwable);
+        sinkHandleListener.onAborted(this);
+      }
+    }
+    logger.debug("[EndAbortLocalSinkHandle]");
+  }
+
   @Override
   public void close() {
     logger.debug("[StartCloseLocalSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index b42a370780..a593bf5c91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -196,8 +196,6 @@ public class SharedTsBlockQueue {
     }
   }
 
-  // TODO add Throwable t as a parameter of this method, and then call blocked.setException(t);
-  // instead of blocked.cancel(true);
   /** Destroy the queue and cancel the future. Should only be called in abnormal case */
   public void abort() {
     if (closed) {
@@ -218,4 +216,25 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  /** Destroy the queue and cancel the future. Should only be called in abnormal case */
+  public void abort(Throwable t) {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    if (!blocked.isDone()) {
+      blocked.setException(t);
+    }
+    if (blockedOnMemory != null) {
+      bufferRetainedSizeInBytes -= localMemoryManager.getQueryPool().tryCancel(blockedOnMemory);
+    }
+    queue.clear();
+    if (bufferRetainedSizeInBytes > 0L) {
+      localMemoryManager
+          .getQueryPool()
+          .free(localFragmentInstanceId.getQueryId(), bufferRetainedSizeInBytes);
+      bufferRetainedSizeInBytes = 0;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 5c37286c78..cf8e26be56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -195,6 +195,11 @@ public class SinkHandle implements ISinkHandle {
     logger.debug("[EndAbortSinkHandle]");
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    abort();
+  }
+
   @Override
   public synchronized void close() {
     logger.debug("[StartCloseSinkHandle]");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index a71ebd0dab..a271f05a91 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -116,7 +116,12 @@ public class FragmentInstanceExecution {
             }
 
             if (newState.isFailed()) {
-              sinkHandle.abort();
+              Throwable throwable = stateMachine.getFailureCauses().peek();
+              if (throwable == null) {
+                sinkHandle.abort();
+              } else {
+                sinkHandle.abort(throwable);
+              }
             } else {
               sinkHandle.close();
             }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
index 4b1b57dfa3..7b0fa693fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java
@@ -120,7 +120,6 @@ public abstract class AbstractIntoOperator implements ProcessOperator {
           String.format(
               "Error occurred while inserting tablets in SELECT INTO: %s",
               executionStatus.getMessage());
-      LOGGER.error(message);
       throw new IntoProcessException(message);
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
index ae4e997a7a..4a4101da85 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/AbstractDriverThread.java
@@ -77,7 +77,7 @@ public abstract class AbstractDriverThread extends Thread implements Closeable {
           // reset the thread name here
           try (SetThreadName fragmentInstanceName =
               new SetThreadName(next.getFragmentInstance().getInfo().getFullId())) {
-            logger.error("[ExecuteFailed]", t);
+            logger.warn("[ExecuteFailed]", t);
             next.setAbortCause(FragmentInstanceAbortedException.BY_INTERNAL_ERROR_SCHEDULED);
             scheduler.toAborted(next);
           }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 6a683fc4ce..7eb8b57d0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.metadata.template.ITemplateManager;
@@ -61,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -490,7 +492,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
           new IoTDBException(executionResult.status.getMessage(), statusCode));
     }
 
-    List<String> failedCreationList = new ArrayList<>();
+    Set<String> failedCreationList = new HashSet<>();
     List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>();
     for (TSStatus subStatus : executionResult.status.subStatus) {
       if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
@@ -502,12 +504,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
     }
 
     if (!failedCreationList.isEmpty()) {
-      StringBuilder stringBuilder = new StringBuilder();
-      for (String message : failedCreationList) {
-        stringBuilder.append(message).append("\n");
-      }
-      throw new RuntimeException(
-          new MetadataException(String.format("Failed to auto create schema\n %s", stringBuilder)));
+      throw new SemanticException(new MetadataException(String.join(";, ", failedCreationList)));
     }
 
     return alreadyExistingMeasurements;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 8c48c6a574..61296aec64 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -388,25 +388,31 @@ public class QueryExecution implements IQueryExecution {
           return Optional.empty();
         }
       } catch (ExecutionException | CancellationException e) {
-        stateMachine.transitionToFailed(e.getCause() != null ? e.getCause() : e);
-        if (stateMachine.getFailureStatus() != null) {
-          throw new IoTDBException(
-              stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
-        }
-        Throwable t = e.getCause() == null ? e : e.getCause();
-        throwIfUnchecked(t);
-        throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e.getCause() != null ? e.getCause() : e);
       } catch (InterruptedException e) {
-        stateMachine.transitionToFailed(e);
         Thread.currentThread().interrupt();
-        throw new IoTDBException(e, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+        dealWithException(e);
       } catch (Throwable t) {
-        stateMachine.transitionToFailed(t);
-        throw t;
+        dealWithException(t);
       }
     }
   }
 
+  private void dealWithException(Throwable t) throws IoTDBException {
+    stateMachine.transitionToFailed(t);
+    if (stateMachine.getFailureStatus() != null) {
+      throw new IoTDBException(
+          stateMachine.getFailureStatus().getMessage(), stateMachine.getFailureStatus().code);
+    } else if (stateMachine.getFailureException() != null) {
+      Throwable rootCause = stateMachine.getFailureException();
+      throwIfUnchecked(rootCause);
+      throw new IoTDBException(rootCause, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    } else {
+      throwIfUnchecked(t);
+      throw new IoTDBException(t, TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    }
+  }
+
   @Override
   public Optional<TsBlock> getBatchResult() throws IoTDBException {
     return getResult(this::getDeserializedTsBlock);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6c8bca7a51..af1a2fec89 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
       logger.error("can't connect to node {}", endPoint, e);
       TSStatus status = new TSStatus();
       status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
-      status.setMessage("can't connect to node {}" + endPoint);
+      status.setMessage("can't connect to node " + endPoint);
       // If the DataNode cannot be connected, its endPoint will be put into black list
       // so that the following retry will avoid dispatching instance towards this DataNode.
       queryContext.addFailedEndPoint(endPoint);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
index 2930abe48b..a2094a5127 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/StubSinkHandle.java
@@ -93,6 +93,11 @@ public class StubSinkHandle implements ISinkHandle {
     tsBlocks.clear();
   }
 
+  @Override
+  public void abort(Throwable throwable) {
+    abort();
+  }
+
   @Override
   public void close() {
     closed = true;