You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/16 12:26:58 UTC
[incubator-iotdb] 07/19: Remove ThreadLocal in TSServiceClusterImpl
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ba0389fca23c4426cb628546ef40206a479349f8
Author: lta <li...@163.com>
AuthorDate: Mon Apr 15 13:55:05 2019 +0800
Remove the ThreadLocal executors in TSServiceClusterImpl; make currentTask in AbstractQPExecutor a ThreadLocal
---
...sterQPExecutor.java => AbstractQPExecutor.java} | 10 ++---
.../cluster/qp/executor/NonQueryExecutor.java | 7 ++--
.../cluster/qp/executor/QueryMetadataExecutor.java | 2 +-
.../apache/iotdb/cluster/qp/task/MultiQPTask.java | 4 +-
.../apache/iotdb/cluster/qp/task/SingleQPTask.java | 4 +-
.../executor/ClusterQueryRouter.java | 2 +-
.../executor/QueryProcessorExecutor.java | 2 +-
.../cluster/service/TSServiceClusterImpl.java | 44 ++++++++--------------
...ecutorTest.java => AbstractQPExecutorTest.java} | 7 +---
.../org/apache/iotdb/db/service/TSServiceImpl.java | 7 +---
10 files changed, 37 insertions(+), 52 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 1a3cfe3..654d13c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@ -45,9 +45,9 @@ import org.apache.iotdb.db.metadata.MManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractClusterQPExecutor {
+public abstract class AbstractQPExecutor {
- private static final Logger LOGGER = LoggerFactory.getLogger(AbstractClusterQPExecutor.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractQPExecutor.class);
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
@@ -69,7 +69,7 @@ public abstract class AbstractClusterQPExecutor {
/**
* The task in progress.
*/
- protected QPTask currentTask;
+ protected ThreadLocal<QPTask> currentTask;
/**
* Count limit to redo a single task
@@ -235,8 +235,8 @@ public abstract class AbstractClusterQPExecutor {
}
public void shutdown() {
- if (currentTask != null) {
- currentTask.shutdown();
+ if (currentTask.get() != null) {
+ currentTask.get().shutdown();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index f9fa3a9..071c096 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.cluster.qp.task.BatchQPTask;
import org.apache.iotdb.cluster.qp.task.QPTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
@@ -60,7 +61,7 @@ import org.slf4j.LoggerFactory;
/**
* Handle distributed non-query logic
*/
-public class NonQueryExecutor extends AbstractClusterQPExecutor {
+public class NonQueryExecutor extends AbstractQPExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(NonQueryExecutor.class);
@@ -162,7 +163,7 @@ public class NonQueryExecutor extends AbstractClusterQPExecutor {
/** 3. Execute Multiple Sub Tasks **/
BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
- currentTask = task;
+ currentTask.set(task);
task.execute(this);
task.await();
batchResult.setAllSuccessful(task.isAllSuccessful());
@@ -299,7 +300,7 @@ public class NonQueryExecutor extends AbstractClusterQPExecutor {
request = new DataGroupNonQueryRequest(groupId, plans);
}
SingleQPTask qpTask = new SingleQPTask(false, request);
- currentTask = qpTask;
+ currentTask.set(qpTask);
/** Check if the plan can be executed locally. **/
if (canHandleNonQueryByGroupId(groupId)) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 73e3181..f1098f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
/**
* Handle < show timeseries <path> > logic
*/
-public class QueryMetadataExecutor extends AbstractClusterQPExecutor {
+public class QueryMetadataExecutor extends AbstractQPExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataExecutor.class);
private static final String DOUB_SEPARATOR = "\\.";
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
index f53a2a2..86e2b74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/MultiQPTask.java
@@ -43,6 +43,8 @@ public abstract class MultiQPTask extends QPTask {
taskThread.interrupt();
}
}
- this.taskCountDownLatch.countDown();
+ while(taskCountDownLatch.getCount()!=0) {
+ this.taskCountDownLatch.countDown();
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
index 7896ca3..805834e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java
@@ -57,6 +57,8 @@ public class SingleQPTask extends QPTask {
@Override
public void shutdown() {
- this.taskCountDownLatch.countDown();
+ if (taskCountDownLatch.getCount() != 0) {
+ this.taskCountDownLatch.countDown();
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
similarity index 96%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
index 0f797fc..8fc030a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/ClusterQueryRouter.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
index ecd5f61..8531512 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/executor/QueryProcessorExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/coordinatornode/executor/QueryProcessorExecutor.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.cluster.query.executor;
+package org.apache.iotdb.cluster.query.coordinatornode.executor;
import java.io.IOException;
import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
index 7d6516e..3a1561e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/service/TSServiceClusterImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.cluster.service;
-import com.alipay.sofa.jraft.util.OnlyForTest;
import java.io.IOException;
import java.sql.Statement;
import java.util.Arrays;
@@ -30,7 +29,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.exception.ConsistencyLevelException;
import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.query.executor.QueryProcessorExecutor;
+import org.apache.iotdb.cluster.query.coordinatornode.executor.QueryProcessorExecutor;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.PathErrorException;
@@ -56,20 +55,14 @@ public class TSServiceClusterImpl extends TSServiceImpl {
private static final Logger LOGGER = LoggerFactory.getLogger(TSServiceClusterImpl.class);
private QueryProcessor processor = new QueryProcessor(new QueryProcessorExecutor());
- private ThreadLocal<NonQueryExecutor> nonQueryExecutor = new ThreadLocal<>();
- private ThreadLocal<QueryMetadataExecutor> queryMetadataExecutor = new ThreadLocal<>();
+ private NonQueryExecutor nonQueryExecutor = new NonQueryExecutor();
+ private QueryMetadataExecutor queryMetadataExecutor = new QueryMetadataExecutor();
public TSServiceClusterImpl() throws IOException {
super();
}
@Override
- public void initClusterService() {
- nonQueryExecutor.set(new NonQueryExecutor());
- queryMetadataExecutor.set(new QueryMetadataExecutor());
- }
-
- @Override
public TSExecuteBatchStatementResp executeBatchStatement(TSExecuteBatchStatementReq req)
throws TException {
try {
@@ -99,7 +92,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
result = resultTemp;
physicalPlans = physicalPlansTemp;
BatchResult batchResult = new BatchResult(isAllSuccessful, batchErrorMessage, result);
- nonQueryExecutor.get().processBatch(physicalPlans, batchResult);
+ nonQueryExecutor.processBatch(physicalPlans, batchResult);
return getTSBathExecuteStatementResp(TS_StatusCode.ERROR_STATUS,
"statement is query :" + statements.get(i), Arrays.stream(result).boxed().collect(
Collectors.toList()));
@@ -133,7 +126,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
}
BatchResult batchResult = new BatchResult(isAllSuccessful, batchErrorMessage, result);
- nonQueryExecutor.get().processBatch(physicalPlans, batchResult);
+ nonQueryExecutor.processBatch(physicalPlans, batchResult);
batchErrorMessage = batchResult.batchErrorMessage;
isAllSuccessful = batchResult.isAllSuccessful;
@@ -202,13 +195,13 @@ public class TSServiceClusterImpl extends TSServiceImpl {
if (Pattern.matches(ClusterConstant.SET_READ_METADATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
String[] splits = statement.split("\\s+");
int level = Integer.parseInt(splits[splits.length - 1]);
- nonQueryExecutor.get().setReadMetadataConsistencyLevel(level);
+ nonQueryExecutor.setReadMetadataConsistencyLevel(level);
return true;
} else if (Pattern
.matches(ClusterConstant.SET_READ_DATA_CONSISTENCY_LEVEL_PATTERN, statement)) {
String[] splits = statement.split("\\s+");
int level = Integer.parseInt(splits[splits.length - 1]);
- nonQueryExecutor.get().setReadDataConsistencyLevel(level);
+ nonQueryExecutor.setReadDataConsistencyLevel(level);
return true;
} else {
return false;
@@ -220,7 +213,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
@Override
protected boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
- return nonQueryExecutor.get().processNonQuery(plan);
+ return nonQueryExecutor.processNonQuery(plan);
}
/**
@@ -228,46 +221,41 @@ public class TSServiceClusterImpl extends TSServiceImpl {
*/
@Override
public void closeClusterService() {
- nonQueryExecutor.get().shutdown();
- queryMetadataExecutor.get().shutdown();
+ nonQueryExecutor.shutdown();
+ queryMetadataExecutor.shutdown();
}
@Override
protected Set<String> getAllStorageGroups() throws InterruptedException {
- return queryMetadataExecutor.get().processStorageGroupQuery();
+ return queryMetadataExecutor.processStorageGroupQuery();
}
@Override
protected List<List<String>> getTimeSeriesForPath(String path)
throws PathErrorException, InterruptedException, ProcessorException {
- return queryMetadataExecutor.get().processTimeSeriesQuery(path);
+ return queryMetadataExecutor.processTimeSeriesQuery(path);
}
@Override
protected String getMetadataInString()
throws InterruptedException, ProcessorException {
- return queryMetadataExecutor.get().processMetadataInStringQuery();
+ return queryMetadataExecutor.processMetadataInStringQuery();
}
@Override
protected Metadata getMetadata()
throws InterruptedException, ProcessorException, PathErrorException {
- return queryMetadataExecutor.get().processMetadataQuery();
+ return queryMetadataExecutor.processMetadataQuery();
}
@Override
protected TSDataType getSeriesType(String path) throws PathErrorException, InterruptedException, ProcessorException {
- return queryMetadataExecutor.get().processSeriesTypeQuery(path);
+ return queryMetadataExecutor.processSeriesTypeQuery(path);
}
@Override
protected List<String> getPaths(String path)
throws PathErrorException, InterruptedException, ProcessorException {
- return queryMetadataExecutor.get().processPathsQuery(path);
- }
-
- @OnlyForTest
- public NonQueryExecutor getNonQueryExecutor() {
- return nonQueryExecutor.get();
+ return queryMetadataExecutor.processPathsQuery(path);
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
similarity index 96%
rename from cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java
rename to cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
index 7959423..e9dd05a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractClusterQPExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/qp/AbstractQPExecutorTest.java
@@ -31,21 +31,18 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class AbstractClusterQPExecutorTest {
+public class AbstractQPExecutorTest {
private static final ClusterConfig CLUSTER_CONFIG = ClusterDescriptor.getInstance().getConfig();
private TSServiceClusterImpl impl;
- private NonQueryExecutor executor;
+ private NonQueryExecutor executor = new NonQueryExecutor();
@Before
public void setUp() throws Exception {
EnvironmentUtils.envSetUp();
impl = new TSServiceClusterImpl();
- impl.initClusterService();
-
- executor = impl.getNonQueryExecutor();
}
@After
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index c0e711b..fd1a994 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -118,9 +118,8 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
LOGGER.info("{}: receive open session request from username {}", IoTDBConstant.GLOBAL_DB_NAME,
req.getUsername());
- initClusterService();
boolean status;
- IAuthorizer authorizer = null;
+ IAuthorizer authorizer;
try {
authorizer = LocalFileAuthorizer.getInstance();
} catch (AuthException e) {
@@ -160,10 +159,6 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
queryRet.set(new HashMap<>());
}
- public void initClusterService() {
-
- }
-
@Override
public TSCloseSessionResp closeSession(TSCloseSessionReq req) throws TException {
LOGGER.info("{}: receive close session", IoTDBConstant.GLOBAL_DB_NAME);