You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/08 16:11:17 UTC
[incubator-iotdb] 03/04: add error statement
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit db7d9921f4ea5c2127b2e369f23375d4c84c0738
Author: lta <li...@163.com>
AuthorDate: Sun Jun 9 00:02:13 2019 +0800
add error statement
---
.../iotdb/cluster/qp/executor/AbstractQPExecutor.java | 19 +++++++------------
.../iotdb/cluster/qp/executor/NonQueryExecutor.java | 4 ++--
.../cluster/qp/executor/QueryMetadataExecutor.java | 15 ++++++++-------
.../org/apache/iotdb/cluster/qp/task/BatchQPTask.java | 2 +-
.../java/org/apache/iotdb/cluster/qp/task/QPTask.java | 1 +
.../apache/iotdb/cluster/qp/task/SingleQPTask.java | 4 +---
.../cluster/query/executor/ClusterQueryRouter.java | 2 +-
.../cluster/query/utils/ClusterRpcReaderUtils.java | 2 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 1 -
.../iotdb/cluster/service/TSServiceClusterImpl.java | 2 +-
iotdb/iotdb/conf/iotdb-cluster.properties | 10 ++++++----
.../db/qp/strategy/optimizer/ConcatPathOptimizer.java | 4 ++--
12 files changed, 31 insertions(+), 35 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 2a721f3..5059e06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -22,10 +22,8 @@ import com.alipay.sofa.jraft.entity.PeerId;
import java.util.HashSet;
import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
@@ -96,14 +94,14 @@ public abstract class AbstractQPExecutor {
throws InterruptedException, RaftConnectionException {
PeerId firstNode = task.getTargetNode();
RaftUtils.updatePeerIDOrder(firstNode, groupId);
- BasicResponse response = null;
+ BasicResponse response;
try {
asyncSendSingleTask(task, taskRetryNum);
response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
+ return response;
} catch (RaftConnectionException ex) {
- boolean success = false;
downNodeSet.add(firstNode);
- while (!success) {
+ while (true) {
PeerId nextNode = null;
try {
nextNode = RaftUtils.getPeerIDInOrder(groupId);
@@ -115,22 +113,19 @@ public abstract class AbstractQPExecutor {
nextNode);
task.resetTask();
task.setTargetNode(nextNode);
- task.setTaskState(TaskState.INITIAL);
asyncSendSingleTask(task, taskRetryNum);
response = syncGetSingleTaskRes(task, taskRetryNum, taskInfo, groupId, downNodeSet);
LOGGER.debug("{} task for group {} to node {} succeed.", taskInfo, groupId, nextNode);
- success = true;
+ return response;
} catch (RaftConnectionException e1) {
LOGGER.debug("{} task for group {} to node {} fail.", taskInfo, groupId, nextNode);
downNodeSet.add(nextNode);
}
}
- LOGGER.debug("The final result for {} task is {}", taskInfo, success);
- if (!success) {
- throw ex;
- }
+ throw new RaftConnectionException(String
+ .format("Can not %s in all nodes of group<%s>, please check cluster status.",
+ taskInfo, groupId));
}
- return response;
}
protected BasicResponse syncHandleSingleTaskGetRes(SingleQPTask task, int taskRetryNum, String taskInfo, String groupId)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index b7dd242..1e6abea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -88,7 +88,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
return handleNonQueryRequest(groupId, plan);
} catch (RaftConnectionException e) {
LOGGER.error(e.getMessage());
- throw new ProcessorException("Raft connection occurs error.", e);
+ throw new ProcessorException(e.getMessage());
} catch (InterruptedException | PathErrorException | IOException e) {
throw new ProcessorException(e);
}
@@ -327,7 +327,7 @@ public class NonQueryExecutor extends AbstractQPExecutor {
} else {
PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
qpTask.setTargetNode(leader);
- return syncHandleSingleTask(qpTask, "non-query", groupId);
+ return syncHandleSingleTask(qpTask, "execute non-query", groupId);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 4270c2b..2d39863 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConsistencyLevel;
-import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -68,7 +67,6 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
private static final String DOUB_SEPARATOR = "\\.";
private static final char SINGLE_SEPARATOR = '.';
- private static final String RAFT_CONNECTION_ERROR = "Raft connection occurs error.";
public QueryMetadataExecutor() {
super();
@@ -151,13 +149,13 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
holder);
res.addAll(queryTimeSeries(task, pathList, groupId));
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ throw new ProcessorException(e.getMessage());
}
}
private List<List<String>> queryTimeSeries(SingleQPTask task, List<String> pathList, String groupId)
throws InterruptedException, RaftConnectionException {
- BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "show timeseries " + pathList, groupId);
+ BasicResponse response = syncHandleSingleTaskGetRes(task, 0, "query timeseries " + pathList, groupId);
return response == null ? new ArrayList<>()
: ((QueryTimeSeriesResponse) response).getTimeSeries();
}
@@ -266,7 +264,10 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
}
LOGGER.debug("The final result for query metadata task is {}", success);
if (!success) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ throw new ProcessorException(String
+ .format(
+ "Can not query metadata in all nodes of group<%s>, please check cluster status.",
+ groupId));
}
}
}
@@ -316,7 +317,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
holder);
dataType = querySeriesType(task, path, groupId);
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ throw new ProcessorException(e.getMessage());
}
}
return dataType;
@@ -377,7 +378,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor {
.debug("Send get paths for {} task for group {} to node {}.", pathList, groupId, holder);
res.addAll(queryPaths(task, pathList, groupId));
} catch (RaftConnectionException e) {
- throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
+ throw new ProcessorException(e.getMessage());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 8548879..c2b290f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@ -180,7 +180,7 @@ public class BatchQPTask extends MultiQPTask {
*/
private void executeRpcSubTask(SingleQPTask subTask, String groupId) {
try {
- executor.syncHandleSingleTask(subTask, "sub non-query", groupId);
+ executor.syncHandleSingleTask(subTask, "execute sub non-query", groupId);
this.receive(subTask.getResponse());
} catch (RaftConnectionException | InterruptedException e) {
LOGGER.error("Async handle sub task failed.");
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
index b86e92a..f3182c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java
@@ -100,6 +100,7 @@ public abstract class QPTask {
public void resetTask() {
this.taskCountDownLatch = new CountDownLatch(taskNum);
+ this.taskState = TaskState.INITIAL;
}
public TaskState getTaskState() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index c684cf1..1faef65 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -28,8 +28,6 @@ import org.slf4j.LoggerFactory;
*/
public class SingleQPTask extends QPTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(SingleQPTask.class);
-
private static final int TASK_NUM = 1;
public SingleQPTask(boolean isSyncTask, BasicRequest request) {
@@ -45,7 +43,7 @@ public class SingleQPTask extends QPTask {
if(taskState != TaskState.EXCEPTION) {
this.response = response;
if(response == null){
- LOGGER.error("Response is null");
+ this.taskState = TaskState.RAFT_CONNECTION_EXCEPTION;
} else if (response.isRedirected()) {
this.taskState = TaskState.REDIRECT;
} else {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
index d56540e..3db7c6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
@@ -92,7 +92,7 @@ public class ClusterQueryRouter extends AbstractQueryRouter {
return engineExecutor.execute(context);
}
} catch (QueryFilterOptimizationException | IOException | RaftConnectionException e) {
- throw new FileNodeManagerException(e);
+ throw new FileNodeManagerException(e.getMessage());
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index bd61375..bab0a67 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -73,7 +73,7 @@ public class ClusterRpcReaderUtils {
}
}
throw new RaftConnectionException(
- String.format("Can not init series reader in all nodes of group<%s>.", groupId));
+ String.format("Can not init series reader in all nodes of group<%s>, please check cluster status.", groupId));
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 088177f..8913086 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -224,7 +224,6 @@ public class RaftNodeAsClientManager {
qpTask.receive(response);
} catch (RemotingException | InterruptedException e) {
LOGGER.error(e.getMessage());
- qpTask.setTaskState(TaskState.RAFT_CONNECTION_EXCEPTION);
qpTask.receive(null);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index b2b4fab..505afd9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -321,7 +321,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
contextMapLocal.get().put(req.queryId, context);
queryManager.addSingleQuery(jobId, (QueryPlan) physicalPlan);
- QueryDataSet queryDataSet = processor.getExecutor().processQuery((QueryPlan) physicalPlan,
+ QueryDataSet queryDataSet = processor.getExecutor().processQuery(physicalPlan,
context);
try {
queryRet.get().put(statement, queryDataSet);
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties b/iotdb/iotdb/conf/iotdb-cluster.properties
index 2761b22..c85ecc6 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -77,11 +77,13 @@ concurrent_inner_rpc_client_thread = 0
# waiting qp tasks exceed to this number, new qp task will be rejected.
max_queue_num_of_inner_rpc_client = 500
-# ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
-read_metadata_consistency_level = 1
+# ReadMetadataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_metadata_consistency_level = strong
-# ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
-read_data_consistency_level = 1
+# ReadDataConsistencyLevel: strong or weak. Default consistency level is strong.
+# This parameter is case-insensitive.
+read_data_consistency_level = strong
# Maximum number of threads which execute tasks generated by client requests concurrently.
# Each client request corresponds to a QP Task. A QP task may be divided into several sub-tasks.
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
index 9d1d618..689a05d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/strategy/optimizer/ConcatPathOptimizer.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.exception.qp.LogicalOperatorException;
import org.apache.iotdb.db.exception.qp.LogicalOptimizeException;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.executor.IQueryProcessExecutor;
-import org.apache.iotdb.db.qp.executor.QueryProcessExecutor;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.BasicFunctionOperator;
import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
@@ -355,7 +354,8 @@ public class ConcatPathOptimizer implements ILogicalOptimizer {
}
}
} catch (PathErrorException e) {
- throw new LogicalOptimizeException("error when remove star: ", e);
+ throw new LogicalOptimizeException(
+ String.format("error when remove star: %s", e.getMessage()), e);
}
}
if (retPaths.isEmpty()) {