You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/17 09:31:41 UTC
[iotdb] branch master updated: remove duplicate codes
(waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params
of script (#2502)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3016cb9 remove duplicate codes (waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params of script (#2502)
3016cb9 is described below
commit 3016cb91cfe4dd528c1c07cda5be47e4b43a326c
Author: HouliangQi <ne...@163.com>
AuthorDate: Sun Jan 17 17:31:22 2021 +0800
remove duplicate codes (waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params of script (#2502)
* remove duplicate code in the ClusterPlanExecutor and remove duplicate params from the scripts
---
cluster/src/assembly/resources/sbin/add-node.sh | 2 +-
cluster/src/assembly/resources/sbin/start-node.sh | 2 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 2 +-
.../apache/iotdb/cluster/metadata/MetaPuller.java | 5 +-
.../iotdb/cluster/query/ClusterPlanExecutor.java | 53 ++++------------------
server/src/assembly/resources/sbin/start-server.sh | 2 +-
.../assembly/resources/tools/start-WalChecker.sh | 2 +-
7 files changed, 16 insertions(+), 52 deletions(-)
diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/add-node.sh
index 0ed52e5..807175b 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/add-node.sh
@@ -66,7 +66,7 @@ launch_service()
iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
- exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS $iotdb_parms -cp "$CLASSPATH" "$class" -a
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" -a
return $?
}
diff --git a/cluster/src/assembly/resources/sbin/start-node.sh b/cluster/src/assembly/resources/sbin/start-node.sh
index 0560371..5f84110 100755
--- a/cluster/src/assembly/resources/sbin/start-node.sh
+++ b/cluster/src/assembly/resources/sbin/start-node.sh
@@ -90,7 +90,7 @@ launch_service()
iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -DCLUSTER_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
- exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
return $?
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 84b0697..d5aedcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1308,7 +1308,7 @@ public class CMManager extends MManager {
}));
}
- waitForThreadPool(futureList, pool);
+ waitForThreadPool(futureList, pool, "showTimeseries()");
List<ShowTimeSeriesResult> showTimeSeriesResults = applyShowTimeseriesLimitOffset(resultSet,
limit, offset);
logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 68306cd..915b0f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -74,8 +74,6 @@ public class MetaPuller {
/**
* Pull the all timeseries schemas of given prefixPaths from remote nodes. All prefixPaths must
* contain the storage group.
- * <p>
- * Attention!!! Just copy from metaGroupMember now, will refactor later.
*/
List<MeasurementSchema> pullMeasurementSchemas(List<PartialPath> prefixPaths)
throws MetadataException {
@@ -83,7 +81,8 @@ public class MetaPuller {
// split the paths by the data groups that will hold them
Map<PartitionGroup, List<PartialPath>> partitionGroupPathMap = new HashMap<>();
for (PartialPath prefixPath : prefixPaths) {
- PartitionGroup partitionGroup = ClusterUtils.partitionByPathTimeWithSync(prefixPath, metaGroupMember);
+ PartitionGroup partitionGroup = ClusterUtils
+ .partitionByPathTimeWithSync(prefixPath, metaGroupMember);
partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new ArrayList<>()).add(prefixPath);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 332857c..72d24be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -194,7 +194,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
}
ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
- List<Future<?>> remoteFutures = new ArrayList<>();
+ List<Future<Void>> remoteFutures = new ArrayList<>();
// query each data group separately
for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
@@ -206,28 +206,10 @@ public class ClusterPlanExecutor extends PlanExecutor {
logger.warn("Cannot get remote path count of {} from {}", pathsToQuery, partitionGroup,
e);
}
+ return null;
}));
}
- for (Future<?> remoteFuture : remoteFutures) {
- try {
- remoteFuture.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.info("Query path count of {} level {} interrupted", sgPathMap, level);
- return result.get();
- } catch (ExecutionException e) {
- logger.warn("Cannot get remote path count of {} level {}", sgPathMap, level, e);
- }
- }
- remoteQueryThreadPool.shutdown();
- try {
- remoteQueryThreadPool
- .awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.info("Query path count of {} level {} interrupted", sgPathMap, level);
- return result.get();
- }
+ waitForThreadPool(remoteFutures, remoteQueryThreadPool, "getPathCount()");
return result.get();
}
@@ -305,24 +287,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
return null;
}));
}
- for (Future<Void> future : futureList) {
- try {
- future.get();
- } catch (InterruptedException e) {
- logger.error("Interrupted when getting node lists");
- Thread.currentThread().interrupt();
- } catch (RuntimeException | ExecutionException e) {
- throw new MetadataException(e);
- }
- }
-
- pool.shutdown();
- try {
- pool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when waiting for getNodeList()", e);
- }
+ waitForThreadPool(futureList, pool, "getNodesList()");
return new ArrayList<>(nodeSet);
}
@@ -407,18 +372,18 @@ public class ClusterPlanExecutor extends PlanExecutor {
return null;
}));
}
-
- waitForThreadPool(futureList, pool);
+ waitForThreadPool(futureList, pool, "getPathNextChildren()");
return resultSet;
}
- public static void waitForThreadPool(List<Future<Void>> futures, ExecutorService pool)
+ public static void waitForThreadPool(List<Future<Void>> futures, ExecutorService pool,
+ String methodName)
throws MetadataException {
for (Future<Void> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
- logger.error("Unexpected interruption when waiting for getNextChildren()", e);
+ logger.error("Unexpected interruption when waiting for {}", methodName, e);
Thread.currentThread().interrupt();
} catch (RuntimeException | ExecutionException e) {
throw new MetadataException(e);
@@ -430,7 +395,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
pool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Unexpected interruption when waiting for getNextChildren()", e);
+ logger.error("Unexpected interruption when waiting for {}", methodName, e);
}
}
diff --git a/server/src/assembly/resources/sbin/start-server.sh b/server/src/assembly/resources/sbin/start-server.sh
index c5b863f..680d5b9 100755
--- a/server/src/assembly/resources/sbin/start-server.sh
+++ b/server/src/assembly/resources/sbin/start-server.sh
@@ -74,7 +74,7 @@ launch_service()
iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
- exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
return $?
}
diff --git a/server/src/assembly/resources/tools/start-WalChecker.sh b/server/src/assembly/resources/tools/start-WalChecker.sh
index c7afb9f..a977ff2 100755
--- a/server/src/assembly/resources/tools/start-WalChecker.sh
+++ b/server/src/assembly/resources/tools/start-WalChecker.sh
@@ -67,7 +67,7 @@ launch_service()
iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
# iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
- exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS $iotdb_parms -cp "$CLASSPATH" "$class" "$WALPATH"
+ exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" "$WALPATH"
return $?
}