You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/08 07:49:54 UTC
[iotdb] branch master updated: [IOTDB-1372] delete devices field in FileTimeIndex (#3606)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4dcae2b [IOTDB-1372] delete devices field in FileTimeIndex (#3606)
4dcae2b is described below
commit 4dcae2b69924c8cd5deac7fd5af78d16f6394249
Author: Yuting Yan <56...@users.noreply.github.com>
AuthorDate: Sun Aug 8 02:49:23 2021 -0500
[IOTDB-1372] delete devices field in FileTimeIndex (#3606)
---
example/mqtt-customize/pom.xml | 1 -
.../db/engine/storagegroup/TsFileResource.java | 6 +-
.../storagegroup/timeindex/DeviceTimeIndex.java | 2 +-
.../storagegroup/timeindex/FileTimeIndex.java | 71 ++++++----------------
.../engine/storagegroup/timeindex/ITimeIndex.java | 2 +-
5 files changed, 22 insertions(+), 60 deletions(-)
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
index 7098667..c524fb0 100644
--- a/example/mqtt-customize/pom.xml
+++ b/example/mqtt-customize/pom.xml
@@ -37,6 +37,5 @@
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
-
</dependencies>
</project>
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index f0c7d18..b6f07d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -183,7 +183,7 @@ public class TsFileResource {
this.timeIndexType = (byte) config.getTimeIndexLevel().ordinal();
}
- /** unsealed TsFile */
+ /** unsealed TsFile, for writer */
public TsFileResource(File file, TsFileProcessor processor) {
this.file = file;
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
@@ -192,7 +192,7 @@ public class TsFileResource {
this.processor = processor;
}
- /** unsealed TsFile */
+ /** unsealed TsFile, for query */
public TsFileResource(
List<ReadOnlyMemChunk> readOnlyMemChunk,
List<IChunkMetadata> chunkMetadataList,
@@ -469,7 +469,7 @@ public class TsFileResource {
}
public Set<String> getDevices() {
- return timeIndex.getDevices();
+ return timeIndex.getDevices(file.getPath());
}
public boolean endTimeEmpty() {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
index b93cf87..013f111 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/DeviceTimeIndex.java
@@ -146,7 +146,7 @@ public class DeviceTimeIndex implements ITimeIndex {
}
@Override
- public Set<String> getDevices() {
+ public Set<String> getDevices(String tsFilePath) {
return deviceToIndex.keySet();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
index 9cc46eb..d427f34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/FileTimeIndex.java
@@ -21,26 +21,25 @@ package org.apache.iotdb.db.engine.storagegroup.timeindex;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.PartitionViolationException;
-import org.apache.iotdb.db.rescon.CachedStringPool;
+import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.FilePathUtils;
-import org.apache.iotdb.db.utils.SerializeUtils;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
-import io.netty.util.internal.ConcurrentSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
public class FileTimeIndex implements ITimeIndex {
- protected static final Map<String, String> cachedDevicePool =
- CachedStringPool.getInstance().getCachedPool();
+ private static final Logger logger = LoggerFactory.getLogger(FileTimeIndex.class);
/** start time */
protected long startTime;
@@ -48,64 +47,31 @@ public class FileTimeIndex implements ITimeIndex {
/** end times. The value is Long.MIN_VALUE if it's an unsealed sequence tsfile */
protected long endTime;
- /** devices */
- protected Set<String> devices;
-
public FileTimeIndex() {
- this.devices = new ConcurrentSet<>();
this.startTime = Long.MAX_VALUE;
this.endTime = Long.MIN_VALUE;
}
- public FileTimeIndex(Set<String> devices, long startTime, long endTime) {
+ public FileTimeIndex(long startTime, long endTime) {
this.startTime = startTime;
this.endTime = endTime;
- this.devices = devices;
}
@Override
public void serialize(OutputStream outputStream) throws IOException {
- ReadWriteIOUtils.write(devices.size(), outputStream);
- Set<String> stringMemoryReducedSet = new ConcurrentSet<>();
- for (String device : devices) {
- // To reduce the String number in memory,
- // use the deviceId from cached pool
- stringMemoryReducedSet.add(cachedDevicePool.computeIfAbsent(device, k -> k));
- ReadWriteIOUtils.write(device, outputStream);
- }
ReadWriteIOUtils.write(startTime, outputStream);
ReadWriteIOUtils.write(endTime, outputStream);
- devices = stringMemoryReducedSet;
}
@Override
public FileTimeIndex deserialize(InputStream inputStream) throws IOException {
- int size = ReadWriteIOUtils.readInt(inputStream);
- Set<String> deviceSet = new HashSet<>();
- for (int i = 0; i < size; i++) {
- String path = ReadWriteIOUtils.readString(inputStream);
- // To reduce the String number in memory,
- // use the deviceId from memory instead of the deviceId read from disk
- String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
- deviceSet.add(cachedPath);
- }
return new FileTimeIndex(
- deviceSet, ReadWriteIOUtils.readLong(inputStream), ReadWriteIOUtils.readLong(inputStream));
+ ReadWriteIOUtils.readLong(inputStream), ReadWriteIOUtils.readLong(inputStream));
}
@Override
public FileTimeIndex deserialize(ByteBuffer buffer) {
- int size = buffer.getInt();
- Set<String> deviceSet = new HashSet<>(size);
-
- for (int i = 0; i < size; i++) {
- String path = SerializeUtils.deserializeString(buffer);
- // To reduce the String number in memory,
- // use the deviceId from memory instead of the deviceId read from disk
- String cachedPath = cachedDevicePool.computeIfAbsent(path, k -> k);
- deviceSet.add(cachedPath);
- }
- return new FileTimeIndex(deviceSet, buffer.getLong(), buffer.getLong());
+ return new FileTimeIndex(buffer.getLong(), buffer.getLong());
}
@Override
@@ -114,8 +80,14 @@ public class FileTimeIndex implements ITimeIndex {
}
@Override
- public Set<String> getDevices() {
- return devices;
+ public Set<String> getDevices(String tsFilePath) {
+ try {
+ TsFileSequenceReader fileReader = FileReaderManager.getInstance().get(tsFilePath, true);
+ return new HashSet<>(fileReader.getAllDevices());
+ } catch (IOException e) {
+ logger.error("Can't read file {} from disk ", tsFilePath, e);
+ throw new RuntimeException("Can't read file " + tsFilePath + " from disk");
+ }
}
@Override
@@ -134,17 +106,12 @@ public class FileTimeIndex implements ITimeIndex {
@Override
public long calculateRamSize() {
- return RamUsageEstimator.sizeOf(devices)
- + RamUsageEstimator.sizeOf(startTime)
- + RamUsageEstimator.sizeOf(endTime);
+ return RamUsageEstimator.sizeOf(startTime) + RamUsageEstimator.sizeOf(endTime);
}
@Override
public long getTimePartition(String tsFilePath) {
try {
- if (devices != null && !devices.isEmpty()) {
- return StorageEngine.getTimePartition(startTime);
- }
String[] filePathSplits = FilePathUtils.splitTsFilePath(tsFilePath);
return Long.parseLong(filePathSplits[filePathSplits.length - 2]);
} catch (NumberFormatException e) {
@@ -178,7 +145,6 @@ public class FileTimeIndex implements ITimeIndex {
@Override
public void updateStartTime(String deviceId, long time) {
- devices.add(deviceId);
if (this.startTime > time) {
this.startTime = time;
}
@@ -186,7 +152,6 @@ public class FileTimeIndex implements ITimeIndex {
@Override
public void updateEndTime(String deviceId, long time) {
- devices.add(deviceId);
if (this.endTime < time) {
this.endTime = time;
}
@@ -194,13 +159,11 @@ public class FileTimeIndex implements ITimeIndex {
@Override
public void putStartTime(String deviceId, long time) {
- devices.add(deviceId);
this.startTime = time;
}
@Override
public void putEndTime(String deviceId, long time) {
- devices.add(deviceId);
this.endTime = time;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
index 3710362..d53dd48 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/timeindex/ITimeIndex.java
@@ -62,7 +62,7 @@ public interface ITimeIndex {
*
* @return device names
*/
- Set<String> getDevices();
+ Set<String> getDevices(String tsFilePath);
/** @return whether end time is empty (Long.MIN_VALUE) */
boolean endTimeEmpty();