You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/01/17 09:31:41 UTC

[iotdb] branch master updated: remove duplicate codes (waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params of script (#2502)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3016cb9  remove duplicate codes (waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params of script (#2502)
3016cb9 is described below

commit 3016cb91cfe4dd528c1c07cda5be47e4b43a326c
Author: HouliangQi <ne...@163.com>
AuthorDate: Sun Jan 17 17:31:22 2021 +0800

    remove duplicate codes (waitForThreadPool) in the ClusterPlanExecutor and remove duplicate params of script (#2502)
    
    * remove duplicate codes in the ClusterPlanExecutor and remove duplicates params of script
---
 cluster/src/assembly/resources/sbin/add-node.sh    |  2 +-
 cluster/src/assembly/resources/sbin/start-node.sh  |  2 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |  2 +-
 .../apache/iotdb/cluster/metadata/MetaPuller.java  |  5 +-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 53 ++++------------------
 server/src/assembly/resources/sbin/start-server.sh |  2 +-
 .../assembly/resources/tools/start-WalChecker.sh   |  2 +-
 7 files changed, 16 insertions(+), 52 deletions(-)

diff --git a/cluster/src/assembly/resources/sbin/add-node.sh b/cluster/src/assembly/resources/sbin/add-node.sh
index 0ed52e5..807175b 100755
--- a/cluster/src/assembly/resources/sbin/add-node.sh
+++ b/cluster/src/assembly/resources/sbin/add-node.sh
@@ -66,7 +66,7 @@ launch_service()
 	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
 	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS $iotdb_parms -cp "$CLASSPATH"  "$class" -a
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" -a
 	return $?
 }
 
diff --git a/cluster/src/assembly/resources/sbin/start-node.sh b/cluster/src/assembly/resources/sbin/start-node.sh
index 0560371..5f84110 100755
--- a/cluster/src/assembly/resources/sbin/start-node.sh
+++ b/cluster/src/assembly/resources/sbin/start-node.sh
@@ -90,7 +90,7 @@ launch_service()
 	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -DCLUSTER_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH"  "$class" $CONF_PARAMS
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
 	return $?
 }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 84b0697..d5aedcf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1308,7 +1308,7 @@ public class CMManager extends MManager {
       }));
     }
 
-    waitForThreadPool(futureList, pool);
+    waitForThreadPool(futureList, pool, "showTimeseries()");
     List<ShowTimeSeriesResult> showTimeSeriesResults = applyShowTimeseriesLimitOffset(resultSet,
         limit, offset);
     logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 68306cd..915b0f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -74,8 +74,6 @@ public class MetaPuller {
   /**
    * Pull the all timeseries schemas of given prefixPaths from remote nodes. All prefixPaths must
    * contain the storage group.
-   * <p>
-   * Attention!!!  Just copy from metaGroupMember now, will refactor later.
    */
   List<MeasurementSchema> pullMeasurementSchemas(List<PartialPath> prefixPaths)
       throws MetadataException {
@@ -83,7 +81,8 @@ public class MetaPuller {
     // split the paths by the data groups that will hold them
     Map<PartitionGroup, List<PartialPath>> partitionGroupPathMap = new HashMap<>();
     for (PartialPath prefixPath : prefixPaths) {
-      PartitionGroup partitionGroup = ClusterUtils.partitionByPathTimeWithSync(prefixPath, metaGroupMember);
+      PartitionGroup partitionGroup = ClusterUtils
+          .partitionByPathTimeWithSync(prefixPath, metaGroupMember);
       partitionGroupPathMap.computeIfAbsent(partitionGroup, g -> new ArrayList<>()).add(prefixPath);
     }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 332857c..72d24be 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -194,7 +194,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
     }
 
     ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
-    List<Future<?>> remoteFutures = new ArrayList<>();
+    List<Future<Void>> remoteFutures = new ArrayList<>();
     // query each data group separately
     for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : groupPathMap.entrySet()) {
       PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
@@ -206,28 +206,10 @@ public class ClusterPlanExecutor extends PlanExecutor {
           logger.warn("Cannot get remote path count of {} from {}", pathsToQuery, partitionGroup,
               e);
         }
+        return null;
       }));
     }
