You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/09 02:28:15 UTC
[incubator-iotdb] branch cluster updated: rehandle excepetion
during sending task vai client
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new b5d3fa3 rehandle excepetion during sending task vai client
new b882b3f Merge branch 'cluster' of github.com:apache/incubator-iotdb into cluster
b5d3fa3 is described below
commit b5d3fa37b231e85880db37fba548c5a831b6bc49
Author: lta <li...@163.com>
AuthorDate: Tue Apr 9 10:23:37 2019 +0800
rehandle excepetion during sending task vai client
---
.../apache/iotdb/cluster/callback/BatchQPTask.java | 5 ++--
.../org/apache/iotdb/cluster/callback/QPTask.java | 2 +-
.../iotdb/cluster/callback/SingleQPTask.java | 12 ++++++--
.../apache/iotdb/cluster/config/ClusterConfig.java | 6 ++--
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 24 ++++++++-------
.../cluster/qp/executor/NonQueryExecutor.java | 14 +++++----
.../cluster/qp/executor/QueryMetadataExecutor.java | 35 +++++++++++-----------
.../cluster/rpc/impl/RaftNodeAsClientManager.java | 9 ++----
.../processor/DataGroupNonQueryAsyncProcessor.java | 4 +--
.../processor/MetaGroupNonQueryAsyncProcessor.java | 4 +--
.../QueryMetadataInStringAsyncProcessor.java | 7 ++---
.../processor/QueryTimeSeriesAsyncProcessor.java | 11 ++++---
.../rpc/response/DataGroupNonQueryResponse.java | 6 ++--
.../rpc/response/MetaGroupNonQueryResponse.java | 4 +--
.../response/QueryMetadataInStringResponse.java | 4 +--
.../rpc/response/QueryStorageGroupResponse.java | 4 +--
.../rpc/response/QueryTimeSeriesResponse.java | 7 ++---
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 5 ++--
iotdb/iotdb/conf/iotdb-cluster.properties | 6 ++--
19 files changed, 86 insertions(+), 83 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index bd8a073..491848e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
@@ -141,7 +140,7 @@ public class BatchQPTask extends MultiQPTask {
this.run(subTask.getResponse());
} catch (InterruptedException e) {
LOGGER.error("Handle sub task locally failed.");
- this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.getMessage()));
+ this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
}
}
@@ -154,7 +153,7 @@ public class BatchQPTask extends MultiQPTask {
this.run(subTask.getResponse());
} catch (RaftConnectionException | InterruptedException e) {
LOGGER.error("Async handle sub task failed.");
- this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.getMessage()));
+ this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index cdb1325..04b74ab 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -126,7 +126,7 @@ public abstract class QPTask {
}
public enum TaskType {
- SINGLE, BATCH, DELETE
+ SINGLE, BATCH
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index feefcd8..f733920 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -20,12 +20,16 @@ package org.apache.iotdb.cluster.callback;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Process single task.
*/
public class SingleQPTask extends QPTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SingleQPTask.class);
+
private static final int TASK_NUM = 1;
public SingleQPTask(boolean isSyncTask, BasicRequest request) {
@@ -38,11 +42,13 @@ public class SingleQPTask extends QPTask {
*/
@Override
public void run(BasicResponse response) {
- if(response != null) {
+ if(taskState != TaskState.EXCEPTION) {
this.response = response;
- if (response.isRedirected()) {
+ if(response == null){
+ LOGGER.error("Response is null");
+ } else if (response.isRedirected()) {
this.taskState = TaskState.REDIRECT;
- } else if (taskState != TaskState.EXCEPTION) {
+ } else {
this.taskState = TaskState.FINISH;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 40a20ef..d1d4e90 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -95,7 +95,7 @@ public class ClusterConfig {
/**
* Count limit to redo a single task
**/
- private int taskRedoCount = 3;
+ private int taskRedoCount = 10;
/**
* Timeout limit for a single task, the unit is milliseconds
**/
@@ -109,13 +109,13 @@ public class ClusterConfig {
/**
* Max number of @NodeAsClient usage
*/
- private int maxNumOfInnerRpcClient = 50;
+ private int maxNumOfInnerRpcClient = 500;
/**
* Max number of queue length to use @NodeAsClient, the request which exceed to this
* number will be rejected.
*/
- private int maxQueueNumOfInnerRpcClient = 50;
+ private int maxQueueNumOfInnerRpcClient = 500;
/**
* ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 49db9cb..90cd1e1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.callback.QPTask;
import org.apache.iotdb.cluster.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
@@ -84,12 +83,12 @@ public abstract class ClusterQPExecutor {
/**
* ReadDataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
*/
- protected int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
+ private int readDataConsistencyLevel = CLUSTER_CONFIG.getReadDataConsistencyLevel();
/**
* Get Storage Group Name by device name
*/
- public String getStroageGroupByDevice(String device) throws PathErrorException {
+ protected String getStroageGroupByDevice(String device) throws PathErrorException {
String storageGroup;
try {
storageGroup = MManager.getInstance().getFileNameByPath(device);
@@ -117,7 +116,7 @@ public abstract class ClusterQPExecutor {
*
* @return key is groupId, value is all SGs belong to this data group
*/
- public Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
+ protected Map<String, Set<String>> classifySGByGroupId(List<String> sgList) {
Map<String, Set<String>> map = new HashMap<>();
for (int i = 0; i < sgList.size(); i++) {
String sg = sgList.get(i);
@@ -136,12 +135,12 @@ public abstract class ClusterQPExecutor {
/**
* Get raft group id by storage group name
*/
- public String getGroupIdBySG(String storageGroup) {
+ protected String getGroupIdBySG(String storageGroup) {
return router.getGroupID(router.routeGroup(storageGroup));
}
/**
- * Verify if the non query command can execute in local. 1. If this node belongs to the storage
+ * Check if the non query command can execute in local. 1. If this node belongs to the storage
* group 2. If this node is leader.
*/
public boolean canHandleNonQueryByGroupId(String groupId) {
@@ -158,9 +157,9 @@ public abstract class ClusterQPExecutor {
}
/**
- * Verify if the query command can execute in local. Check if this node belongs to the group id
+ * Check if the query command can execute in local. Check if this node belongs to the group id
*/
- public boolean canHandleQueryByGroupId(String groupId) {
+ protected boolean canHandleQueryByGroupId(String groupId) {
return router.containPhysicalNodeByGroupId(groupId, localNode);
}
@@ -172,7 +171,8 @@ public abstract class ClusterQPExecutor {
* @param taskRetryNum Number of QPTask retries due to timeout and redirected.
* @return basic response
*/
- public BasicResponse asyncHandleNonQueryTaskGetRes(QPTask task, PeerId leader, int taskRetryNum)
+ protected BasicResponse asyncHandleNonQueryTaskGetRes(QPTask task, PeerId leader,
+ int taskRetryNum)
throws InterruptedException, RaftConnectionException {
asyncSendNonQueryTask(task, leader, taskRetryNum);
return asyncGetNonQueryRes(task, leader, taskRetryNum);
@@ -211,13 +211,15 @@ public abstract class ClusterQPExecutor {
}
/**
- * Asynchronous get task response. If it's redirected, the task needs to be resent.
+ * Asynchronous get task response. If it's redirected or status is exception, the task needs to be
+ * resent. Note: If status is Exception, it marks that an exception occurred during the task is
+ * being sent instead of executed.
*
* @param task rpc task
* @param leader leader node of the group
* @param taskRetryNum Retry time of the task
*/
- public BasicResponse asyncGetNonQueryRes(QPTask task, PeerId leader, int taskRetryNum)
+ private BasicResponse asyncGetNonQueryRes(QPTask task, PeerId leader, int taskRetryNum)
throws InterruptedException, RaftConnectionException {
task.await();
if (task.getTaskState() != TaskState.FINISH) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 1098ba2..ebaabdb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -106,7 +106,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
Status nullReadTaskStatus = Status.OK();
RaftUtils.handleNullReadToMetaGroup(nullReadTaskStatus);
- /** 1. Classify physical plan by group id **/
+ /** 1. Classify physical plans by group id **/
Map<String, List<PhysicalPlan>> physicalPlansMap = new HashMap<>();
Map<String, List<Integer>> planIndexMap = new HashMap<>();
for (int i = 0; i < result.length; i++) {
@@ -117,8 +117,10 @@ public class NonQueryExecutor extends ClusterQPExecutor {
String groupId = getGroupIdFromPhysicalPlan(plan);
if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
LOGGER.debug("Execute metadata group task");
- result[i] = handleNonQueryRequest(groupId, plan) ? Statement.SUCCESS_NO_INFO
+ boolean executeResult = handleNonQueryRequest(groupId, plan);
+ result[i] = executeResult ? Statement.SUCCESS_NO_INFO
: Statement.EXECUTE_FAILED;
+ batchResult.setAllSuccessful(executeResult);
}else {
if (!physicalPlansMap.containsKey(groupId)) {
physicalPlansMap.put(groupId, new ArrayList<>());
@@ -157,7 +159,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
}
}
- /** 3. Execute Multiple Tasks **/
+ /** 3. Execute Multiple Sub Tasks **/
BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
currentTask = task;
task.execute(this);
@@ -169,7 +171,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
/**
* Get group id from physical plan
*/
- public String getGroupIdFromPhysicalPlan(PhysicalPlan plan)
+ private String getGroupIdFromPhysicalPlan(PhysicalPlan plan)
throws PathErrorException, ProcessorException {
String storageGroup;
String groupId;
@@ -315,11 +317,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
BasicResponse response;
RaftService service;
if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
- response = MetaGroupNonQueryResponse.createEmptyInstance(groupId);
+ response = MetaGroupNonQueryResponse.createEmptyResponse(groupId);
MetadataRaftHolder metadataRaftHolder = RaftUtils.getMetadataRaftHolder();
service = (RaftService) metadataRaftHolder.getService();
} else {
- response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
+ response = DataGroupNonQueryResponse.createEmptyResponse(groupId);
DataPartitionRaftHolder dataRaftHolder = RaftUtils.getDataPartitonRaftHolder(groupId);
service = (RaftService) dataRaftHolder.getService();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index f138c12..42fb66c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -167,12 +167,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
for (int i = 0; i < taskList.size(); i++) {
SingleQPTask task = taskList.get(i);
task.await();
- QueryMetadataInStringResponse response = (QueryMetadataInStringResponse) task.getResponse();
- if (!response.isSuccess()) {
+ BasicResponse response = task.getResponse();
+ if (response == null || !response.isSuccess()) {
LOGGER.error("Execute show timeseries statement false.");
throw new ProcessorException();
}
- metadataList.add(response.getMetadata());
+ metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
}
return combineMetadataInStringList(metadataList);
}
@@ -191,13 +191,13 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
/** Check consistency level**/
if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyInstance(groupId);
+ .createEmptyResponse(groupId);
try {
for (String path : pathList) {
response.addTimeSeries(mManager.getShowTimeseriesPath(path));
}
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+ response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
}
task.run(response);
} else {
@@ -207,7 +207,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
@Override
public void run(Status status, long index, byte[] reqCtx) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyInstance(groupId);
+ .createEmptyResponse(groupId);
if (status.isOk()) {
try {
LOGGER.debug("start to read");
@@ -215,11 +215,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
response.addTimeSeries(mManager.getShowTimeseriesPath(path));
}
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+ response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
}
} else {
response = QueryTimeSeriesResponse
- .createErrorInstance(groupId, status.getErrorMsg());
+ .createErrorResponse(groupId, status.getErrorMsg());
}
task.run(response);
}
@@ -237,7 +237,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
throws InterruptedException, RaftConnectionException {
BasicResponse response = asyncHandleNonQueryTaskGetRes(task, leader, 0);
- return ((QueryTimeSeriesResponse) response).getTimeSeries();
+ return response == null ? new ArrayList<>()
+ : ((QueryTimeSeriesResponse) response).getTimeSeries();
}
/**
@@ -255,9 +256,9 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
QueryStorageGroupResponse response;
try {
response = QueryStorageGroupResponse
- .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+ .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
} catch (final PathErrorException e) {
- response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+ response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
}
task.run(response);
} else {
@@ -270,12 +271,12 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
if (status.isOk()) {
try {
response = QueryStorageGroupResponse
- .createSuccessInstance(metadataHolder.getFsm().getAllStorageGroups());
+ .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups());
} catch (final PathErrorException e) {
- response = QueryStorageGroupResponse.createErrorInstance(e.getMessage());
+ response = QueryStorageGroupResponse.createErrorResponse(e.getMessage());
}
} else {
- response = QueryStorageGroupResponse.createErrorInstance(status.getErrorMsg());
+ response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg());
}
task.run(response);
}
@@ -294,7 +295,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
.getDataPartitionHolder(groupId);
if (readMetadataConsistencyLevel == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessInstance(groupId, mManager.getMetadataInString());
+ .createSuccessResponse(groupId, mManager.getMetadataInString());
response.addResult(true);
task.run(response);
} else {
@@ -307,11 +308,11 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
if (status.isOk()) {
LOGGER.debug("start to read");
response = QueryMetadataInStringResponse
- .createSuccessInstance(groupId, mManager.getMetadataInString());
+ .createSuccessResponse(groupId, mManager.getMetadataInString());
response.addResult(true);
} else {
response = QueryMetadataInStringResponse
- .createErrorInstance(groupId, status.getErrorMsg());
+ .createErrorResponse(groupId, status.getErrorMsg());
response.addResult(false);
}
task.run(response);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
index 0033a9f..b827ef0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClientManager.java
@@ -193,8 +193,7 @@ public class RaftNodeAsClientManager {
LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
qpTask.setTaskState(TaskState.EXCEPTION);
releaseClient();
- qpTask.run(DataGroupNonQueryResponse
- .createErrorInstance(request.getGroupID(), e.getMessage()));
+ qpTask.run(null);
}
@Override
@@ -206,8 +205,7 @@ public class RaftNodeAsClientManager {
LOGGER.error(e.getMessage());
qpTask.setTaskState(TaskState.EXCEPTION);
releaseClient();
- qpTask.run(DataGroupNonQueryResponse
- .createErrorInstance(request.getGroupID(), e.getMessage()));
+ qpTask.run(null);
throw new RaftConnectionException(e);
}
}
@@ -222,8 +220,7 @@ public class RaftNodeAsClientManager {
qpTask.run(response);
} catch (RemotingException | InterruptedException e) {
qpTask.setTaskState(TaskState.EXCEPTION);
- qpTask.run(DataGroupNonQueryResponse
- .createErrorInstance(request.getGroupID(), e.getMessage()));
+ qpTask.run(null);
throw new RaftConnectionException(e);
} finally {
releaseClient();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index 7119e53..bc8d7cb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -55,13 +55,13 @@ public class DataGroupNonQueryAsyncProcessor extends
LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
DataGroupNonQueryResponse response = DataGroupNonQueryResponse
- .createRedirectedInstance(groupId, leader.toString());
+ .createRedirectedResponse(groupId, leader.toString());
asyncContext.sendResponse(response);
} else {
LOGGER.debug("Apply task to raft node");
/** Apply Task to Raft Node **/
- BasicResponse response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
+ BasicResponse response = DataGroupNonQueryResponse.createEmptyResponse(groupId);
RaftService service = (RaftService) dataPartitionRaftHolder.getService();
RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, request, response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 95d58c7..f38897e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -52,13 +52,13 @@ public class MetaGroupNonQueryAsyncProcessor extends
LOGGER.debug("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
- .createRedirectedInstance(groupId, leader.toString());
+ .createRedirectedResponse(groupId, leader.toString());
asyncContext.sendResponse(response);
} else {
LOGGER.debug("Apply task to metadata raft node");
/** Apply Task to Raft Node **/
- BasicResponse response = MetaGroupNonQueryResponse.createEmptyInstance(groupId);
+ BasicResponse response = MetaGroupNonQueryResponse.createEmptyResponse(groupId);
RaftService service = (RaftService) metadataHolder.getService();
RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, request, response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
index ae9ab2e..ceeea87 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
@@ -22,7 +22,6 @@ import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -45,7 +44,7 @@ public class QueryMetadataInStringAsyncProcessor extends
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryMetadataInStringResponse response = QueryMetadataInStringResponse
- .createSuccessInstance(groupId, mManager.getMetadataInString());
+ .createSuccessResponse(groupId, mManager.getMetadataInString());
response.addResult(true);
asyncContext.sendResponse(response);
} else {
@@ -57,11 +56,11 @@ public class QueryMetadataInStringAsyncProcessor extends
QueryMetadataInStringResponse response;
if (status.isOk()) {
response = QueryMetadataInStringResponse
- .createSuccessInstance(groupId, mManager.getMetadataInString());
+ .createSuccessResponse(groupId, mManager.getMetadataInString());
response.addResult(true);
} else {
response = QueryMetadataInStringResponse
- .createErrorInstance(groupId, status.getErrorMsg());
+ .createErrorResponse(groupId, status.getErrorMsg());
response.addResult(false);
}
asyncContext.sendResponse(response);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
index 431933f..e564e57 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
@@ -22,7 +22,6 @@ import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -46,11 +45,11 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
if (request.getReadConsistencyLevel() == ClusterConstant.WEAK_CONSISTENCY_LEVEL) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyInstance(groupId);
+ .createEmptyResponse(groupId);
try {
queryTimeSeries(request, response);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+ response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
}
asyncContext.sendResponse(response);
} else {
@@ -60,16 +59,16 @@ public class QueryTimeSeriesAsyncProcessor extends BasicAsyncUserProcessor<Query
@Override
public void run(Status status, long index, byte[] reqCtx) {
QueryTimeSeriesResponse response = QueryTimeSeriesResponse
- .createEmptyInstance(groupId);
+ .createEmptyResponse(groupId);
if (status.isOk()) {
try {
queryTimeSeries(request, response);
} catch (final PathErrorException e) {
- response = QueryTimeSeriesResponse.createErrorInstance(groupId, e.getMessage());
+ response = QueryTimeSeriesResponse.createErrorResponse(groupId, e.getMessage());
}
} else {
response = QueryTimeSeriesResponse
- .createErrorInstance(groupId, status.getErrorMsg());
+ .createErrorResponse(groupId, status.getErrorMsg());
}
asyncContext.sendResponse(response);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index e63634d..505cc11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -28,15 +28,15 @@ public class DataGroupNonQueryResponse extends BasicResponse {
super(groupId, redirected, leaderStr, errorMsg);
}
- public static DataGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+ public static DataGroupNonQueryResponse createRedirectedResponse(String groupId, String leaderStr) {
return new DataGroupNonQueryResponse(groupId, true, leaderStr, null);
}
- public static DataGroupNonQueryResponse createEmptyInstance(String groupId) {
+ public static DataGroupNonQueryResponse createEmptyResponse(String groupId) {
return new DataGroupNonQueryResponse(groupId, false, null, null);
}
- public static DataGroupNonQueryResponse createErrorInstance(String groupId, String errorMsg) {
+ public static DataGroupNonQueryResponse createErrorResponse(String groupId, String errorMsg) {
return new DataGroupNonQueryResponse(groupId, false, null, errorMsg);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index ea8561f..131f7c1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -28,11 +28,11 @@ public class MetaGroupNonQueryResponse extends BasicResponse {
super(groupId, redirected, leaderStr, errorMsg);
}
- public static MetaGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+ public static MetaGroupNonQueryResponse createRedirectedResponse(String groupId, String leaderStr) {
return new MetaGroupNonQueryResponse(groupId, true, leaderStr, null);
}
- public static MetaGroupNonQueryResponse createEmptyInstance(String groupId) {
+ public static MetaGroupNonQueryResponse createEmptyResponse(String groupId) {
return new MetaGroupNonQueryResponse(groupId, false, null, null);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
index 5a291ef..ab5137b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryMetadataInStringResponse.java
@@ -27,7 +27,7 @@ public class QueryMetadataInStringResponse extends BasicResponse {
super(groupId, redirected, leaderStr, errorMsg);
}
- public static QueryMetadataInStringResponse createSuccessInstance(String groupId,
+ public static QueryMetadataInStringResponse createSuccessResponse(String groupId,
String metadata) {
QueryMetadataInStringResponse response = new QueryMetadataInStringResponse(groupId, false, null,
null);
@@ -35,7 +35,7 @@ public class QueryMetadataInStringResponse extends BasicResponse {
return response;
}
- public static QueryMetadataInStringResponse createErrorInstance(String groupId, String errorMsg) {
+ public static QueryMetadataInStringResponse createErrorResponse(String groupId, String errorMsg) {
return new QueryMetadataInStringResponse(groupId, false, null, errorMsg);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
index 788d3f4..d9c2061 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryStorageGroupResponse.java
@@ -29,13 +29,13 @@ public class QueryStorageGroupResponse extends BasicResponse {
this.addResult(success);
}
- public static QueryStorageGroupResponse createSuccessInstance(Set<String> storageGroups) {
+ public static QueryStorageGroupResponse createSuccessResponse(Set<String> storageGroups) {
QueryStorageGroupResponse response = new QueryStorageGroupResponse(true, null, null);
response.setStorageGroups(storageGroups);
return response;
}
- public static QueryStorageGroupResponse createErrorInstance(String errorMsg) {
+ public static QueryStorageGroupResponse createErrorResponse(String errorMsg) {
return new QueryStorageGroupResponse(false, null, errorMsg);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
index b0ca82b..ed42eea 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/QueryTimeSeriesResponse.java
@@ -31,11 +31,11 @@ public class QueryTimeSeriesResponse extends BasicResponse {
timeSeries = new ArrayList<>();
}
- public static QueryTimeSeriesResponse createEmptyInstance(String groupId){
+ public static QueryTimeSeriesResponse createEmptyResponse(String groupId){
return new QueryTimeSeriesResponse(groupId, false, true, null, null);
}
- public static QueryTimeSeriesResponse createErrorInstance(String groupId, String errorMsg) {
+ public static QueryTimeSeriesResponse createErrorResponse(String groupId, String errorMsg) {
return new QueryTimeSeriesResponse(groupId, false, false, null, errorMsg);
}
@@ -47,7 +47,4 @@ public class QueryTimeSeriesResponse extends BasicResponse {
this.timeSeries.addAll(timeSeries);
}
- public void setTimeSeries(List<List<String>> timeSeries) {
- this.timeSeries = timeSeries;
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 3838938..08c252a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.rpc.closure.ResponseClosure;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.slf4j.Logger;
@@ -262,8 +263,8 @@ public class RaftUtils {
.readIndex(reqContext, new ReadIndexClosure() {
@Override
public void run(Status status, long index, byte[] reqCtx) {
- BasicResponse response = new BasicResponse(null, false, null, null) {
- };
+ BasicResponse response = MetaGroupNonQueryResponse
+ .createEmptyResponse(ClusterConfig.METADATA_GROUP_ID);
if (!status.isOk()) {
status.setCode(-1);
status.setErrorMsg(status.getErrorMsg());
diff --git a/iotdb/iotdb/conf/iotdb-cluster.properties b/iotdb/iotdb/conf/iotdb-cluster.properties
index dab4c7c..06ba43f 100644
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@ -51,7 +51,7 @@ delay_snapshot = false
delay_hours = 2
# Count limit to redo a single task
-task_redo_count = 3
+task_redo_count = 10
# Timeout limit for a single task, the unit is milliseconds
task_timeout_ms = 1000
@@ -60,11 +60,11 @@ task_timeout_ms = 1000
num_of_virtual_nodes = 2
# Max number of use inner rpc client
-max_num_of_inner_rpc_client = 50
+max_num_of_inner_rpc_client = 500
# Max number of queue length to use inner rpc client, the request which exceed to this
# number will be rejected.
-max_queue_num_of_inner_rpc_client = 50
+max_queue_num_of_inner_rpc_client = 500
# ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
read_metadata_consistency_level = 1