You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:27:04 UTC

[incubator-iotdb] 13/19: merge conflicts

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c0aea2c11275a7b3611023bef24d54c574d230b1
Merge: fda20cc e285a9d
Author: lta <li...@163.com>
AuthorDate: Mon Apr 15 22:50:57 2019 +0800

    merge conflicts

 .../ThreadName.java}                               |  23 ++--
 .../cluster/concurrent/pool/QPTaskManager.java     |  66 ++++++++++++
 .../cluster/concurrent/pool/ThreadPoolManager.java | 102 ++++++++++++++++++
 .../apache/iotdb/cluster/config/ClusterConfig.java |  58 ++++++----
 .../iotdb/cluster/config/ClusterConstant.java      |  11 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |  19 ++--
 .../org/apache/iotdb/cluster/entity/Server.java    |  12 ++-
 .../cluster/entity/raft/DataStateMachine.java      | 117 +++++++++++----------
 .../cluster/entity/raft/MetadataStateManchine.java | 108 ++++++++++---------
 .../iotdb/cluster/entity/raft/RaftService.java     |  15 ---
 .../cluster/qp/executor/AbstractQPExecutor.java    |   2 +-
 .../cluster/qp/executor/NonQueryExecutor.java      |  52 +++++----
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  44 ++++----
 .../apache/iotdb/cluster/qp/task/MultiQPTask.java  |  14 +--
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |   2 +-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |   7 ++
 .../cluster/concurrent/pool/QPTaskManagerTest.java |  85 +++++++++++++++
 .../cluster/config/ClusterDescriptorTest.java      |  26 +++--
 .../cluster/utils/ClusterConfigureGenerator.java   |   4 +-
 iotdb/iotdb/conf/iotdb-cluster.properties          |  26 +++--
 .../org/apache/iotdb/db/concurrent/ThreadName.java |   2 +-
 .../apache/iotdb/db/engine/pool/FlushManager.java  |   6 +-
 .../apache/iotdb/db/engine/pool/MergeManager.java  |   8 +-
 23 files changed, 570 insertions(+), 239 deletions(-)

diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 071c096,c8c2a9b..f90bc66
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@@ -38,6 -34,10 +34,9 @@@ import org.apache.iotdb.cluster.entity.
  import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
  import org.apache.iotdb.cluster.entity.raft.RaftService;
  import org.apache.iotdb.cluster.exception.RaftConnectionException;
 -import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 -import org.apache.iotdb.cluster.qp.callback.BatchQPTask;
 -import org.apache.iotdb.cluster.qp.callback.QPTask;
 -import org.apache.iotdb.cluster.qp.callback.SingleQPTask;
++import org.apache.iotdb.cluster.qp.task.BatchQPTask;
++import org.apache.iotdb.cluster.qp.task.QPTask;
++import org.apache.iotdb.cluster.qp.task.SingleQPTask;
  import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
  import org.apache.iotdb.cluster.rpc.raft.request.DataGroupNonQueryRequest;
  import org.apache.iotdb.cluster.rpc.raft.request.MetaGroupNonQueryRequest;
@@@ -110,6 -109,27 +108,27 @@@ public class NonQueryExecutor extends A
      /** 1. Classify physical plans by group id **/
      Map<String, List<PhysicalPlan>> physicalPlansMap = new HashMap<>();
      Map<String, List<Integer>> planIndexMap = new HashMap<>();
+     classifyPhysicalPlanByGroupId(physicalPlans, batchResult, physicalPlansMap, planIndexMap);
+ 
+     /** 2. Construct Multiple Data Group Requests **/
 -    Map<String, QPTask> subTaskMap = new HashMap<>();
++    Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+     constructMultipleRequests(physicalPlansMap, planIndexMap, subTaskMap, batchResult);
+ 
+     /** 3. Execute Multiple Sub Tasks **/
+     BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
 -    currentTask = task;
++    currentTask.set(task);
+     task.execute(this);
+     task.await();
+     batchResult.setAllSuccessful(task.isAllSuccessful());
+     batchResult.setBatchErrorMessage(task.getBatchErrorMessage());
+   }
+ 
+   /**
+    * Classify batch physical plan by groupId
+    */
+   private void classifyPhysicalPlanByGroupId(PhysicalPlan[] physicalPlans, BatchResult batchResult,
+       Map<String, List<PhysicalPlan>> physicalPlansMap, Map<String, List<Integer>> planIndexMap) {
+     int[] result = batchResult.getResult();
      for (int i = 0; i < result.length; i++) {
        /** Check if the request has failed. If it has failed, ignore it. **/
        if (result[i] != Statement.EXECUTE_FAILED) {
@@@ -139,9 -159,15 +158,15 @@@
          }
        }
      }
+   }
  
