You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/08 07:49:54 UTC

[iotdb] branch master updated: [IOTDB-1372] delete devices field in FileTimeIndex (#3606)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dcae2b  [IOTDB-1372] delete devices field in FileTimeIndex (#3606)
4dcae2b is described below

commit 4dcae2b69924c8cd5deac7fd5af78d16f6394249
Author: Yuting Yan <56...@users.noreply.github.com>
AuthorDate: Sun Aug 8 02:49:23 2021 -0500

    [IOTDB-1372] delete devices field in FileTimeIndex (#3606)
---
 example/mqtt-customize/pom.xml                     |  1 -
 .../db/engine/storagegroup/TsFileResource.java     |  6 +-
 .../storagegroup/timeindex/DeviceTimeIndex.java    |  2 +-
 .../storagegroup/timeindex/FileTimeIndex.java      | 71 ++++++----------------
 .../engine/storagegroup/timeindex/ITimeIndex.java  |  2 +-
 5 files changed, 22 insertions(+), 60 deletions(-)

diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
index 7098667..c524fb0 100644
--- a/example/mqtt-customize/pom.xml
+++ b/example/mqtt-customize/pom.xml
@@ -37,6 +37,5 @@
             <artifactId>iotdb-server</artifactId>
             <version>${project.version}</version>
         </dependency>
-
     </dependencies>
 </project>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index f0c7d18..b6f07d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -183,7 +183,7 @@ public class TsFileResource {
     this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
   }
 
-  /** unsealed TsFile */
+  /** unsealed TsFile, for writer */
   public TsFileResource(File file, TsFileProcessor processor) {
     this.file = file;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
@@ -192,7 +192,7 @@ public class TsFileResource {
     this.processor = processor;
   }
 
-  /** unsealed TsFile */
+  /** unsealed TsFile, for query */
   public TsFileResource(
       List<ReadOnlyMemChunk> readOnlyMemChunk,
       List<IChunkMetadata> chunkMetadataList,
@@ -469,7 +469,7 @@ public class TsFileResource {
   }
 
   public Set<String> getDevices() {
-    return timeIndex.getDevices();
+    return timeIndex.getDevices(file.getPath());
   }
 
   public boolean endTimeEmpty() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b93cf87..013f111 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -146,7 +146,7 @@ public class DeviceTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public Set<String> getDevices() {
+  public Set<String> getDevices(String tsFilePath) {
     return deviceToIndex.keySet();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index 9cc46eb..d427f34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -21,26 +21,25 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
 
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.exception.PartitionViolationException;
-import org.apache.iotdb.db.rescon.CachedStringPool;
+import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.FilePathUtils;
-import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import io.netty.util.internal.ConcurrentSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 public class FileTimeIndex implements ITimeIndex {
 
-  protected static final Map<String, String> cachedDevicePool =
-      CachedStringPool.getInstance().getCachedPool();
+  private static final Logger logger = LoggerFactory.getLogger(FileTimeIndex.class);
 
   /** start time */
   protected long startTime;
@@ -48,64 +47,31 @@ public class FileTimeIndex implements ITimeIndex {
   /** end times. The value is Long.MIN_VALUE if it's an unsealed sequence tsfile */
   protected long endTime;
 
-  /** devices */
-  protected Set<String> devices;
-
   public FileTimeIndex() {
-    this.devices = new ConcurrentSet<>();
     this.startTime = Long.MAX_VALUE;
     this.endTime = Long.MIN_VALUE;
   }
 
-  public FileTimeIndex(Set<String> devices, long startTime, long endTime) {
+  public FileTimeIndex(long startTime, long endTime) {
     this.startTime = startTime;
     this.endTime = endTime;
-    this.devices = devices;
   }
 
   @Override
   public void serialize(OutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(devices.size(), outputStream);
-    Set<String> stringMemoryReducedSet = new ConcurrentSet<>();
-    for (String device : devices) {
-      // To reduce the String number in memory,
-      // use the deviceId from cached pool
-      stringMemoryReducedSet.add(cachedDevicePool.computeIfAbsent(device, k -> k));
-      ReadWriteIOUtils.write(device, outputStream);
-    }
     ReadWriteIOUtils.write(startTime, outputStream);
     ReadWriteIOUtils.write(endTime, outputStream);
-    devices = stringMemoryReducedSet;
   }
 
   @Override
   public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    Set<String> deviceSet = new HashSet<>();
-    for (int i = 0; i < size; i++) {
-      String path = ReadWriteIOUtils.readString(inputStream);
-      // To reduce the String number in memory,
-      // use the deviceId from memory instead of the deviceId read from disk
-      String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
-      deviceSet.add(cachedPath);
-    }
     return new FileTimeIndex(
-        deviceSet, ReadWriteIOUtils.readLong(inputStream), ReadWriteIOUtils.readLong(inputStream));
+        ReadWriteIOUtils.readLong(inputStream), ReadWriteIOUtils.readLong(inputStream));
   }
 
   @Override
   public FileTimeIndex deserialize(ByteBuffer buffer) {
-    int size = buffer.getInt();
-    Set<String> deviceSet = new HashSet<>(size);
-
-    for (int i = 0; i < size; i++) {
-      String path = SerializeUtils.deserializeString(buffer);
-      // To reduce the String number in memory,
-      // use the deviceId from memory instead of the deviceId read from disk
-      String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
-      deviceSet.add(cachedPath);
-    }
-    return new FileTimeIndex(deviceSet, buffer.getLong(), buffer.getLong());
+    return new FileTimeIndex(buffer.getLong(), buffer.getLong());
   }
 
   @Override
@@ -114,8 +80,14 @@ public class FileTimeIndex implements ITimeIndex {
   }
 
   @Override
-  public Set<String> getDevices() {
-    return devices;
+  public Set<String> getDevices(String tsFilePath) {
+    try {
+      TsFileSequenceReader fileReader = FileReaderManager.getInstance().get(tsFilePath, true);
+      return new HashSet<>(fileReader.getAllDevices());
+    } catch (IOException e) {
+      logger.error("Can't read file {} from disk ", tsFilePath, e);
+      throw new RuntimeException("Can't read file " + tsFilePath + " from disk");
+    }
   }
 
   @Override
@@ -134,17 +106,12 @@ public class FileTimeIndex implements ITimeIndex {
 
   @Override
   public long calculateRamSize() {
-    return RamUsageEstimator.sizeOf(devices)
-        + RamUsageEstimator.sizeOf(startTime)
-        + RamUsageEstimator.sizeOf(endTime);
+    return RamUsageEstimator.sizeOf(startTime) + RamUsageEstimator.sizeOf(endTime);
   }
 
   @Override
   public long getTimePartition(String tsFilePath) {
     try {
-      if (devices != null && !devices.isEmpty()) {
-        return StorageEngine.getTimePartition(startTime);
-      }
       String[] filePathSplits = FilePathUtils.splitTsFilePath(tsFilePath);
       return Long.parseLong(filePathSplits[filePathSplits.length - 2]);
     } catch (NumberFormatException e) {
@@ -178,7 +145,6 @@ public class FileTimeIndex implements ITimeIndex {
 
   @Override
   public void updateStartTime(String deviceId, long time) {
-    devices.add(deviceId);
     if (this.startTime > time) {
       this.startTime = time;
     }
@@ -186,7 +152,6 @@ public class FileTimeIndex implements ITimeIndex {
 
   @Override
   public void updateEndTime(String deviceId, long time) {
-    devices.add(deviceId);
     if (this.endTime < time) {
       this.endTime = time;
     }
@@ -194,13 +159,11 @@ public class FileTimeIndex implements ITimeIndex {
 
   @Override
   public void putStartTime(String deviceId, long time) {
-    devices.add(deviceId);
     this.startTime = time;
   }
 
   @Override
   public void putEndTime(String deviceId, long time) {
-    devices.add(deviceId);
     this.endTime = time;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index 3710362..d53dd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -62,7 +62,7 @@ public interface ITimeIndex {
    *
    * @return device names
    */
-  Set<String> getDevices();
+  Set<String> getDevices(String tsFilePath);
 
   /** @return whether end time is empty (Long.MIN_VALUE) */
   boolean endTimeEmpty();