You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/04/30 09:11:32 UTC

[incubator-iotdb] 01/01: [IOTDB-631] Using new TsFile MetadataIndex to optimize query and cache

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_631
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 4339c68bbbfb0886d3661a3308b90feb83f681b1
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Thu Apr 30 17:10:41 2020 +0800

    [IOTDB-631] Using new TsFile MetadataIndex to optimize query and cache
---
 .../db/engine/cache/TimeSeriesMetadataCache.java   | 63 +++++++++++------
 .../tsfile/file/metadata/MetadataIndexNode.java    |  2 +-
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 79 +++++++++++++++++-----
 .../read/controller/MetadataQuerierByFileImpl.java | 10 ++-
 4 files changed, 109 insertions(+), 45 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
index e586cde..1fb7c8a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cache/TimeSeriesMetadataCache.java
@@ -19,6 +19,13 @@
 
 package org.apache.iotdb.db.engine.cache;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -30,22 +37,17 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.BloomFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching
- * strategy is LRU.
+ * This class is used to cache <code>TimeSeriesMetadata</code> in IoTDB. The caching strategy is
+ * LRU.
  */
 public class TimeSeriesMetadataCache {
 
   private static final Logger logger = LoggerFactory.getLogger(TimeSeriesMetadataCache.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-  private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE = config.getAllocateMemoryForTimeSeriesMetaDataCache();
+  private static final long MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE = config
+      .getAllocateMemoryForTimeSeriesMetaDataCache();
   private static boolean cacheEnable = config.isMetaDataCacheEnable();
 
   private final LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata> lruCache;
@@ -58,9 +60,11 @@ public class TimeSeriesMetadataCache {
 
   private TimeSeriesMetadataCache() {
     logger.info("TimeseriesMetadataCache size = " + MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE);
-    lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>(MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) {
+    lruCache = new LRULinkedHashMap<TimeSeriesMetadataCacheKey, TimeseriesMetadata>(
+        MEMORY_THRESHOLD_IN_TIME_SERIES_METADATA_CACHE, true) {
       int count = 0;
       long averageSize = 0;
+
       @Override
       protected long calEntrySize(TimeSeriesMetadataCacheKey key, TimeseriesMetadata value) {
         if (count < 10) {
@@ -83,7 +87,8 @@ public class TimeSeriesMetadataCache {
     return TimeSeriesMetadataCache.TimeSeriesMetadataCacheHolder.INSTANCE;
   }
 
-  public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSensors) throws IOException {
+  public TimeseriesMetadata get(TimeSeriesMetadataCacheKey key, Set<String> allSensors)
+      throws IOException {
     if (!cacheEnable) {
       // bloom filter part
       TsFileMetadata fileMetaData = TsFileMetaDataCache.getInstance().get(key.filePath);
@@ -125,9 +130,20 @@ public class TimeSeriesMetadataCache {
         return null;
       }
       TsFileSequenceReader reader = FileReaderManager.getInstance().get(key.filePath, true);
-      TimeseriesMetadata timeseriesMetadata = reader.readTimeseriesMetadata(new Path(key.device, key.measurement));
-      lruCache.put(key, timeseriesMetadata);
-      return timeseriesMetadata;
+      TimeseriesMetadata resultTimeseriesMetadata = reader
+          .readTimeseriesMetadata(new Path(key.device, key.measurement));
+      lruCache.put(key, resultTimeseriesMetadata);
+
+      List<TimeseriesMetadata> timeSeriesMetadataList = reader
+          .readTimeseriesMetadata(key.device, allSensors);
+      if (!allSensors.isEmpty()) {
+        // put TimeSeriesMetadata of all sensors used in this query into cache
+        timeSeriesMetadataList.forEach(timeseriesMetadata -> {
+          lruCache.put(new TimeSeriesMetadataCacheKey(key.filePath, key.device,
+              timeseriesMetadata.getMeasurementId()), timeseriesMetadata);
+        });
+      }
+      return resultTimeseriesMetadata;
     } catch (IOException e) {
       logger.error("something wrong happened while reading {}", key.filePath);
       throw e;
@@ -143,9 +159,9 @@ public class TimeSeriesMetadataCache {
       return;
     }
     logger.debug(
-            "[TimeSeriesMetadata cache {}hit] The number of requests for cache is {}, hit rate is {}.",
-            isHit ? "" : "didn't ", cacheRequestNum.get(),
-            cacheHitNum.get() * 1.0 / cacheRequestNum.get());
+        "[TimeSeriesMetadata cache {}hit] The number of requests for cache is {}, hit rate is {}.",
+        isHit ? "" : "didn't ", cacheRequestNum.get(),
+        cacheHitNum.get() * 1.0 / cacheRequestNum.get());
   }
 
   public double calculateTimeSeriesMetadataHitRatio() {
@@ -177,6 +193,7 @@ public class TimeSeriesMetadataCache {
   }
 
   public static class TimeSeriesMetadataCacheKey {
+
     private String filePath;
     private String device;
     private String measurement;
@@ -189,12 +206,16 @@ public class TimeSeriesMetadataCache {
 
     @Override
     public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
       TimeSeriesMetadataCacheKey that = (TimeSeriesMetadataCacheKey) o;
       return Objects.equals(filePath, that.filePath) &&
-              Objects.equals(device, that.device) &&
-              Objects.equals(measurement, that.measurement);
+          Objects.equals(device, that.device) &&
+          Objects.equals(measurement, that.measurement);
     }
 
     @Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
index 275521e..28fe5df 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/MetadataIndexNode.java
@@ -120,6 +120,6 @@ public class MetadataIndexNode {
         return mid; // key found
       }
     }
-    return low - 1;  // key not found
+    return low == 0 ? low : low - 1;  // key not found
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 1e3c8a7..b11a524 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -34,7 +33,6 @@ import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
@@ -178,9 +176,8 @@ public class TsFileSequenceReader implements AutoCloseable {
       metadataSize.flip();
       // read file metadata size and position
       fileMetadataSize = ReadWriteIOUtils.readInt(metadataSize);
-      fileMetadataPos =
-          tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length - Integer.BYTES
-              - fileMetadataSize;
+      fileMetadataPos = tsFileInput.size() - TSFileConfig.MAGIC_STRING.getBytes().length
+          - Integer.BYTES - fileMetadataSize;
     }
   }
 
@@ -314,13 +311,13 @@ public class TsFileSequenceReader implements AutoCloseable {
   public TimeseriesMetadata readTimeseriesMetadata(Path path) throws IOException {
     readFileMetadata();
     MetadataIndexNode deviceMetadataIndexNode = tsFileMetaData.getMetadataIndex();
-    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset(
+    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
         deviceMetadataIndexNode, path.getDevice(), MetadataIndexNodeType.INTERNAL_DEVICE);
     ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
     while (!metadataIndexPair.left.getChildNodeType()
         .equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
       MetadataIndexNode metadataIndexNode = MetadataIndexNode.deserializeFrom(buffer);
-      metadataIndexPair = getMetaDataAndEndOffset(metadataIndexNode,
+      metadataIndexPair = getMetadataAndEndOffset(metadataIndexNode,
           path.getMeasurement(), MetadataIndexNodeType.INTERNAL_MEASUREMENT);
     }
     List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
@@ -328,14 +325,62 @@ public class TsFileSequenceReader implements AutoCloseable {
     while (buffer.hasRemaining()) {
       timeseriesMetadataList.add(TimeseriesMetadata.deserializeFrom(buffer));
     }
-    String[] measurementNameList = timeseriesMetadataList.stream()
-        .map(TimeseriesMetadata::getMeasurementId).collect(Collectors.toList())
-        .toArray(new String[timeseriesMetadataList.size()]);
-
     // return null if path does not exist in the TsFile
-    int searchResult;
-    return (searchResult = Arrays.binarySearch(measurementNameList, path.getMeasurement())) >= 0
-        ? timeseriesMetadataList.get(searchResult) : null;
+    int searchResult = binarySearchInTimeseriesMetadataList(timeseriesMetadataList,
+        path.getMeasurement());
+    return searchResult >= 0 ? timeseriesMetadataList.get(searchResult) : null;
+  }
+
+  /**
+   * Looks up each of {@code measurements} under {@code device}; those not found are omitted.
+   */
+  public List<TimeseriesMetadata> readTimeseriesMetadata(String device, Set<String> measurements)
+      throws IOException {
+    readFileMetadata();
+    Pair<MetadataIndexEntry, Long> deviceEntry = getMetadataAndEndOffset(
+        tsFileMetaData.getMetadataIndex(), device, MetadataIndexNodeType.INTERNAL_DEVICE);
+    List<TimeseriesMetadata> found = new ArrayList<>();
+    for (String measurement : measurements) {
+      // every lookup starts from a fresh read of the bytes the device-level entry addresses
+      ByteBuffer block = readData(deviceEntry.left.getOffset(), deviceEntry.right);
+      Pair<MetadataIndexEntry, Long> entry = deviceEntry;
+      while (!entry.left.getChildNodeType().equals(MetadataIndexNodeType.LEAF_MEASUREMENT)) {
+        entry = getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(block), measurement,
+            MetadataIndexNodeType.INTERNAL_MEASUREMENT);
+      }
+      // the entry's child is now LEAF_MEASUREMENT: decode TimeseriesMetadata until the buffer ends
+      block = readData(entry.left.getOffset(), entry.right);
+      List<TimeseriesMetadata> candidates = new ArrayList<>();
+      while (block.hasRemaining()) {
+        candidates.add(TimeseriesMetadata.deserializeFrom(block));
+      }
+      int index = binarySearchInTimeseriesMetadataList(candidates, measurement);
+      if (index >= 0) {
+        found.add(candidates.get(index));
+      }
+    }
+    return found;
+  }
+
+  // Binary search by measurement id (assumes the list is sorted by id); returns index or -1.
+  private int binarySearchInTimeseriesMetadataList(List<TimeseriesMetadata> timeseriesMetadataList,
+      String key) {
+    int left = 0;
+    int right = timeseriesMetadataList.size() - 1;
+    while (left <= right) {
+      int middle = (left + right) >>> 1; // unsigned shift keeps the midpoint overflow-safe
+      String candidate = timeseriesMetadataList.get(middle).getMeasurementId();
+      int order = candidate.compareTo(key);
+      if (order == 0) {
+        return middle; // exact match
+      }
+      if (order < 0) {
+        left = middle + 1; // candidate sorts before key: drop the lower half
+      } else {
+        right = middle - 1; // candidate sorts after key: drop the upper half
+      }
+    }
+    return -1; // no entry carries this measurement id
+  }
 
   public List<String> getAllDevices() throws IOException {
@@ -489,7 +534,7 @@ public class TsFileSequenceReader implements AutoCloseable {
 
   private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(String device) throws IOException {
     MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex();
-    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetaDataAndEndOffset(
+    Pair<MetadataIndexEntry, Long> metadataIndexPair = getMetadataAndEndOffset(
         metadataIndexNode, device, MetadataIndexNodeType.INTERNAL_DEVICE);
     ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right);
     Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new TreeMap<>();
@@ -501,14 +546,14 @@ public class TsFileSequenceReader implements AutoCloseable {
     return deviceTimeseriesMetadata;
   }
 
-  private Pair<MetadataIndexEntry, Long> getMetaDataAndEndOffset(MetadataIndexNode metadataIndex,
+  private Pair<MetadataIndexEntry, Long> getMetadataAndEndOffset(MetadataIndexNode metadataIndex,
       String name, MetadataIndexNodeType type) throws IOException {
     Pair<MetadataIndexEntry, Long> childIndexEntry = metadataIndex.getChildIndexEntry(name);
     if (!childIndexEntry.left.getChildNodeType().equals(type)) {
       return childIndexEntry;
     }
     ByteBuffer buffer = readData(childIndexEntry.left.getOffset(), childIndexEntry.right);
-    return getMetaDataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name, type);
+    return getMetadataAndEndOffset(MetadataIndexNode.deserializeFrom(buffer), name, type);
   }
 
   /**
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
index bb58ec4..02582f0 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -115,13 +115,11 @@ public class MetadataQuerierByFileImpl implements IMetadataQuerier {
         continue;
       }
 
-      Map<String, TimeseriesMetadata> timeseriesMetaDataInDevice = tsFileReader
-          .readDeviceMetadata(selectedDevice);
+      List<TimeseriesMetadata> timeseriesMetaDataList = tsFileReader
+          .readTimeseriesMetadata(selectedDevice, selectedMeasurements);
       List<ChunkMetadata> chunkMetadataList = new ArrayList<>();
-      for (Map.Entry<String, TimeseriesMetadata> entry : timeseriesMetaDataInDevice.entrySet()) {
-        if (selectedMeasurements.contains(entry.getKey())) {
-          chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(entry.getValue()));
-        }
+      for (TimeseriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
+        chunkMetadataList.addAll(tsFileReader.readChunkMetaDataList(timeseriesMetadata));
       }
       // d1
       for (ChunkMetadata chunkMetaData : chunkMetadataList) {