-     /** 2. Construct Multiple Data Group Requests **/
-     Map<String, SingleQPTask> subTaskMap = new HashMap<>();
+   /**
+    * Construct multiple data group requests
+    */
+   private void constructMultipleRequests(Map<String, List<PhysicalPlan>> physicalPlansMap,
 -      Map<String, List<Integer>> planIndexMap, Map<String, QPTask> subTaskMap,
++      Map<String, List<Integer>> planIndexMap, Map<String, SingleQPTask> subTaskMap,
+       BatchResult batchResult) {
+     int[] result = batchResult.getResult();
      for (Entry<String, List<PhysicalPlan>> entry : physicalPlansMap.entrySet()) {
        String groupId = entry.getKey();
        SingleQPTask singleQPTask;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 23b84c1,2706388..67c7362
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@@ -71,9 -73,9 +73,9 @@@ public class BatchQPTask extends MultiQ
    private NonQueryExecutor executor;
  
  
 -  public BatchQPTask(int taskNum, BatchResult batchResult, Map<String, QPTask> taskMap,
 +  public BatchQPTask(int taskNum, BatchResult batchResult, Map<String, SingleQPTask> taskMap,
        Map<String, List<Integer>> planIndexMap) {
-     super(false, taskNum, TaskState.INITIAL, TaskType.BATCH);
+     super(false, taskNum, TaskType.BATCH);
      this.batchResult = batchResult.getResult();
      this.isAllSuccessful = batchResult.isAllSuccessful();
      this.batchErrorMessage = batchResult.getBatchErrorMessage();
@@@ -115,19 -117,19 +117,19 @@@
    public void execute(NonQueryExecutor executor) {
      this.executor = executor;
  
 -    for (Entry<String, QPTask> entry : taskMap.entrySet()) {
 +    for (Entry<String, SingleQPTask> entry : taskMap.entrySet()) {
        String groupId = entry.getKey();
 -      QPTask subTask = entry.getValue();
 +      SingleQPTask subTask = entry.getValue();
-       Thread thread;
+       Future<?> taskThread;
        if (executor.canHandleNonQueryByGroupId(groupId)) {
-         thread = new Thread(() -> executeLocalSubTask(subTask, groupId));
-         thread.start();
+         taskThread = QPTaskManager.getInstance()
+             .submit(() -> executeLocalSubTask(subTask, groupId));
        } else {
          PeerId leader = RaftUtils.getLeaderPeerID(groupId);
-         thread = new Thread(() -> executeRpcSubTask(subTask, leader, groupId));
-         thread.start();
+         taskThread = QPTaskManager.getInstance()
+             .submit(() -> executeRpcSubTask(subTask, leader, groupId));
        }
-       taskThreadMap.put(groupId, thread);
+       taskThreadMap.put(groupId, taskThread);
      }
    }
  
@@@ -147,15 -149,15 +149,15 @@@
    /**
     * Execute RPC sub task
     */
 -  private void executeRpcSubTask(QPTask subTask, PeerId leader, String groupId) {
 +  private void executeRpcSubTask(SingleQPTask subTask, PeerId leader, String groupId) {
-       try {
-         executor.asyncHandleNonQueryTask(subTask, leader);
-         this.run(subTask.getResponse());
-       } catch (RaftConnectionException | InterruptedException e) {
-         LOGGER.error("Async handle sub task failed.");
-         this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
-       }
+     try {
+       executor.asyncHandleNonQueryTask(subTask, leader);
+       this.run(subTask.getResponse());
+     } catch (RaftConnectionException | InterruptedException e) {
+       LOGGER.error("Async handle sub task failed.");
+       this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage()));
      }
+   }
  
    public boolean isAllSuccessful() {
      return isAllSuccessful;
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
index 86e2b74,f400eaf..de23087
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
@@@ -16,35 -16,35 +16,37 @@@
   * specific language governing permissions and limitations
   * under the License.
   */
 -package org.apache.iotdb.cluster.qp.callback;
 +package org.apache.iotdb.cluster.qp.task;
  
  import java.util.Map;
+ import java.util.concurrent.Future;
  
  public abstract class MultiQPTask extends QPTask {
  
    /**
     * Each request is corresponding to a group id. String: group id
     */
 -  Map<String, QPTask> taskMap;
 +  Map<String, SingleQPTask> taskMap;
  
    /**
-    * Task thread map
+    * Each future task handle a request in taskMap, which is corresponding to a group id. String:
+    * group id
     */
-   Map<String, Thread> taskThreadMap;
+   Map<String, Future<?>> taskThreadMap;
  
-   public MultiQPTask(boolean isSyncTask, int taskNum, TaskState taskState, TaskType taskType) {
+   public MultiQPTask(boolean isSyncTask, int taskNum, TaskType taskType) {
      super(isSyncTask, taskNum, TaskState.INITIAL, taskType);
    }
  
    @Override
    public void shutdown() {
-     for (Thread taskThread : taskThreadMap.values()) {
-       if (taskThread.isAlive() && !taskThread.isInterrupted()) {
-         taskThread.interrupt();
+     for (Future<?> task : taskThreadMap.values()) {
+       if (!task.isDone()) {
+         task.cancel(true);
        }
      }
 -    this.taskCountDownLatch.countDown();
 +    while(taskCountDownLatch.getCount()!=0) {
 +      this.taskCountDownLatch.countDown();
 +    }
    }
  }