You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/11/27 05:24:22 UTC
[iotdb] 04/10: [IOTDB-2010] fix incomplete show timeseries result (#4388)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master_performance
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0d0363ad1a3db3e15e8f00ee6eb79c3c0292a8d0
Author: Zhong Wang <wa...@alibaba-inc.com>
AuthorDate: Fri Nov 26 10:41:12 2021 +0800
[IOTDB-2010] fix incomplete show timeseries result (#4388)
* [IOTDB-2010] fix incomplete show timeseries result
* Self-review
1. drop duplicated show timeseries results
2. fix coding style issue
---
.../apache/iotdb/cluster/metadata/CMManager.java | 68 +++++------
.../handlers/caller/ShowTimeSeriesHandler.java | 133 +++++++++++++++++++++
2 files changed, 163 insertions(+), 38 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index e2d9280..ab81bd4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.server.handlers.caller.ShowTimeSeriesHandler;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
@@ -1392,19 +1393,18 @@ public class CMManager extends MManager {
@Override
public List<ShowTimeSeriesResult> showTimeseries(ShowTimeSeriesPlan plan, QueryContext context)
throws MetadataException {
- ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet = new ConcurrentSkipListSet<>();
ExecutorService pool =
new ThreadPoolExecutor(
THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
- List<PartitionGroup> globalGroups = new ArrayList<>();
+ List<PartitionGroup> groups = new ArrayList<>();
try {
PartitionGroup partitionGroup =
metaGroupMember.getPartitionTable().partitionByPathTime(plan.getPath(), 0);
- globalGroups.add(partitionGroup);
+ groups.add(partitionGroup);
} catch (MetadataException e) {
// if the path location is not found, obtain the path location from all groups.
- globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+ groups = metaGroupMember.getPartitionTable().getGlobalGroups();
}
int limit = plan.getLimit() == 0 ? Integer.MAX_VALUE : plan.getLimit();
@@ -1421,35 +1421,31 @@ public class CMManager extends MManager {
}
if (logger.isDebugEnabled()) {
- logger.debug(
- "Fetch timeseries schemas of {} from {} groups", plan.getPath(), globalGroups.size());
+ logger.debug("Fetch timeseries schemas of {} from {} groups", plan.getPath(), groups.size());
}
+ ShowTimeSeriesHandler handler = new ShowTimeSeriesHandler(groups.size(), plan.getPath());
List<Future<Void>> futureList = new ArrayList<>();
- for (PartitionGroup group : globalGroups) {
+ for (PartitionGroup group : groups) {
futureList.add(
pool.submit(
() -> {
- try {
- showTimeseries(group, plan, resultSet, context);
- } catch (CheckConsistencyException | MetadataException e) {
- logger.error("Cannot get show timeseries result of {} from {}", plan, group);
- }
+ showTimeseries(group, plan, context, handler);
return null;
}));
}
waitForThreadPool(futureList, pool, "showTimeseries()");
List<ShowTimeSeriesResult> showTimeSeriesResults =
- applyShowTimeseriesLimitOffset(resultSet, limit, offset);
+ applyShowTimeseriesLimitOffset(handler.getResult(), limit, offset);
logger.debug("Show {} has {} results", plan.getPath(), showTimeSeriesResults.size());
return showTimeSeriesResults;
}
private List<ShowTimeSeriesResult> applyShowTimeseriesLimitOffset(
- ConcurrentSkipListSet<ShowTimeSeriesResult> resultSet, int limit, int offset) {
+ List<ShowTimeSeriesResult> results, int limit, int offset) {
List<ShowTimeSeriesResult> showTimeSeriesResults = new ArrayList<>();
- Iterator<ShowTimeSeriesResult> iterator = resultSet.iterator();
+ Iterator<ShowTimeSeriesResult> iterator = results.iterator();
while (iterator.hasNext() && limit > 0) {
if (offset > 0) {
offset--;
@@ -1482,13 +1478,12 @@ public class CMManager extends MManager {
private void showTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
- Set<ShowTimeSeriesResult> resultSet,
- QueryContext context)
- throws CheckConsistencyException, MetadataException {
+ QueryContext context,
+ ShowTimeSeriesHandler handler) {
if (group.contains(metaGroupMember.getThisNode())) {
- showLocalTimeseries(group, plan, resultSet, context);
+ showLocalTimeseries(group, plan, context, handler);
} else {
- showRemoteTimeseries(group, plan, resultSet);
+ showRemoteTimeseries(group, plan, handler);
}
}
@@ -1521,28 +1516,21 @@ public class CMManager extends MManager {
private void showLocalTimeseries(
PartitionGroup group,
ShowTimeSeriesPlan plan,
- Set<ShowTimeSeriesResult> resultSet,
- QueryContext context)
- throws CheckConsistencyException, MetadataException {
- DataGroupMember localDataMember =
- metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
- localDataMember.syncLeaderWithConsistencyCheck(false);
+ QueryContext context,
+ ShowTimeSeriesHandler handler) {
try {
+ DataGroupMember localDataMember =
+ metaGroupMember.getLocalDataMember(group.getHeader(), group.getRaftId());
+ localDataMember.syncLeaderWithConsistencyCheck(false);
List<ShowTimeSeriesResult> localResult = super.showTimeseries(plan, context);
- resultSet.addAll(localResult);
- logger.debug(
- "Fetched local timeseries {} schemas of {} from {}",
- localResult.size(),
- plan.getPath(),
- group);
- } catch (MetadataException e) {
- logger.error("Cannot execute show timeseries plan {} from {} locally.", plan, group);
- throw e;
+ handler.onComplete(localResult);
+ } catch (MetadataException | CheckConsistencyException e) {
+ handler.onError(e);
}
}
private void showRemoteTimeseries(
- PartitionGroup group, ShowTimeSeriesPlan plan, Set<ShowTimeSeriesResult> resultSet) {
+ PartitionGroup group, ShowTimeSeriesPlan plan, ShowTimeSeriesHandler handler) {
ByteBuffer resultBinary = null;
for (Node node : group) {
try {
@@ -1560,13 +1548,17 @@ public class CMManager extends MManager {
if (resultBinary != null) {
int size = resultBinary.getInt();
+ List<ShowTimeSeriesResult> results = new ArrayList<>();
logger.debug(
"Fetched remote timeseries {} schemas of {} from {}", size, plan.getPath(), group);
for (int i = 0; i < size; i++) {
- resultSet.add(ShowTimeSeriesResult.deserialize(resultBinary));
+ results.add(ShowTimeSeriesResult.deserialize(resultBinary));
}
+ handler.onComplete(results);
} else {
- logger.error("Failed to execute show timeseries {} in group: {}.", plan, group);
+ String errMsg =
+ String.format("Failed to get timeseries in path %s from group %s", plan.getPath(), group);
+ handler.onError(new MetadataException(errMsg));
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
new file mode 100644
index 0000000..4d38687
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/ShowTimeSeriesHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.server.handlers.caller;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Handler for getting the schemas from each data group concurrently. */
+public class ShowTimeSeriesHandler implements AsyncMethodCallback<List<ShowTimeSeriesResult>> {
+
+ private static class ShowTimeSeriesResultComparator implements Comparator<ShowTimeSeriesResult> {
+
+ @Override
+ public int compare(ShowTimeSeriesResult o1, ShowTimeSeriesResult o2) {
+ if (o1 == null && o2 == null) {
+ return 0;
+ } else if (o1 == null) {
+ return -1;
+ } else if (o2 == null) {
+ return 1;
+ }
+ return o1.getName().compareTo(o2.getName());
+ }
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(ShowTimeSeriesHandler.class);
+
+ /** String representation of a partial path for logging */
+ private final String path;
+
+ private final CountDownLatch countDownLatch;
+ private final long startTimeInMs;
+
+ private final Map<String, ShowTimeSeriesResult> timeSeriesNameToResult = new HashMap<>();
+ private final List<Exception> exceptions = new ArrayList<>();
+
+ public ShowTimeSeriesHandler(int numGroup, PartialPath path) {
+ this.countDownLatch = new CountDownLatch(numGroup);
+ this.path = path.toString();
+ this.startTimeInMs = System.currentTimeMillis();
+ }
+
+ @Override
+ public synchronized void onComplete(List<ShowTimeSeriesResult> response) {
+ for (ShowTimeSeriesResult r : response) {
+ timeSeriesNameToResult.put(r.getName(), r);
+ }
+ countDownLatch.countDown();
+ logger.debug(
+ "Got {} timeseries in path {}. Remaining count: {}",
+ response.size(),
+ path,
+ countDownLatch.getCount());
+ }
+
+ @Override
+ public synchronized void onError(Exception exception) {
+ exceptions.add(exception);
+ countDownLatch.countDown();
+ logger.error("Failed to get timeseries in path {} because of {}", path, exception.getMessage());
+ }
+
+ public List<ShowTimeSeriesResult> getResult() throws MetadataException {
+ if (!exceptions.isEmpty()) {
+ MetadataException e =
+ new MetadataException(
+ "Exception happened when getting the result."
+ + " See the suppressed exceptions for causes.");
+ for (Exception exception : exceptions) {
+ e.addSuppressed(exception);
+ }
+ throw e;
+ }
+
+ // Wait for the results and ignore the interruptions.
+ long timeout = IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold();
+ while (System.currentTimeMillis() - startTimeInMs < timeout) {
+ try {
+ if (countDownLatch.await(
+ System.currentTimeMillis() - startTimeInMs, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ } catch (InterruptedException ignored) {
+ }
+ }
+
+ if (countDownLatch.getCount() != 0) {
+ String errMsg =
+ String.format(
+ "Failed to get the show timeseries result"
+ + " since %d nodes didn't respond after %d ms",
+ countDownLatch.getCount(), timeout);
+ logger.error(errMsg);
+ throw new MetadataException(errMsg);
+ }
+
+ return timeSeriesNameToResult.values().stream()
+ .sorted(new ShowTimeSeriesResultComparator())
+ .collect(Collectors.toList());
+ }
+}