You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/08 16:11:17 UTC

[incubator-iotdb] 03/04: add error statement

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit db7d9921f4ea5c2127b2e369f23375d4c84c0738
Author: lta <li...@163.com>
AuthorDate: Sun Jun 9 00:02:13 2019 +0800

    add error statement
---
 .../iotdb/cluster/qp/executor/AbstractQPExecutor.java | 19 +++++++------------
 .../iotdb/cluster/qp/executor/NonQueryExecutor.java   |  4 ++--
 .../cluster/qp/executor/QueryMetadataExecutor.java    | 15 ++++++++-------
 .../org/apache/iotdb/cluster/qp/task/BatchQPTask.java |  2 +-
 .../java/org/apache/iotdb/cluster/qp/task/QPTask.java |  1 +
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java    |  4 +---
 .../cluster/query/executor/ClusterQueryRouter.java    |  2 +-
 .../cluster/query/utils/ClusterRpcReaderUtils.java    |  2 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java        |  1 -
 .../iotdb/cluster/service/TSServiceClusterImpl.java   |  2 +-
 iotdb/iotdb/conf/iotdb-cluster.properties             | 10 ++++++----
 .../db/qp/strategy/optimizer/ConcatPathOptimizer.java |  4 ++--
 12 files changed, 31 insertions(+), 35 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 2a721f3..5059e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -22,10 +22,8 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
@@ -96,14 +94,14 @@ public abstract class AbstractQPExecutor {
       throws InterruptedException, RaftConnectionException {
     PeerId firstNode = task.getTargetNode();
     RaftUtils.updatePeerIDOrder(firstNode, groupId);
-    BasicResponse response = null;
+    BasicResponse response;
     try {
       asyncSendSingleTask(task, taskRetryNum);
       response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
+      return response;
     } catch (RaftConnectionException ex) {
-      boolean success = false;
       downNodeSet.add(firstNode);
-      while (!success) {
+      while (true) {
         PeerId nextNode = null;
         try {
           nextNode = RaftUtils.getPeerIDInOrder(groupId);
@@ -115,22 +113,19 @@ public abstract class AbstractQPExecutor {
               nextNode);
           task.resetTask();
           task.setTargetNode(nextNode);
-          task.setTaskState(TaskState.INITIAL);
           asyncSendSingleTask(task, taskRetryNum);
           response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
           LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
-          success = true;
+          return response;
         } catch (RaftConnectionException e1) {
           LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
           downNodeSet.add(nextNode);
         }
       }
-      LOGGER.debug("The final result for {} task is {}", taskInfo, success);
-      if (!success) {
-        throw ex;
-      }
+      throw new RaftConnectionException(String
+          .format("Can not %s in all nodes of group<%s>, please check cluster status.",
+              taskInfo, groupId));
     }
-    return response;
   }
 
   protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index b7dd242..1e6abea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -88,7 +88,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
       return handleNonQueryRequest(groupId, plan);
     } catch (RaftConnectionException e) {
       LOGGER.error(e.getMessage());
-      throw new ProcessorException("Raft connection occurs error.", e);
+      throw new ProcessorException(e.getMessage());
     } catch (InterruptedException | PathErrorException | IOException e) {
       throw new ProcessorException(e);
     }
@@ -327,7 +327,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
     } else {
       PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
       qpTask.setTargetNode(leader);
-      return syncHandleSingleTask(qpTask, "non-query", groupId);
+      return syncHandleSingleTask(qpTask, "execute non-query", groupId);
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 4270c2b..2d39863 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -68,7 +67,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
   private static final String DOUB_SEPARATOR = "\\.";
   private static final char SINGLE_SEPARATOR = '.';
-  private static final String RAFT_CONNECTION_ERROR = "Raft connection occurs error.";
 
   public QueryMetadataExecutor() {
     super();
@@ -151,13 +149,13 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
           holder);
       res.addAll(queryTimeSeries(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      throw new ProcessorException(e.getMessage());
     }
   }
 
   private List<List<String>> queryTimeSeries(SingleQPTask task, List<String> pathList, String groupId)
       throws InterruptedException, RaftConnectionException {
-    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "show timeseries " + pathList, groupId);
+    BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "query timeseries " + pathList, groupId);
     return response == null ? new ArrayList<>()
         : ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
