You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2020/11/19 02:39:06 UTC

[iotdb] branch rel/0.11 updated: Refactor LastQueryExecutor to separate the execution into multiple stages

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 574aa05  Refactor LastQueryExecutor to separate the execution into multiple stages
574aa05 is described below

commit 574aa05ccb4e9f12cd4d118b9850f1f44982c25f
Author: wshao08 <ko...@163.com>
AuthorDate: Tue Nov 17 19:24:37 2020 +0800

    Refactor LastQueryExecutor to separate the execution into multiple stages
    
    (cherry picked from commit 39691323a5b3abb6e5e144b176acdd132f7c6b6b)
---
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  12 +-
 .../query/dataset/groupby/GroupByFillDataSet.java  |  15 +-
 .../iotdb/db/query/executor/LastQueryExecutor.java | 183 ++++++++++++---------
 .../iotdb/db/query/executor/QueryRouter.java       |   6 +-
 4 files changed, 126 insertions(+), 90 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 4355319..ba844bf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.metadata;
 import static java.util.stream.Collectors.toList;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
 import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
-import static org.apache.iotdb.db.query.executor.LastQueryExecutor.calculateLastPairForOneSeriesLocally;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -55,6 +54,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
@@ -68,6 +68,8 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
 import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.query.executor.fill.LastPointReader;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -107,8 +109,12 @@ public class MTree implements Serializable {
       return node.getCachedLast().getTimestamp();
     } else {
       try {
-        last = calculateLastPairForOneSeriesLocally(node.getPartialPath(),
-            node.getSchema().getType(), queryContext, null, Collections.emptySet());
+        QueryDataSource dataSource = QueryResourceManager.getInstance().
+                getQueryDataSource(node.getPartialPath(), queryContext, null);
+        LastPointReader lastReader = new LastPointReader(node.getPartialPath(),
+                node.getSchema().getType(), Collections.emptySet(), queryContext,
+                dataSource, Long.MAX_VALUE, null);
+        last = lastReader.readLastPoint();
         return (last != null ? last.getTimestamp() : Long.MIN_VALUE);
       } catch (Exception e) {
         logger.error("Something wrong happened while trying to get last time value pair of {}",
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
index 97b285f..c79bc41 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByFillDataSet.java
@@ -105,13 +105,16 @@ public class GroupByFillDataSet extends QueryDataSet {
       throws IOException, StorageEngineException, QueryProcessException {
     lastTimeArray = new long[paths.size()];
     Arrays.fill(lastTimeArray, Long.MAX_VALUE);
+    List<PartialPath> seriesPaths = new ArrayList<>();
     for (int i = 0; i < paths.size(); i++) {
-      TimeValuePair lastTimeValuePair;
-      lastTimeValuePair = LastQueryExecutor.calculateLastPairForOneSeriesLocally(
-          (PartialPath) paths.get(i), dataTypes.get(i), context, null,
-          groupByFillPlan.getAllMeasurementsInDevice(paths.get(i).getDevice()));
-      if (lastTimeValuePair != null && lastTimeValuePair.getValue() != null) {
-        lastTimeArray[i] = lastTimeValuePair.getTimestamp();
+      seriesPaths.add((PartialPath) paths.get(i));
+    }
+    List<Pair<Boolean, TimeValuePair>> lastPairList =
+            LastQueryExecutor.calculateLastPairForSeriesLocally(
+                    seriesPaths, dataTypes, context, null, groupByFillPlan);
+    for (int i = 0; i < lastPairList.size(); i++) {
+      if (lastPairList.get(i).left) {
+        lastTimeArray[i] = lastPairList.get(i).right.getTimestamp();
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
index cc598d5..3d30bb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
@@ -24,9 +24,9 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
@@ -51,13 +52,14 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 public class LastQueryExecutor {
 
   private List<PartialPath> selectedSeries;
   private List<TSDataType> dataTypes;
   private IExpression expression;
-  private static boolean lastCacheEnabled =
+  private static final boolean lastCacheEnabled =
           IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled();
 
   public LastQueryExecutor(LastQueryPlan lastQueryPlan) {
@@ -83,36 +85,31 @@ public class LastQueryExecutor {
         Arrays.asList(new PartialPath(COLUMN_TIMESERIES, false), new PartialPath(COLUMN_VALUE, false)),
         Arrays.asList(TSDataType.TEXT, TSDataType.TEXT));
 
-    List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries);
-    try {
-      for (int i = 0; i < selectedSeries.size(); i++) {
-        TimeValuePair lastTimeValuePair;
-        lastTimeValuePair = calculateLastPairForOneSeries(
-            selectedSeries.get(i), dataTypes.get(i), context,
-            lastQueryPlan.getAllMeasurementsInDevice(selectedSeries.get(i).getDevice()));
-        if (lastTimeValuePair != null && lastTimeValuePair.getValue() != null) {
-          RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
-          Field pathField = new Field(TSDataType.TEXT);
-          if (selectedSeries.get(i).getTsAlias() != null) {
-            pathField.setBinaryV(new Binary(selectedSeries.get(i).getTsAlias()));
+    List<Pair<Boolean, TimeValuePair>> lastPairList = calculateLastPairForSeriesLocally(
+            selectedSeries, dataTypes, context, expression, lastQueryPlan);
+
+    for (int i = 0; i < lastPairList.size(); i++) {
+      if (lastPairList.get(i).left) {
+        TimeValuePair lastTimeValuePair = lastPairList.get(i).right;
+        RowRecord resultRecord = new RowRecord(lastTimeValuePair.getTimestamp());
+        Field pathField = new Field(TSDataType.TEXT);
+        if (selectedSeries.get(i).getTsAlias() != null) {
+          pathField.setBinaryV(new Binary(selectedSeries.get(i).getTsAlias()));
+        } else {
+          if (selectedSeries.get(i).getMeasurementAlias() != null) {
+            pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPathWithAlias()));
           } else {
-            if (selectedSeries.get(i).getMeasurementAlias() != null) {
-              pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPathWithAlias()));
-            } else {
-              pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPath()));
-            }
+            pathField.setBinaryV(new Binary(selectedSeries.get(i).getFullPath()));
           }
-          resultRecord.addField(pathField);
+        }
+        resultRecord.addField(pathField);
 
-          Field valueField = new Field(TSDataType.TEXT);
-          valueField.setBinaryV(new Binary(lastTimeValuePair.getValue().getStringValue()));
-          resultRecord.addField(valueField);
+        Field valueField = new Field(TSDataType.TEXT);
+        valueField.setBinaryV(new Binary(lastTimeValuePair.getValue().getStringValue()));
+        resultRecord.addField(valueField);
 
-          dataSet.putRecord(resultRecord);
-        }
+        dataSet.putRecord(resultRecord);
       }
-    } finally {
-      StorageEngine.getInstance().mergeUnLock(list);
     }
 
     if (!lastQueryPlan.isAscending()) {
@@ -121,35 +118,90 @@ public class LastQueryExecutor {
     return dataSet;
   }
 
-  protected TimeValuePair calculateLastPairForOneSeries(
-      PartialPath seriesPath, TSDataType tsDataType, QueryContext context, Set<String> deviceMeasurements)
-      throws IOException, QueryProcessException, StorageEngineException {
-    return calculateLastPairForOneSeriesLocally(seriesPath, tsDataType, context,
-        expression, deviceMeasurements);
+  /** Reads cache misses from TsFiles; slot i maps to seriesPaths.get(i); left is set only when right holds a value. */
+  public static List<Pair<Boolean, TimeValuePair>> calculateLastPairForSeriesLocally(
+          List<PartialPath> seriesPaths, List<TSDataType> dataTypes, QueryContext context,
+          IExpression expression, RawDataQueryPlan lastQueryPlan)
+          throws QueryProcessException, StorageEngineException, IOException {
+    List<LastCacheAccessor> cacheAccessors = new ArrayList<>();
+    Filter filter = (expression == null) ? null : ((GlobalTimeExpression) expression).getFilter();
+    List<PartialPath> restPaths = new ArrayList<>();
+    List<Pair<Boolean, TimeValuePair>> resultContainer =
+            readLastPairsFromCache(seriesPaths, filter, cacheAccessors, restPaths);
+    // If any '>' or '>=' filters are specified, only access cache to get Last result.
+    if (filter != null || restPaths.isEmpty()) {
+      return resultContainer;
+    }
+    List<LastPointReader> readerList = new ArrayList<>();
+    List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(restPaths);
+    try {
+      // Index by result slot, not by position in restPaths: a miss need not be at the front.
+      for (int i = 0; i < resultContainer.size(); i++) {
+        if (!resultContainer.get(i).left) {
+          QueryDataSource dataSource =
+                  QueryResourceManager.getInstance().getQueryDataSource(seriesPaths.get(i), context, null);
+          readerList.add(new LastPointReader(seriesPaths.get(i), dataTypes.get(i),
+                  lastQueryPlan.getAllMeasurementsInDevice(seriesPaths.get(i).getDevice()),
+                  context, dataSource, Long.MAX_VALUE, null));
+        }
+      }
+    } finally {
+      StorageEngine.getInstance().mergeUnLock(list);
+    }
+    int index = 0;
+    for (int i = 0; i < resultContainer.size(); i++) {
+      if (!resultContainer.get(i).left) {
+        TimeValuePair pair = readerList.get(index++).readLastPoint();
+        resultContainer.set(i, new Pair<>(pair != null && pair.getValue() != null, pair));
+        if (lastCacheEnabled) {
+          cacheAccessors.get(i).write(pair);
+        }
+      }
+    }
+    return resultContainer;
   }
 
-  /**
-   * get last result for one series
-   *
-   * @param context query context
-   * @return TimeValuePair, result can be null
-   */
-  public static TimeValuePair calculateLastPairForOneSeriesLocally(
-      PartialPath seriesPath, TSDataType tsDataType, QueryContext context,
-      IExpression expression, Set<String> deviceMeasurements)
-      throws IOException, QueryProcessException, StorageEngineException {
-
-    // Retrieve last value from MNode
-    MeasurementMNode node = null;
-    Filter filter = null;
+  /** Returns one (hit, pair) slot per series; misses are appended to restPaths in series order. */
+  private static List<Pair<Boolean, TimeValuePair>> readLastPairsFromCache(List<PartialPath> seriesPaths,
+          Filter filter, List<LastCacheAccessor> cacheAccessors, List<PartialPath> restPaths) {
+    List<Pair<Boolean, TimeValuePair>> resultContainer = new ArrayList<>();
     if (lastCacheEnabled) {
-      if (expression != null) {
-        filter = ((GlobalTimeExpression) expression).getFilter();
+      for (PartialPath path : seriesPaths) {
+        cacheAccessors.add(new LastCacheAccessor(path, filter));
+      }
+    }
+    for (int i = 0; i < seriesPaths.size(); i++) {
+      // With the cache disabled there is no accessor: record a miss so the slot still exists.
+      TimeValuePair tvPair = lastCacheEnabled ? cacheAccessors.get(i).read() : null;
+      if (tvPair != null) {
+        resultContainer.add(new Pair<>(true, tvPair));
+      } else {
+        resultContainer.add(new Pair<>(false, null));
+        restPaths.add(seriesPaths.get(i));
       }
+    }
+    return resultContainer;
+  }
+
+  private static class LastCacheAccessor {
+    private PartialPath path;
+    private Filter filter;
+    private MeasurementMNode node;
+
+    LastCacheAccessor(PartialPath seriesPath) {
+      this.path = seriesPath;
+    }
+
+    LastCacheAccessor(PartialPath seriesPath, Filter filter) {
+      this.path = seriesPath;
+      this.filter = filter;
+    }
+
+    public TimeValuePair read() {
       try {
-        node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(seriesPath);
+        node = (MeasurementMNode) IoTDB.metaManager.getNodeByPath(path);
       } catch (MetadataException e) {
-        TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(seriesPath);
+        TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(path);
         if (timeValuePair != null && satisfyFilter(filter, timeValuePair)) {
           return timeValuePair;
         } else if (timeValuePair != null) {
@@ -165,37 +217,16 @@ public class LastQueryExecutor {
           return null;
         }
       }
+      return null;
     }
 
-    return calculateLastPairByScanningTsFiles(
-        seriesPath, tsDataType, context, filter, deviceMeasurements, node);
-  }
-
-  private static TimeValuePair calculateLastPairByScanningTsFiles(
-          PartialPath seriesPath, TSDataType tsDataType, QueryContext context,
-          Filter filter, Set<String> deviceMeasurements, MeasurementMNode node)
-      throws QueryProcessException, StorageEngineException, IOException {
-
-    QueryDataSource dataSource =
-        QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, filter);
-
-    LastPointReader lastReader = new LastPointReader(
-        seriesPath, tsDataType, deviceMeasurements, context, dataSource, Long.MAX_VALUE, filter);
-    TimeValuePair resultPair = lastReader.readLastPoint();
-
-    // Update cached last value with low priority unless "FROM" expression exists
-    if (lastCacheEnabled) {
-      IoTDB.metaManager.updateLastCache(
-          seriesPath, resultPair, false, Long.MIN_VALUE, node);
+    public void write(TimeValuePair pair) {
+      IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE, node);
     }
-    return resultPair;
   }
 
   private static boolean satisfyFilter(Filter filter, TimeValuePair tvPair) {
-    if (filter == null ||
-        filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue())) {
-      return true;
-    }
-    return false;
+    return filter == null ||
+            filter.satisfy(tvPair.getTimestamp(), tvPair.getValue().getValue());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index 8c7750e..90289e7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -225,12 +225,8 @@ public class QueryRouter implements IQueryRouter {
   @Override
   public QueryDataSet lastQuery(LastQueryPlan lastQueryPlan, QueryContext context)
       throws StorageEngineException, QueryProcessException, IOException {
-    LastQueryExecutor lastQueryExecutor = getLastQueryExecutor(lastQueryPlan);
+    LastQueryExecutor lastQueryExecutor = new LastQueryExecutor(lastQueryPlan);
     return lastQueryExecutor.execute(context, lastQueryPlan);
   }
 
-  protected LastQueryExecutor getLastQueryExecutor(LastQueryPlan lastQueryPlan) {
-    return new LastQueryExecutor(lastQueryPlan);
-  }
-
 }