You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/04 10:15:02 UTC

[iotdb] branch master updated: Improve query performance (#7894)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d0c685b07 Improve query performance (#7894)
8d0c685b07 is described below

commit 8d0c685b07ade3fac46861687d5c9a3749a7238c
Author: Jackie Tien <ja...@gmail.com>
AuthorDate: Fri Nov 4 18:14:56 2022 +0800

    Improve query performance (#7894)
---
 .../db/engine/storagegroup/TsFileResource.java     |  9 +++--
 .../storagegroup/timeindex/DeviceTimeIndex.java    | 11 ++++++
 .../storagegroup/timeindex/FileTimeIndex.java      |  5 +++
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  5 +++
 .../storagegroup/timeindex/V012FileTimeIndex.java  |  6 +++
 .../db/mpp/plan/execution/QueryExecution.java      | 11 +++++-
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  6 ++-
 .../scheduler/FixedRateFragInsStateTracker.java    | 46 ++++++++++++++++------
 .../plan/scheduler/IFragInstanceStateTracker.java  |  6 +++
 .../mpp/plan/scheduler/SimpleQueryTerminator.java  | 30 +++++++++-----
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java |  3 --
 11 files changed, 108 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ef039eaea9..0f2fd7424c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -645,7 +645,10 @@ public class TsFileResource {
       return isSatisfied(timeFilter, isSeq, ttl, debug);
     }
 
-    if (!mayContainsDevice(deviceId)) {
+    long[] startAndEndTime = timeIndex.getStartAndEndTime(deviceId);
+
+    // doesn't contain this device
+    if (startAndEndTime == null) {
       if (debug) {
         DEBUG_LOGGER.info(
             "Path: {} file {} is not satisfied because of no device!", deviceId, file);
@@ -653,8 +656,8 @@ public class TsFileResource {
       return false;
     }
 
-    long startTime = getStartTime(deviceId);
-    long endTime = isClosed() || !isSeq ? getEndTime(deviceId) : Long.MAX_VALUE;
+    long startTime = startAndEndTime[0];
+    long endTime = isClosed() || !isSeq ? startAndEndTime[1] : Long.MAX_VALUE;
 
     if (!isAlive(endTime, ttl)) {
       if (debug) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b4ef48e847..41c354c50d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -362,6 +362,17 @@ public class DeviceTimeIndex implements ITimeIndex {
     return deviceToIndex.containsKey(device);
   }
 
+  @Override
+  public long[] getStartAndEndTime(String deviceId) {
+    Integer index = deviceToIndex.get(deviceId);
+    if (index == null) {
+      return null;
+    } else {
+      int i = index;
+      return new long[] {startTimes[i], endTimes[i]};
+    }
+  }
+
   @Override
   public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
     boolean hasMatchedDevice = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index 03c5a1d94b..ed7b6bf18e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -228,6 +228,11 @@ public class FileTimeIndex implements ITimeIndex {
     return true;
   }
 
+  @Override
+  public long[] getStartAndEndTime(String deviceId) {
+    return new long[] {startTime, endTime};
+  }
+
   @Override
   public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
     return new Pair<>(startTime, endTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index fd91d214ea..8db4b76530 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -194,6 +194,11 @@ public interface ITimeIndex {
    */
   boolean mayContainsDevice(String device);
 
+  /**
+   * @return null if the deviceId doesn't exist, otherwise index 0 is startTime, index 1 is endTime
+   */
+  long[] getStartAndEndTime(String deviceId);
+
   Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern);
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
index 4f959583ca..2abca47547 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/V012FileTimeIndex.java
@@ -185,6 +185,12 @@ public class V012FileTimeIndex implements ITimeIndex {
         "V012FileTimeIndex should be rewritten while upgrading and containsDevice() method should not be called any more.");
   }
 
+  @Override
+  public long[] getStartAndEndTime(String deviceId) {
+    throw new UnsupportedOperationException(
+        "V012FileTimeIndex should be rewritten while upgrading and getStartAndEndTime() method should not be called any more.");
+  }
+
   @Override
   public Pair<Long, Long> getPossibleStartTimeAndEndTime(PartialPath devicePattern) {
     throw new UnsupportedOperationException(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 387f9d7258..8c48c6a574 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -77,6 +77,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Throwables.throwIfUnchecked;
@@ -121,6 +122,8 @@ public class QueryExecution implements IQueryExecution {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
 
+  private AtomicBoolean stopped;
+
   public QueryExecution(
       Statement statement,
       MPPQueryContext context,
@@ -161,6 +164,7 @@ public class QueryExecution implements IQueryExecution {
             }
           }
         });
+    this.stopped = new AtomicBoolean(false);
   }
 
   @FunctionalInterface
@@ -315,8 +319,11 @@ public class QueryExecution implements IQueryExecution {
 
   // Stop the workers for this query
   public void stop() {
-    if (this.scheduler != null) {
-      this.scheduler.stop();
+    // only stop once
+    if (stopped.compareAndSet(false, true)) {
+      if (this.scheduler != null) {
+        this.scheduler.stop();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 178b91d149..303a824c0d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -86,7 +86,11 @@ public class ClusterScheduler implements IScheduler {
               stateMachine, scheduledExecutor, instances, internalServiceClientManager);
       this.queryTerminator =
           new SimpleQueryTerminator(
-              scheduledExecutor, queryContext, instances, internalServiceClientManager);
+              scheduledExecutor,
+              queryContext,
+              instances,
+              internalServiceClientManager,
+              stateTracker);
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 16554b4b69..97faac95ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -28,12 +28,14 @@ import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,6 +79,24 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
             TimeUnit.MILLISECONDS);
   }
 
+  @Override
+  public synchronized List<TFragmentInstanceId> filterUnFinishedFIs(
+      List<TFragmentInstanceId> instanceIds) {
+    List<TFragmentInstanceId> res = new ArrayList<>();
+    if (instanceIds == null) {
+      return res;
+    }
+    for (TFragmentInstanceId tFragmentInstanceId : instanceIds) {
+      InstanceStateMetrics stateMetrics =
+          instanceStateMap.get(FragmentInstanceId.fromThrift(tFragmentInstanceId));
+      if (stateMetrics != null
+          && (stateMetrics.lastState == null || !stateMetrics.lastState.isDone())) {
+        res.add(tFragmentInstanceId);
+      }
+    }
+    return res;
+  }
+
   @Override
   public synchronized void abort() {
     aborted = true;
@@ -96,18 +116,20 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
     for (FragmentInstance instance : instances) {
       try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
         FragmentInstanceState state = fetchState(instance);
-        InstanceStateMetrics metrics =
-            instanceStateMap.computeIfAbsent(
-                instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
-        if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
-          logger.debug("[PrintFIState] state is {}", state);
-          metrics.reset(state);
-        } else {
-          metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
-        }
-
-        if (state != null) {
-          updateQueryState(instance.getId(), state);
+        synchronized (this) {
+          InstanceStateMetrics metrics =
+              instanceStateMap.computeIfAbsent(
+                  instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
+          if (needPrintState(metrics.lastState, state, metrics.durationToLastPrintInMS)) {
+            logger.debug("[PrintFIState] state is {}", state);
+            metrics.reset(state);
+          } else {
+            metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
+          }
+
+          if (state != null) {
+            updateQueryState(instance.getId(), state);
+          }
         }
       } catch (TException | IOException e) {
         // TODO: do nothing ?
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
index 8403a9e7d6..796af2e575 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/IFragInstanceStateTracker.java
@@ -19,8 +19,14 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
+import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+
+import java.util.List;
+
 public interface IFragInstanceStateTracker {
   void start();
 
   void abort();
+
+  List<TFragmentInstanceId> filterUnFinishedFIs(List<TFragmentInstanceId> instanceIds);
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index 1bfd27438f..bda8205edf 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +46,8 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   protected ScheduledExecutorService scheduledExecutor;
   private final QueryId queryId;
   private final MPPQueryContext queryContext;
+
+  private final IFragInstanceStateTracker stateTracker;
   private List<TEndPoint> relatedHost;
   private Map<TEndPoint, List<TFragmentInstanceId>> ownedFragmentInstance;
 
@@ -57,11 +58,13 @@ public class SimpleQueryTerminator implements IQueryTerminator {
       ScheduledExecutorService scheduledExecutor,
       MPPQueryContext queryContext,
       List<FragmentInstance> fragmentInstances,
-      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+      IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
+      IFragInstanceStateTracker stateTracker) {
     this.scheduledExecutor = scheduledExecutor;
     this.queryId = queryContext.getQueryId();
     this.queryContext = queryContext;
     this.internalServiceClientManager = internalServiceClientManager;
+    this.stateTracker = stateTracker;
     calculateParameter(fragmentInstances);
   }
 
@@ -85,19 +88,28 @@ public class SimpleQueryTerminator implements IQueryTerminator {
   }
 
   public Boolean syncTerminate() {
+    boolean succeed = true;
     for (TEndPoint endPoint : relatedHost) {
+      // only send a cancel-query request if that node still has unfinished FIs
+      List<TFragmentInstanceId> unfinishedFIs =
+          stateTracker.filterUnFinishedFIs(ownedFragmentInstance.get(endPoint));
+      if (unfinishedFIs.isEmpty()) {
+        continue;
+      }
       try (SyncDataNodeInternalServiceClient client =
           internalServiceClientManager.borrowClient(endPoint)) {
-        client.cancelQuery(
-            new TCancelQueryReq(queryId.getId(), ownedFragmentInstance.get(endPoint)));
+        client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
       } catch (IOException e) {
-        logger.error("can't connect to node {}", endPoint, e);
-        return false;
-      } catch (TException e) {
-        return false;
+        logger.warn("can't connect to node {}", endPoint, e);
+        // don't return here; we still need to cancel the query tasks on the other nodes
+        succeed = false;
+      } catch (Throwable t) {
+        logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t);
+        // don't return here; we still need to cancel the query tasks on the other nodes
+        succeed = false;
       }
     }
-    return true;
+    return succeed;
   }
 
   private List<TEndPoint> getRelatedHost(List<FragmentInstance> fragmentInstances) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 9bafc0463a..ee3ec2a058 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -160,9 +160,6 @@ public class FileLoaderUtils {
     AlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
     // If the tsfile is closed, we need to load from tsfile
     if (resource.isClosed()) {
-      if (!resource.getTsFile().exists()) {
-        return null;
-      }
       // load all the TimeseriesMetadata of vector, the first one is for time column and the
       // remaining is for sub sensors
       // the order of timeSeriesMetadata list is same as subSensorList's order