-    for (Future<?> remoteFuture : remoteFutures) {
-      try {
-        remoteFuture.get();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        logger.info("Query path count of {} level {} interrupted", sgPathMap, level);
-        return result.get();
-      } catch (ExecutionException e) {
-        logger.warn("Cannot get remote path count of {} level {}", sgPathMap, level, e);
-      }
-    }
-    remoteQueryThreadPool.shutdown();
-    try {
-      remoteQueryThreadPool
-          .awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      logger.info("Query path count of {} level {} interrupted", sgPathMap, level);
-      return result.get();
-    }
+    waitForThreadPool(remoteFutures, remoteQueryThreadPool, "getPathCount()");
 
     return result.get();
   }
@@ -305,24 +287,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
         return null;
       }));
     }
-    for (Future<Void> future : futureList) {
-      try {
-        future.get();
-      } catch (InterruptedException e) {
-        logger.error("Interrupted when getting node lists");
-        Thread.currentThread().interrupt();
-      } catch (RuntimeException | ExecutionException e) {
-        throw new MetadataException(e);
-      }
-    }
-
-    pool.shutdown();
-    try {
-      pool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      logger.error("Unexpected interruption when waiting for getNodeList()", e);
-    }
+    waitForThreadPool(futureList, pool, "getNodesList()");
     return new ArrayList<>(nodeSet);
   }
 
@@ -407,18 +372,18 @@ public class ClusterPlanExecutor extends PlanExecutor {
         return null;
       }));
     }
-
-    waitForThreadPool(futureList, pool);
+    waitForThreadPool(futureList, pool, "getPathNextChildren()");
     return resultSet;
   }
 
-  public static void waitForThreadPool(List<Future<Void>> futures, ExecutorService pool)
+  public static void waitForThreadPool(List<Future<Void>> futures, ExecutorService pool,
+      String methodName)
       throws MetadataException {
     for (Future<Void> future : futures) {
       try {
         future.get();
       } catch (InterruptedException e) {
-        logger.error("Unexpected interruption when waiting for getNextChildren()", e);
+        logger.error("Unexpected interruption when waiting for {}", methodName, e);
         Thread.currentThread().interrupt();
       } catch (RuntimeException | ExecutionException e) {
         throw new MetadataException(e);
@@ -430,7 +395,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
       pool.awaitTermination(RaftServer.getReadOperationTimeoutMS(), TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.error("Unexpected interruption when waiting for getNextChildren()", e);
+      logger.error("Unexpected interruption when waiting for {}", methodName, e);
     }
   }
 
diff --git a/server/src/assembly/resources/sbin/start-server.sh b/server/src/assembly/resources/sbin/start-server.sh
index c5b863f..680d5b9 100755
--- a/server/src/assembly/resources/sbin/start-server.sh
+++ b/server/src/assembly/resources/sbin/start-server.sh
@@ -74,7 +74,7 @@ launch_service()
 	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -DTSFILE_CONF=${IOTDB_CONF}"
 	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH"  "$class" $CONF_PARAMS
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" $CONF_PARAMS
 	return $?
 }
 
diff --git a/server/src/assembly/resources/tools/start-WalChecker.sh b/server/src/assembly/resources/tools/start-WalChecker.sh
index c7afb9f..a977ff2 100755
--- a/server/src/assembly/resources/tools/start-WalChecker.sh
+++ b/server/src/assembly/resources/tools/start-WalChecker.sh
@@ -67,7 +67,7 @@ launch_service()
 	iotdb_parms="$iotdb_parms -DTSFILE_HOME=${IOTDB_HOME}"
 	iotdb_parms="$iotdb_parms -DIOTDB_CONF=${IOTDB_CONF}"
 #	iotdb_parms="$iotdb_parms -Dname=iotdb\.IoTDB"
-	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS $iotdb_parms -cp "$CLASSPATH"  "$class" "$WALPATH"
+	exec "$JAVA" $iotdb_parms $IOTDB_JMX_OPTS -cp "$CLASSPATH" "$class" "$WALPATH"
 	return $?
 }