@@ -266,7 +264,10 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
         }
         LOGGER.debug("The final result for query metadata task is {}", success);
         if (!success) {
-          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+          throw new ProcessorException(String
+              .format(
+                  "Can not query metadata in all nodes of group<%s>, please check cluster status.",
+                  groupId));
         }
       }
     }
@@ -316,7 +317,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
             holder);
         dataType = querySeriesType(task, path, groupId);
       } catch (RaftConnectionException e) {
-        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+        throw new ProcessorException(e.getMessage());
       }
     }
     return dataType;
@@ -377,7 +378,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
           .debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
       res.addAll(queryPaths(task, pathList, groupId));
     } catch (RaftConnectionException e) {
-      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+      throw new ProcessorException(e.getMessage());
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 8548879..c2b290f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -180,7 +180,7 @@ public class BatchQPTask extends MultiQPTask {
    */
   private void executeRpcSubTask(SingleQPTask subTask, String groupId) {
     try {
-      executor.syncHandleSingleTask(subTask, "sub non-query", groupId);
+      executor.syncHandleSingleTask(subTask, "execute sub non-query", groupId);
       this.receive(subTask.getResponse());
     } catch (RaftConnectionException | InterruptedException e) {
       LOGGER.error("Async handle sub task failed.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
index b86e92a..f3182c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
@@ -100,6 +100,7 @@ public abstract class QPTask {
 
   public void resetTask() {
     this.taskCountDownLatch = new CountDownLatch(taskNum);
+    this.taskState = TaskState.INITIAL;
   }
 
   public TaskState getTaskState() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index c684cf1..1faef65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -28,8 +28,6 @@ import org.slf4j.LoggerFactory;
  */
 public class SingleQPTask extends QPTask {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(SingleQPTask.class);
-
   private static final int TASK_NUM = 1;
 
   public SingleQPTask(boolean isSyncTask, BasicRequest request) {
@@ -45,7 +43,7 @@ public class SingleQPTask extends QPTask {
     if(taskState != TaskState.EXCEPTION) {
       this.response = response;
       if(response == null){
-        LOGGER.error("Response is null");
+        this.taskState = TaskState.RAFT_CONNECTION_EXCEPTION;
       } else if (response.isRedirected()) {
         this.taskState = TaskState.REDIRECT;
       } else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index d56540e..3db7c6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -92,7 +92,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
         return engineExecutor.execute(context);
       }
     } catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
-      throw new FileNodeManagerException(e);
+      throw new FileNodeManagerException(e.getMessage());
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index bd61375..bab0a67 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -73,7 +73,7 @@ public class ClusterRpcReaderUtils {
       }
     }
     throw new RaftConnectionException(
-        String.format("Can not init series reader in all nodes of group<%s>.", groupId));
+        String.format("Can not init series reader in all nodes of group<%s>, please check cluster status.", groupId));
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 088177f..8913086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -224,7 +224,6 @@ public class RaftNodeAsClientManager {
         qpTask.receive(response);
       } catch (RemotingException | InterruptedException e) {
         LOGGER.error(e.getMessage());
-        qpTask.setTaskState(TaskState.RAFT_CONNECTION_EXCEPTION);
         qpTask.receive(null);
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index b2b4fab..505afd9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -321,7 +321,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     contextMapLocal.get().put(req.queryId, context);
 
     queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
-    QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+    QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan,
         context);
     try {
       queryRet.get().put(statement, queryDataSet);
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties b/iotdb/iotdb/conf/iotdb-cluster.properties
index 2761b22..c85ecc6 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -77,11 +77,13 @@ concurrent_inner_rpc_client_thread = 0
 # waiting qp tasks exceed to this number, new qp task will be rejected.
 max_queue_num_of_inner_rpc_client = 500
 
-# ReadMetadataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
-read_metadata_consistency_level = 1
+# ReadMetadataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_metadata_consistency_level = strong
 
-# ReadDataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
-read_data_consistency_level = 1
+# ReadDataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_data_consistency_level = strong
 
 # Maximum number of threads which execute tasks generated by client requests concurrently.
 # Each client request corresponds to a QP Task. A QP task may be divided into several sub-tasks.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 9d1d618..689a05d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
 import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
 import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -355,7 +354,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
           }
         }
       } catch (PathErrorException e) {
-        throw new LogicalOptimizeException("error when remove star: ", e);
+        throw new LogicalOptimizeException(
+            String.format("error when remove star: %s", e.getMessage()), e);
       }
     }
     if (retPaths.isEmpty()) {