You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:26:58 UTC

[incubator-iotdb] 07/19: remove ThreadLocal in TSServiceClusterImpl

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ba0389fca23c4426cb628546ef40206a479349f8
Author: lta <li...@163.com>
AuthorDate: Mon Apr 15 13:55:05 2019 +0800

    remove ThreadLocal in TSServiceClusterImpl
---
 ...sterQPExecutor.java => AbstractQPExecutor.java} | 10 ++---
 .../cluster/qp/executor/NonQueryExecutor.java      |  7 ++--
 .../cluster/qp/executor/QueryMetadataExecutor.java |  2 +-
 .../apache/iotdb/cluster/qp/task/MultiQPTask.java  |  4 +-
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java |  4 +-
 .../executor/ClusterQueryRouter.java               |  2 +-
 .../executor/QueryProcessorExecutor.java           |  2 +-
 .../cluster/service/TSServiceClusterImpl.java      | 44 ++++++++--------------
 ...ecutorTest.java => AbstractQPExecutorTest.java} |  7 +---
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  7 +---
 10 files changed, 37 insertions(+), 52 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 1a3cfe3..654d13c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -45,9 +45,9 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractClusterQPExecutor {
+public abstract class AbstractQPExecutor {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractClusterQPExecutor.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQPExecutor.class);
 
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
 
@@ -69,7 +69,7 @@ public abstract class AbstractClusterQPExecutor {
   /**
    * The task in progress.
    */
-  protected QPTask currentTask;
+  protected ThreadLocal<QPTask> currentTask;
 
   /**
    * Count limit to redo a single task
@@ -235,8 +235,8 @@ public abstract class AbstractClusterQPExecutor {
   }
 
   public void shutdown() {
-    if (currentTask != null) {
-      currentTask.shutdown();
+    if (currentTask.get() != null) {
+      currentTask.get().shutdown();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index f9fa3a9..071c096 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.cluster.qp.task.BatchQPTask;
 import org.apache.iotdb.cluster.qp.task.QPTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
@@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Handle distributed non-query logic
  */
-public class NonQueryExecutor extends AbstractClusterQPExecutor {
+public class NonQueryExecutor extends AbstractQPExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(NonQueryExecutor.class);
 
@@ -162,7 +163,7 @@ public class NonQueryExecutor extends AbstractClusterQPExecutor {
 
     /** 3. Execute Multiple Sub Tasks **/
     BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
-    currentTask = task;
+    currentTask.set(task);
     task.execute(this);
     task.await();
     batchResult.setAllSuccessful(task.isAllSuccessful());
@@ -299,7 +300,7 @@ public class NonQueryExecutor extends AbstractClusterQPExecutor {
       request = new DataGroupNonQueryRequest(groupId, plans);
     }
     SingleQPTask qpTask = new SingleQPTask(false, request);
-    currentTask = qpTask;
+    currentTask.set(qpTask);
 
     /** Check if the plan can be executed locally. **/
     if (canHandleNonQueryByGroupId(groupId)) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 73e3181..f1098f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Handle < show timeseries <path> > logic
  */
-public class QueryMetadataExecutor extends AbstractClusterQPExecutor {
+public class QueryMetadataExecutor extends AbstractQPExecutor {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
   private static final String DOUB_SEPARATOR = "\\.";
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
index f53a2a2..86e2b74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
@@ -43,6 +43,8 @@ public abstract class MultiQPTask extends QPTask {
         taskThread.interrupt();
       }
     }
-    this.taskCountDownLatch.countDown();
+    while(taskCountDownLatch.getCount()!=0) {
+      this.taskCountDownLatch.countDown();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index 7896ca3..805834e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -57,6 +57,8 @@ public class SingleQPTask extends QPTask {
 
   @Override
   public void shutdown() {
-    this.taskCountDownLatch.countDown();
+    if (taskCountDownLatch.getCount() != 0) {
+      this.taskCountDownLatch.countDown();
+    }
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index 0f797fc..8fc030a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.executor;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
index ecd5f61..8531512 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.executor;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 7d6516e..3a1561e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.cluster.service;
 
-import com.alipay.sofa.jraft.util.OnlyForTest;
 import java.io.IOException;
 import java.sql.Statement;
 import java.util.Arrays;
@@ -30,7 +29,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.query.executor.QueryProcessorExecutor;
+import org.apache.iotdb.cluster.query.coordinatornode.executor.QueryProcessorExecutor;
 import org.apache.iotdb.db.auth.AuthException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.PathErrorException;
@@ -56,20 +55,14 @@ public class TSServiceClusterImpl extends TSServiceImpl {
   private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class);
 
   private QueryProcessor processor = new QueryProcessor(new QueryProcessorExecutor());
-  private ThreadLocal<NonQueryExecutor> nonQueryExecutor = new ThreadLocal<>();
-  private ThreadLocal<QueryMetadataExecutor> queryMetadataExecutor = new ThreadLocal<>();
+  private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
+  private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
 
   public TSServiceClusterImpl() throws IOException {
     super();
   }
 
   @Override
-  public void initClusterService() {
-    nonQueryExecutor.set(new NonQueryExecutor());
-    queryMetadataExecutor.set(new QueryMetadataExecutor());
-  }
-
-  @Override
   public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req)
       throws TException {
     try {
@@ -99,7 +92,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
             result = resultTemp;
             physicalPlans = physicalPlansTemp;
             BatchResult batchResult = new BatchResult(isAllSuccessful, batchErrorMessage, result);
-            nonQueryExecutor.get().processBatch(physicalPlans, batchResult);
+            nonQueryExecutor.processBatch(physicalPlans, batchResult);
             return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
                 "statement is query :" + statements.get(i), Arrays.stream(result).boxed().collect(
                     Collectors.toList()));
@@ -133,7 +126,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       }
 
       BatchResult batchResult = new BatchResult(isAllSuccessful, batchErrorMessage, result);
-      nonQueryExecutor.get().processBatch(physicalPlans, batchResult);
+      nonQueryExecutor.processBatch(physicalPlans, batchResult);
       batchErrorMessage = batchResult.batchErrorMessage;
       isAllSuccessful = batchResult.isAllSuccessful;
 
@@ -202,13 +195,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
       if (Pattern.matches(ClusterConstant.SET_READ_METADATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
         String[] splits = statement.split("\\s+");
         int level = Integer.parseInt(splits[splits.length - 1]);
-        nonQueryExecutor.get().setReadMetadataConsistencyLevel(level);
+        nonQueryExecutor.setReadMetadataConsistencyLevel(level);
         return true;
       } else if (Pattern
           .matches(ClusterConstant.SET_READ_DATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
         String[] splits = statement.split("\\s+");
         int level = Integer.parseInt(splits[splits.length - 1]);
-        nonQueryExecutor.get().setReadDataConsistencyLevel(level);
+        nonQueryExecutor.setReadDataConsistencyLevel(level);
         return true;
       } else {
         return false;
@@ -220,7 +213,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
 
   @Override
   protected boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
-    return nonQueryExecutor.get().processNonQuery(plan);
+    return nonQueryExecutor.processNonQuery(plan);
   }
 
   /**
@@ -228,46 +221,41 @@ public class TSServiceClusterImpl extends TSServiceImpl {
    */
   @Override
   public void closeClusterService() {
-    nonQueryExecutor.get().shutdown();
-    queryMetadataExecutor.get().shutdown();
+    nonQueryExecutor.shutdown();
+    queryMetadataExecutor.shutdown();
   }
 
   @Override
   protected Set<String> getAllStorageGroups() throws InterruptedException {
-    return queryMetadataExecutor.get().processStorageGroupQuery();
+    return queryMetadataExecutor.processStorageGroupQuery();
   }
 
   @Override
   protected List<List<String>> getTimeSeriesForPath(String path)
       throws PathErrorException, InterruptedException, ProcessorException {
-    return queryMetadataExecutor.get().processTimeSeriesQuery(path);
+    return queryMetadataExecutor.processTimeSeriesQuery(path);
   }
 
   @Override
   protected String getMetadataInString()
       throws InterruptedException, ProcessorException {
-    return queryMetadataExecutor.get().processMetadataInStringQuery();
+    return queryMetadataExecutor.processMetadataInStringQuery();
   }
 
   @Override
   protected Metadata getMetadata()
       throws InterruptedException, ProcessorException, PathErrorException {
-    return queryMetadataExecutor.get().processMetadataQuery();
+    return queryMetadataExecutor.processMetadataQuery();
   }
 
   @Override
   protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
-    return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+    return queryMetadataExecutor.processSeriesTypeQuery(path);
   }
 
   @Override
   protected List<String> getPaths(String path)
       throws PathErrorException, InterruptedException, ProcessorException {
-    return queryMetadataExecutor.get().processPathsQuery(path);
-  }
-
-  @OnlyForTest
-  public NonQueryExecutor getNonQueryExecutor() {
-    return nonQueryExecutor.get();
+    return queryMetadataExecutor.processPathsQuery(path);
   }
 }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
similarity index 96%
rename from cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java
rename to cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
index 7959423..e9dd05a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
@@ -31,21 +31,18 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class AbstractClusterQPExecutorTest {
+public class AbstractQPExecutorTest {
 
   private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
 
   private TSServiceClusterImpl impl;
 
-  private NonQueryExecutor executor;
+  private NonQueryExecutor executor = new NonQueryExecutor();
 
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.envSetUp();
     impl = new TSServiceClusterImpl();
-    impl.initClusterService();
-
-    executor = impl.getNonQueryExecutor();
   }
 
   @After
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index c0e711b..fd1a994 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -118,9 +118,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     LOGGER.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME,
         req.getUsername());
 
-    initClusterService();
     boolean status;
-    IAuthorizer authorizer = null;
+    IAuthorizer authorizer;
     try {
       authorizer = LocalFileAuthorizer.getInstance();
     } catch (AuthException e) {
@@ -160,10 +159,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
     queryRet.set(new HashMap<>());
   }
 
-  public void initClusterService() {
-
-  }
-
   @Override
   public TSCloseSessionResp closeSession(TSCloseSessionReq req) throws TException {
     LOGGER.info("{}: receive close session", IoTDBConstant.GLOBAL_DB_NAME);