You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:04 UTC
[incubator-iotdb] 13/19: merge conflicts
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit c0aea2c11275a7b3611023bef24d54c574d230b1
Merge: fda20cc e285a9d
Author: lta <li...@163.com>
AuthorDate: Mon Apr 15 22:50:57 2019 +0800
merge conflicts
.../ThreadName.java} | 23 ++--
.../cluster/concurrent/pool/QPTaskManager.java | 66 ++++++++++++
.../cluster/concurrent/pool/ThreadPoolManager.java | 102 ++++++++++++++++++
.../apache/iotdb/cluster/config/ClusterConfig.java | 58 ++++++----
.../iotdb/cluster/config/ClusterConstant.java | 11 +-
.../iotdb/cluster/config/ClusterDescriptor.java | 19 ++--
.../org/apache/iotdb/cluster/entity/Server.java | 12 ++-
.../cluster/entity/raft/DataStateMachine.java | 117 +++++++++++----------
.../cluster/entity/raft/MetadataStateManchine.java | 108 ++++++++++---------
.../iotdb/cluster/entity/raft/RaftService.java | 15 ---
.../cluster/qp/executor/AbstractQPExecutor.java | 2 +-
.../cluster/qp/executor/NonQueryExecutor.java | 52 +++++----
.../apache/iotdb/cluster/qp/task/BatchQPTask.java | 44 ++++----
.../apache/iotdb/cluster/qp/task/MultiQPTask.java | 14 +--
.../rpc/raft/impl/RaftNodeAsClientManager.java | 2 +-
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 7 ++
.../cluster/concurrent/pool/QPTaskManagerTest.java | 85 +++++++++++++++
.../cluster/config/ClusterDescriptorTest.java | 26 +++--
.../cluster/utils/ClusterConfigureGenerator.java | 4 +-
iotdb/iotdb/conf/iotdb-cluster.properties | 26 +++--
.../org/apache/iotdb/db/concurrent/ThreadName.java | 2 +-
.../apache/iotdb/db/engine/pool/FlushManager.java | 6 +-
.../apache/iotdb/db/engine/pool/MergeManager.java | 8 +-
23 files changed, 570 insertions(+), 239 deletions(-)
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 071c096,c8c2a9b..f90bc66
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@@ -38,6 -34,10 +34,9 @@@ import org.apache.iotdb.cluster.entity.
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
-import org.apache.iotdb.cluster.qp.callback.BatchQPTask;
-import org.apache.iotdb.cluster.qp.callback.QPTask;
-import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
++import org.apache.iotdb.cluster.qp.task.BatchQPTask;
++import org.apache.iotdb.cluster.qp.task.QPTask;
++import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
@@@ -110,6 -109,27 +108,27 @@@ public class NonQueryExecutor extends A
/** 1. Classify physical plans by group id **/
Map<String, List<PhysicalPlan>> physicalPlansMap = new HashMap<>();
Map<String, List<Integer>> planIndexMap = new HashMap<>();
+ classifyPhysicalPlanByGroupId(physicalPlans, batchResult, physicalPlansMap, planIndexMap);
+
+ /** 2. Construct Multiple Data Group Requests **/
- Map<String, QPTask> subTaskMap = new HashMap<>();
++ Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+ constructMultipleRequests(physicalPlansMap, planIndexMap, subTaskMap, batchResult);
+
+ /** 3. Execute Multiple Sub Tasks **/
+ BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
- currentTask = task;
++ currentTask.set(task);
+ task.execute(this);
+ task.await();
+ batchResult.setAllSuccessful(task.isAllSuccessful());
+ batchResult.setBatchErrorMessage(task.getBatchErrorMessage());
+ }
+
+ /**
+ * Classify batch physical plan by groupId
+ */
+ private void classifyPhysicalPlanByGroupId(PhysicalPlan[] physicalPlans, BatchResult batchResult,
+ Map<String, List<PhysicalPlan>> physicalPlansMap, Map<String, List<Integer>> planIndexMap) {
+ int[] result = batchResult.getResult();
for (int i = 0; i < result.length; i++) {
/** Check if the request has failed. If it has failed, ignore it. **/
if (result[i] != Statement.EXECUTE_FAILED) {
@@@ -139,9 -159,15 +158,15 @@@
}
}
}
+ }
- /** 2. Construct Multiple Data Group Requests **/
- Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+ /**
+ * Construct multiple data group requests
+ */
+ private void constructMultipleRequests(Map<String, List<PhysicalPlan>> physicalPlansMap,
- Map<String, List<Integer>> planIndexMap, Map<String, QPTask> subTaskMap,
++ Map<String, List<Integer>> planIndexMap, Map<String, SingleQPTask> subTaskMap,
+ BatchResult batchResult) {
+ int[] result = batchResult.getResult();
for (Entry<String, List<PhysicalPlan>> entry : physicalPlansMap.entrySet()) {
String groupId = entry.getKey();
SingleQPTask singleQPTask;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 23b84c1,2706388..67c7362
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@@ -71,9 -73,9 +73,9 @@@ public class BatchQPTask extends MultiQ
private NonQueryExecutor executor;
- public BatchQPTask(int taskNum, BatchResult batchResult, Map<String, QPTask> taskMap,
+ public BatchQPTask(int taskNum, BatchResult batchResult, Map<String, SingleQPTask> taskMap,
Map<String, List<Integer>> planIndexMap) {
- super(false, taskNum, TaskState.INITIAL, TaskType.BATCH);
+ super(false, taskNum, TaskType.BATCH);
this.batchResult = batchResult.getResult();
this.isAllSuccessful = batchResult.isAllSuccessful();
this.batchErrorMessage = batchResult.getBatchErrorMessage();
@@@ -115,19 -117,19 +117,19 @@@
public void execute(NonQueryExecutor executor) {
this.executor = executor;
- for (Entry<String, QPTask> entry : taskMap.entrySet()) {
+ for (Entry<String, SingleQPTask> entry : taskMap.entrySet()) {
String groupId = entry.getKey();
- QPTask subTask = entry.getValue();
+ SingleQPTask subTask = entry.getValue();
- Thread thread;
+ Future<?> taskThread;
if (executor.canHandleNonQueryByGroupId(groupId)) {
- thread = new Thread(() -> executeLocalSubTask(subTask, groupId));
- thread.start();
+ taskThread = QPTaskManager.getInstance()
+ .submit(() -> executeLocalSubTask(subTask, groupId));
} else {
PeerId leader = RaftUtils.getLeaderPeerID(groupId);
- thread = new Thread(() -> executeRpcSubTask(subTask, leader, groupId));
- thread.start();
+ taskThread = QPTaskManager.getInstance()
+ .submit(() -> executeRpcSubTask(subTask, leader, groupId));
}
- taskThreadMap.put(groupId, thread);
+ taskThreadMap.put(groupId, taskThread);
}
}
@@@ -147,15 -149,15 +149,15 @@@
/**
* Execute RPC sub task
*/
- private void executeRpcSubTask(QPTask subTask, PeerId leader, String groupId) {
+ private void executeRpcSubTask(SingleQPTask subTask, PeerId leader, String groupId) {
- try {
- executor.asyncHandleNonQueryTask(subTask, leader);
- this.run(subTask.getResponse());
- } catch (RaftConnectionException | InterruptedException e) {
- LOGGER.error("Async handle sub task failed.");
- this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
- }
+ try {
+ executor.asyncHandleNonQueryTask(subTask, leader);
+ this.run(subTask.getResponse());
+ } catch (RaftConnectionException | InterruptedException e) {
+ LOGGER.error("Async handle sub task failed.");
+ this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
}
+ }
public boolean isAllSuccessful() {
return isAllSuccessful;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
index 86e2b74,f400eaf..de23087
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
@@@ -16,35 -16,35 +16,37 @@@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.qp.callback;
+package org.apache.iotdb.cluster.qp.task;
import java.util.Map;
+ import java.util.concurrent.Future;
public abstract class MultiQPTask extends QPTask {
/**
* Each request is corresponding to a group id. String: group id
*/
- Map<String, QPTask> taskMap;
+ Map<String, SingleQPTask> taskMap;
/**
- * Task thread map
+ * Each future task handle a request in taskMap, which is corresponding to a group id. String:
+ * group id
*/
- Map<String, Thread> taskThreadMap;
+ Map<String, Future<?>> taskThreadMap;
- public MultiQPTask(boolean isSyncTask, int taskNum, TaskState taskState, TaskType taskType) {
+ public MultiQPTask(boolean isSyncTask, int taskNum, TaskType taskType) {
super(isSyncTask, taskNum, TaskState.INITIAL, taskType);
}
@Override
public void shutdown() {
- for (Thread taskThread : taskThreadMap.values()) {
- if (taskThread.isAlive() && !taskThread.isInterrupted()) {
- taskThread.interrupt();
+ for (Future<?> task : taskThreadMap.values()) {
+ if (!task.isDone()) {
+ task.cancel(true);
}
}
- this.taskCountDownLatch.countDown();
+ while(taskCountDownLatch.getCount()!=0) {
+ this.taskCountDownLatch.countDown();
+ }
}
}