You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/27 05:24:22 UTC

[iotdb] 04/10: [IOTDB-2010] fix incomplete show timeseries result (#4388)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0d0363ad1a3db3e15e8f00ee6eb79c3c0292a8d0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Fri Nov 26 10:41:12 2021 +0800

    [IOTDB-2010] fix incomplete show timeseries result (#4388)
    
    * [IOTDB-2010] fix incomplete show timeseries result
    
    * Self-review
    
    1. drop duplicated show timeseries results
    2. fix coding style issue
---
 .../apache/iotdb/cluster/metadata/CMManager.java   |  68 +++++------
 .../handlers/caller/ShowTimeSeriesHandler.java     | 133 +++++++++++++++++++++
 2 files changed, 163 insertions(+), 38 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index e2d9280..ab81bd4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.handlers.caller.ShowTimeSeriesHandler;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
@@ -1392,19 +1393,18 @@ public class CMManager extends MManager {
   @Override
   public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
       throws MetadataException {
-    ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>();
     ExecutorService pool =
         new ThreadPoolExecutor(
             THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
 
-    List<PartitionGroup> globalGroups = new ArrayList<>();
+    List<PartitionGroup> groups = new ArrayList<>();
     try {
       PartitionGroup partitionGroup =
           metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0);
-      globalGroups.add(partitionGroup);
+      groups.add(partitionGroup);
     } catch (MetadataException e) {
       // if the path location is not find, obtain the path location from all groups.
-      globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+      groups = metaGroupMember.getPartitionTable().getGlobalGroups();
     }
 
     int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
@@ -1421,35 +1421,31 @@ public class CMManager extends MManager {
     }
 
     if (logger.isDebugEnabled()) {
-      logger.debug(
-          "Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size());
+      logger.debug("Fetch timeseries schemas of {} from {} groups", plan.getPath(), groups.size());
     }
 
+    ShowTimeSeriesHandler handler = new ShowTimeSeriesHandler(groups.size(), plan.getPath());
     List<Future<Void>> futureList = new ArrayList<>();
-    for (PartitionGroup group : globalGroups) {
+    for (PartitionGroup group : groups) {
       futureList.add(
           pool.submit(
               () -> {
-                try {
-                  showTimeseries(group, plan, resultSet, context);
-                } catch (CheckConsistencyException | MetadataException e) {
-                  logger.error("Cannot get show timeseries result of {} from {}", plan, group);
-                }
+                showTimeseries(group, plan, context, handler);
                 return null;
               }));
     }
 
     waitForThreadPool(futureList, pool, "showTimeseries()");
     List<ShowTimeSeriesResult> showTimeSeriesResults =
-        applyShowTimeseriesLimitOffset(resultSet, limit, offset);
+        applyShowTimeseriesLimitOffset(handler.getResult(), limit, offset);
     logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
     return showTimeSeriesResults;
   }
 
   private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset(
-      ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) {
+      List<ShowTimeSeriesResult> results, int limit, int offset) {
     List<ShowTimeSeriesResult> showTimeSeriesResults = new ArrayList<>();
-    Iterator<ShowTimeSeriesResult> iterator = resultSet.iterator();
+    Iterator<ShowTimeSeriesResult> iterator = results.iterator();
     while (iterator.hasNext() && limit > 0) {
       if (offset > 0) {
         offset--;
@@ -1482,13 +1478,12 @@ public class CMManager extends MManager {
   private void showTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
-      Set<ShowTimeSeriesResult> resultSet,
-      QueryContext context)
-      throws CheckConsistencyException, MetadataException {
+      QueryContext context,
+      ShowTimeSeriesHandler handler) {
     if (group.contains(metaGroupMember.getThisNode())) {
-      showLocalTimeseries(group, plan, resultSet, context);
+      showLocalTimeseries(group, plan, context, handler);
     } else {
-      showRemoteTimeseries(group, plan, resultSet);
+      showRemoteTimeseries(group, plan, handler);
     }
   }
 
@@ -1521,28 +1516,21 @@ public class CMManager extends MManager {
   private void showLocalTimeseries(
       PartitionGroup group,
       ShowTimeSeriesPlan plan,
-      Set<ShowTimeSeriesResult> resultSet,
-      QueryContext context)
-      throws CheckConsistencyException, MetadataException {
-    DataGroupMember localDataMember =
-        metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
-    localDataMember.syncLeaderWithConsistencyCheck(false);
+      QueryContext context,
+      ShowTimeSeriesHandler handler) {
     try {
+      DataGroupMember localDataMember =
+          metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
+      localDataMember.syncLeaderWithConsistencyCheck(false);
       List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
-      resultSet.addAll(localResult);
-      logger.debug(
-          "Fetched local timeseries {} schemas of {} from {}",
-          localResult.size(),
-          plan.getPath(),
-          group);
-    } catch (MetadataException e) {
-      logger.error("Cannot execute show timeseries plan  {} from {} locally.", plan, group);
-      throw e;
+      handler.onComplete(localResult);
+    } catch (MetadataException | CheckConsistencyException e) {
+      handler.onError(e);
     }
   }
 
   private void showRemoteTimeseries(
-      PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) {
+      PartitionGroup group, ShowTimeSeriesPlan plan, ShowTimeSeriesHandler handler) {
     ByteBuffer resultBinary = null;
     for (Node node : group) {
       try {
@@ -1560,13 +1548,17 @@ public class CMManager extends MManager {
 
     if (resultBinary != null) {
       int size = resultBinary.getInt();
+      List<ShowTimeSeriesResult> results = new ArrayList<>();
       logger.debug(
           "Fetched remote timeseries {} schemas of {} from {}", size, plan.getPath(), group);
       for (int i = 0; i < size; i++) {
-        resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
+        results.add(ShowTimeSeriesResult.deserialize(resultBinary));
       }
+      handler.onComplete(results);
     } else {
-      logger.error("Failed to execute show timeseries {} in group: {}.", plan, group);
+      String errMsg =
+          String.format("Failed to get timeseries in path %s from group %s", plan.getPath(), group);
+      handler.onError(new MetadataException(errMsg));
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
new file mode 100644
index 0000000..4d38687
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Handler for getting the schemas from each data group concurrently. */
+public class ShowTimeSeriesHandler implements AsyncMethodCallback<List<ShowTimeSeriesResult>> {
+
+  private static class ShowTimeSeriesResultComparator implements Comparator<ShowTimeSeriesResult> {
+
+    @Override
+    public int compare(ShowTimeSeriesResult o1, ShowTimeSeriesResult o2) {
+      if (o1 == null && o2 == null) {
+        return 0;
+      } else if (o1 == null) {
+        return -1;
+      } else if (o2 == null) {
+        return 1;
+      }
+      return o1.getName().compareTo(o2.getName());
+    }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(ShowTimeSeriesHandler.class);
+
+  /** String representation of a partial path for logging */
+  private final String path;
+
+  private final CountDownLatch countDownLatch;
+  private final long startTimeInMs;
+
+  private final Map<String, ShowTimeSeriesResult> timeSeriesNameToResult = new HashMap<>();
+  private final List<Exception> exceptions = new ArrayList<>();
+
+  public ShowTimeSeriesHandler(int numGroup, PartialPath path) {
+    this.countDownLatch = new CountDownLatch(numGroup);
+    this.path = path.toString();
+    this.startTimeInMs = System.currentTimeMillis();
+  }
+
+  @Override
+  public synchronized void onComplete(List<ShowTimeSeriesResult> response) {
+    for (ShowTimeSeriesResult r : response) {
+      timeSeriesNameToResult.put(r.getName(), r);
+    }
+    countDownLatch.countDown();
+    logger.debug(
+        "Got {} timeseries in path {}. Remaining count: {}",
+        response.size(),
+        path,
+        countDownLatch.getCount());
+  }
+
+  @Override
+  public synchronized void onError(Exception exception) {
+    exceptions.add(exception);
+    countDownLatch.countDown();
+    logger.error("Failed to get timeseries in path {} because of {}", path, exception.getMessage());
+  }
+
+  public List<ShowTimeSeriesResult> getResult() throws MetadataException {
+    if (!exceptions.isEmpty()) {
+      MetadataException e =
+          new MetadataException(
+              "Exception happened when getting the result."
+                  + " See the suppressed exceptions for causes.");
+      for (Exception exception : exceptions) {
+        e.addSuppressed(exception);
+      }
+      throw e;
+    }
+
+    // Wait for the results and ignore the interruptions.
+    long timeout = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+    while (System.currentTimeMillis() - startTimeInMs < timeout) {
+      try {
+        if (countDownLatch.await(
+            System.currentTimeMillis() - startTimeInMs, TimeUnit.MILLISECONDS)) {
+          break;
+        }
+      } catch (InterruptedException ignored) {
+      }
+    }
+
+    if (countDownLatch.getCount() != 0) {
+      String errMsg =
+          String.format(
+              "Failed to get the show timeseries result"
+                  + " since %d nodes didn't respond after %d ms",
+              countDownLatch.getCount(), timeout);
+      logger.error(errMsg);
+      throw new MetadataException(errMsg);
+    }
+
+    return timeSeriesNameToResult.values().stream()
+        .sorted(new ShowTimeSeriesResultComparator())
+        .collect(Collectors.toList());
+  }
+}