You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/11/03 06:54:29 UTC
[iotdb] 01/01: Improve query performance
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch QueryPerf
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 234d3d7fe28ddbd8274c411ce5357d685ff2541c
Author: JackieTien97 <ja...@gmail.com>
AuthorDate: Thu Nov 3 14:54:12 2022 +0800
Improve query performance
---
.../db/engine/storagegroup/TsFileResource.java | 9 +++--
.../storagegroup/timeindex/DeviceTimeIndex.java | 11 ++++++
.../storagegroup/timeindex/FileTimeIndex.java | 5 +++
.../engine/storagegroup/timeindex/ITimeIndex.java | 5 +++
.../storagegroup/timeindex/V012FileTimeIndex.java | 6 +++
.../db/mpp/plan/execution/QueryExecution.java | 11 +++++-
.../db/mpp/plan/scheduler/ClusterScheduler.java | 6 ++-
.../scheduler/FixedRateFragInsStateTracker.java | 46 ++++++++++++++++------
.../plan/scheduler/IFragInstanceStateTracker.java | 6 +++
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 30 +++++++++-----
.../org/apache/iotdb/db/utils/FileLoaderUtils.java | 3 --
11 files changed, 108 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ef039eaea9..0f2fd7424c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -645,7 +645,10 @@ public class TsFileResource {
return isSatisfied(timeFilter, isSeq, ttl, debug);
}
- if (!mayContainsDevice(deviceId)) {
+ long[] startAndEndTime = timeIndex.getStartAndEndTime(deviceId);
+
+ // doesn't contain this device
+ if (startAndEndTime == null) {
if (debug) {
DEBUG_LOGGER.info(
"Path: {} file {} is not satisfied because of no device!", deviceId, file);
@@ -653,8 +656,8 @@ public class TsFileResource {
return false;
}
- long startTime = getStartTime(deviceId);
- long endTime = isClosed() || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE;
+ long startTime = startAndEndTime[0];
+ long endTime = isClosed() || !isSeq ? startAndEndTime[1] : Long.MAX_VALUE;
if (!isAlive(endTime, ttl)) {
if (debug) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b4ef48e847..41c354c50d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -362,6 +362,17 @@ public class DeviceTimeIndex implements ITimeIndex {
return deviceToIndex.containsKey(device);
}
+ @Override
+ public long[] getStartAndEndTime(String deviceId) {
+ Integer index = deviceToIndex.get(deviceId);
+ if (index == null) {
+ return null;
+ } else {
+ int i = index;
+ return new long[] {startTimes[i], endTimes[i]};
+ }
+ }
+
@Override
public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
boolean hasMatchedDevice = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index 03c5a1d94b..ed7b6bf18e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -228,6 +228,11 @@ public class FileTimeIndex implements ITimeIndex {
return true;
}
+ @Override
+ public long[] getStartAndEndTime(String deviceId) {
+ return new long[] {startTime, endTime};
+ }
+
@Override
public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
return new Pair<>(startTime, endTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index fd91d214ea..8db4b76530 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -194,6 +194,11 @@ public interface ITimeIndex {
*/
boolean mayContainsDevice(String device);
+ /**
+ * @return null if the deviceId doesn't exist; otherwise a two-element array whose index 0 is the start time and index 1 is the end time
+ */
+ long[] getStartAndEndTime(String deviceId);
+
Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern);
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
index 4f959583ca..2abca47547 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
@@ -185,6 +185,12 @@ public class V012FileTimeIndex implements ITimeIndex {
"V012FileTimeIndex should be rewritten while upgrading and containsDevice() method should not be called any more.");
}
+ @Override
+ public long[] getStartAndEndTime(String deviceId) {
+ throw new UnsupportedOperationException(
+ "V012FileTimeIndex should be rewritten while upgrading and getStartAndEndTime() method should not be called any more.");
+ }
+
@Override
public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
throw new UnsupportedOperationException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 387f9d7258..8c48c6a574 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -77,6 +77,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
@@ -121,6 +122,8 @@ public class QueryExecution implements IQueryExecution {
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
internalServiceClientManager;
+ private AtomicBoolean stopped;
+
public QueryExecution(
Statement statement,
MPPQueryContext context,
@@ -161,6 +164,7 @@ public class QueryExecution implements IQueryExecution {
}
}
});
+ this.stopped = new AtomicBoolean(false);
}
@FunctionalInterface
@@ -315,8 +319,11 @@ public class QueryExecution implements IQueryExecution {
// Stop the workers for this query
public void stop() {
- if (this.scheduler != null) {
- this.scheduler.stop();
+ // only stop once
+ if (stopped.compareAndSet(false, true)) {
+ if (this.scheduler != null) {
+ this.scheduler.stop();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 178b91d149..303a824c0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -86,7 +86,11 @@ public class ClusterScheduler implements IScheduler {
stateMachine, scheduledExecutor, instances, internalServiceClientManager);
this.queryTerminator =
new SimpleQueryTerminator(
- scheduledExecutor, queryContext, instances, internalServiceClientManager);
+ scheduledExecutor,
+ queryContext,
+ instances,
+ internalServiceClientManager,
+ stateTracker);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 16554b4b69..97faac95ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -28,12 +28,14 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -77,6 +79,24 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
TimeUnit.MILLISECONDS);
}
+ @Override
+ public synchronized List<TFragmentInstanceId> filterUnFinishedFIs(
+ List<TFragmentInstanceId> instanceIds) {
+ List<TFragmentInstanceId> res = new ArrayList<>();
+ if (instanceIds == null) {
+ return res;
+ }
+ for (TFragmentInstanceId tFragmentInstanceId : instanceIds) {
+ InstanceStateMetrics stateMetrics =
+ instanceStateMap.get(FragmentInstanceId.fromThrift(tFragmentInstanceId));
+ if (stateMetrics != null
+ && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) {
+ res.add(tFragmentInstanceId);
+ }
+ }
+ return res;
+ }
+
@Override
public synchronized void abort() {
aborted = true;
@@ -96,18 +116,20 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
FragmentInstanceState state = fetchState(instance);
- InstanceStateMetrics metrics =
- instanceStateMap.computeIfAbsent(
- instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
- if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
- logger.debug("[PrintFIState] state is {}", state);
- metrics.reset(state);
- } else {
- metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
- }
-
- if (state != null) {
- updateQueryState(instance.getId(), state);
+ synchronized (this) {
+ InstanceStateMetrics metrics =
+ instanceStateMap.computeIfAbsent(
+ instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
+ if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
+ logger.debug("[PrintFIState] state is {}", state);
+ metrics.reset(state);
+ } else {
+ metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
+ }
+
+ if (state != null) {
+ updateQueryState(instance.getId(), state);
+ }
}
} catch (TException | IOException e) {
// TODO: do nothing ?
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
index 8403a9e7d6..796af2e575 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
@@ -19,8 +19,14 @@
package org.apache.iotdb.db.mpp.plan.scheduler;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import java.util.List;
+
public interface IFragInstanceStateTracker {
void start();
void abort();
+
+ List<TFragmentInstanceId> filterUnFinishedFIs(List<TFragmentInstanceId> instanceIds);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 1bfd27438f..bda8205edf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +46,8 @@ public class SimpleQueryTerminator implements IQueryTerminator {
protected ScheduledExecutorService scheduledExecutor;
private final QueryId queryId;
private final MPPQueryContext queryContext;
+
+ private final IFragInstanceStateTracker stateTracker;
private List<TEndPoint> relatedHost;
private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
@@ -57,11 +58,13 @@ public class SimpleQueryTerminator implements IQueryTerminator {
ScheduledExecutorService scheduledExecutor,
MPPQueryContext queryContext,
List<FragmentInstance> fragmentInstances,
- IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+ IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+ IFragInstanceStateTracker stateTracker) {
this.scheduledExecutor = scheduledExecutor;
this.queryId = queryContext.getQueryId();
this.queryContext = queryContext;
this.internalServiceClientManager = internalServiceClientManager;
+ this.stateTracker = stateTracker;
calculateParameter(fragmentInstances);
}
@@ -85,19 +88,28 @@ public class SimpleQueryTerminator implements IQueryTerminator {
}
public Boolean syncTerminate() {
+ boolean succeed = true;
for (TEndPoint endPoint : relatedHost) {
+ // only send a cancel-query request if that node still has unfinished FIs
+ List<TFragmentInstanceId> unfinishedFIs =
+ stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
+ if (unfinishedFIs.isEmpty()) {
+ continue;
+ }
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
- client.cancelQuery(
- new TCancelQueryReq(queryId.getId(), ownedFragmentInstance.get(endPoint)));
+ client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
} catch (IOException e) {
- logger.error("can't connect to node {}", endPoint, e);
- return false;
- } catch (TException e) {
- return false;
+ logger.warn("can't connect to node {}", endPoint, e);
+ // don't return here: we still need to cancel the query tasks on the other nodes
+ succeed = false;
+ } catch (Throwable t) {
+ logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t);
+ // don't return here: we still need to cancel the query tasks on the other nodes
+ succeed = false;
}
}
- return true;
+ return succeed;
}
private List<TEndPoint> getRelatedHost(List<FragmentInstance> fragmentInstances) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 9bafc0463a..ee3ec2a058 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -160,9 +160,6 @@ public class FileLoaderUtils {
AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
// If the tsfile is closed, we need to load from tsfile
if (resource.isClosed()) {
- if (!resource.getTsFile().exists()) {
- return null;
- }
// load all the TimeseriesMetadata of vector, the first one is for time column and the
// remaining is for sub sensors
// the order of timeSeriesMetadata list is same as subSensorList's order