You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2020/11/25 01:02:47 UTC
[iotdb] 03/05: fix conflict
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch fileIndex
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e4b39e68755c3264e357a9a740d40f8bc1e22041
Author: chaow <xu...@gmail.com>
AuthorDate: Mon Sep 7 15:53:29 2020 +0800
fix conflict
---
.../resources/conf/iotdb-engine.properties | 7 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++
.../iotdb/db/engine/merge/task/MergeFileTask.java | 11 +-
.../db/engine/merge/task/RecoverMergeTask.java | 5 +-
.../engine/storagegroup/StorageGroupProcessor.java | 88 ++++++++-------
.../db/engine/storagegroup/TsFileProcessor.java | 24 ++--
.../db/engine/storagegroup/TsFileResource.java | 2 -
.../apache/iotdb/db/timeIndex/IndexerManager.java | 53 +++++----
.../{device/DeviceIndex.java => TimeIndex.java} | 16 +--
.../org/apache/iotdb/db/timeIndex/TimeIndexer.java | 27 ++++-
.../timeIndex/{device => }/UpdateIndexsParam.java | 16 +--
.../db/timeIndex/device/DeviceTimeIndexer.java | 124 ---------------------
.../db/timeIndex/device/LoadAllTimeIndexer.java | 20 ++--
.../timeIndex/device/RocksDBDeviceTimeIndexer.java | 20 ++--
14 files changed, 182 insertions(+), 239 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index aae3444..4b33bfc 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -229,6 +229,7 @@ write_read_schema_free_memory_proportion=4:3:1:2
# primitive array size (length of each array) in array pool
primitive_array_size=128
+<<<<<<< HEAD
# Ratio of write memory for invoking flush disk, 0.4 by default
flush_proportion=0.4
@@ -253,6 +254,12 @@ max_waiting_time_when_insert_blocked=0
# estimated metadata size (in byte) of one timeseries in Mtree
estimated_series_size=300
+=======
+# device time indexer
+enable_device_indexer=false
+# 0, indexer base on file; 1, indexer base on rocksdb
+device_indexer_type=0
+>>>>>>> fix conflict
####################
### Upgrade Configurations
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 570272a..8b559db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -517,6 +517,14 @@ public class IoTDBDescriptor {
conf.setDebugState(Boolean.parseBoolean(properties
.getProperty("debug_state", String.valueOf(conf.isDebugOn()))));
+ conf.setEnableDeviceIndexer((Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_device_indexer", String.valueOf(conf.isEnableDeviceIndexer())))));
+
+ conf.setDeviceIndexerType((Integer.parseInt(
+ properties.getProperty(
+ "device_indexer_type", String.valueOf(conf.getDeviceIndexerType())))));
+
// mqtt
if (properties.getProperty(IoTDBConstant.MQTT_HOST_NAME) != null) {
conf.setMqttHost(properties.getProperty(IoTDBConstant.MQTT_HOST_NAME));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 42486ca..ac645d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -38,12 +38,11 @@ import org.apache.iotdb.db.engine.merge.manage.MergeContext;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.timeIndex.device.DeviceIndex;
-import org.apache.iotdb.db.timeIndex.device.DeviceTimeIndexer;
import org.apache.iotdb.db.timeIndex.IndexerManager;
-import org.apache.iotdb.db.timeIndex.device.UpdateIndexsParam;
+import org.apache.iotdb.db.timeIndex.TimeIndexer;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -85,7 +84,7 @@ class MergeFileTask {
this.unmergedFiles = unmergedSeqFiles;
}
- void mergeFiles() throws IOException {
+ void mergeFiles() throws IOException, IllegalPathException {
// decide whether to write the unmerged chunks to the merge files or to move the merged chunks
// back to the origin seqFile's
if (logger.isInfoEnabled()) {
@@ -206,8 +205,8 @@ class MergeFileTask {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
// add new device index
// may Indexer need delete old index background
- DeviceTimeIndexer deviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(seqFile.getStorageGroupName());
- deviceTimeIndexer.addIndexForDevices(seqFile.getDeviceToIndexMap(), seqFile.getStartTimes(), seqFile.getEndTimes(), seqFile.getTsFilePath());
+ TimeIndexer timeIndexer = IndexerManager.getInstance().getSeqIndexer(seqFile.getStorageGroupName());
+ timeIndexer.addIndexForPaths(seqFile.getDeviceToIndexMap(), seqFile.getStartTimes(), seqFile.getEndTimes(), seqFile.getTsFilePath());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
index 1607797..cec9aea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/RecoverMergeTask.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.engine.merge.recover.LogAnalyzer.Status;
import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.merge.selector.MaxSeriesMergeFileSelector;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.utils.MergeUtils;
@@ -94,7 +95,7 @@ public class RecoverMergeTask extends MergeTask {
}
}
- private void resumeAfterFilesLogged(boolean continueMerge) throws IOException {
+ private void resumeAfterFilesLogged(boolean continueMerge) throws IOException, IllegalPathException {
if (continueMerge) {
resumeMergeProgress();
calculateConcurrentSeriesNum();
@@ -115,7 +116,7 @@ public class RecoverMergeTask extends MergeTask {
cleanUp(continueMerge);
}
- private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException {
+ private void resumeAfterAllTsMerged(boolean continueMerge) throws IOException, IllegalPathException {
if (continueMerge) {
resumeMergeProgress();
MergeFileTask mergeFileTask = new MergeFileTask(taskName, mergeContext, mergeLogger, resource,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 90b3769..11dcf16 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -83,7 +83,7 @@ import org.apache.iotdb.db.query.control.QueryFileManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.UpgradeSevice;
import org.apache.iotdb.db.timeIndex.IndexerManager;
-import org.apache.iotdb.db.timeIndex.device.DeviceTimeIndexer;
+import org.apache.iotdb.db.timeIndex.TimeIndexer;
import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
import org.apache.iotdb.rpc.RpcUtils;
@@ -270,8 +270,8 @@ public class StorageGroupProcessor {
* unseqDeviceTimeIndexer manage the device Index of unseq tsfiles
*
*/
- private DeviceTimeIndexer seqDeviceTimeIndexer;
- private DeviceTimeIndexer unseqDeviceTimeIndexer;
+ private TimeIndexer seqTimeIndexer;
+ private TimeIndexer unseqTimeIndexer;
public StorageGroupProcessor(String systemDir, String storageGroupName,
TsFileFlushPolicy fileFlushPolicy) throws StorageGroupProcessorException {
@@ -387,11 +387,15 @@ public class StorageGroupProcessor {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
// init TimeIndexer
- seqDeviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(storageGroupName);
- unseqDeviceTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(storageGroupName);
- if (seqDeviceTimeIndexer != null) {
- seqDeviceTimeIndexer.init();
- unseqDeviceTimeIndexer.init();
+ try {
+ seqTimeIndexer = IndexerManager.getInstance().getSeqIndexer(storageGroupName);
+ unseqTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(storageGroupName);
+ if (seqTimeIndexer != null) {
+ seqTimeIndexer.init();
+ unseqTimeIndexer.init();
+ }
+ } catch (IllegalPathException e) {
+ throw new StorageGroupProcessorException(e);
}
}
@@ -1300,21 +1304,19 @@ public class StorageGroupProcessor {
tsFileManagement.remove(resource, isSeq);
if (isSeq) {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- DeviceTimeIndexer deviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(resource.getStorageGroupName());
- if (deviceTimeIndexer != null) {
- deviceTimeIndexer.deleteIndexForDevices(resource.getDeviceToIndexMap(), resource.getStartTimes(),
- resource.getEndTimes(), resource.getTsFilePath());
- }
+ TimeIndexer timeIndexer = IndexerManager.getInstance().getSeqIndexer(resource.getStorageGroupName());
+ timeIndexer.deleteIndexForPaths(resource.getDeviceToIndexMap(), resource.getStartTimes(),
+ resource.getEndTimes(), resource.getTsFilePath());
}
} else {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- DeviceTimeIndexer deviceTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(resource.getStorageGroupName());
- if (deviceTimeIndexer != null) {
- deviceTimeIndexer.deleteIndexForDevices(resource.getDeviceToIndexMap(), resource.getStartTimes(),
+ TimeIndexer timeIndexer = IndexerManager.getInstance().getUnseqIndexer(resource.getStorageGroupName());
+ timeIndexer.deleteIndexForPaths(resource.getDeviceToIndexMap(), resource.getStartTimes(),
resource.getEndTimes(), resource.getTsFilePath());
- }
}
}
+ } catch (IllegalPathException e) {
+ logger.error("Fail to get DeviceTimeIndexer for storage group {}, err:{}", resource.getStorageGroupName(), e.getMessage());
} finally {
resource.writeUnlock();
}
@@ -1402,8 +1404,8 @@ public class StorageGroupProcessor {
List<TsFileResource> unseqResources;
try {
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- seqResources = seqDeviceTimeIndexer.filterByOneDevice(deviceId, timeFilter);
- unseqResources = unseqDeviceTimeIndexer.filterByOneDevice(deviceId, timeFilter);
+ seqResources = seqTimeIndexer.filterByPath(deviceId, timeFilter);
+ unseqResources = unseqTimeIndexer.filterByPath(deviceId, timeFilter);
List<TsFileResource> unsealedSeqFiles = getUnSealedListResourceForQuery(
tsFileManagement.getTsFileList(true),
deviceId, measurementId, context, timeFilter, true);
@@ -1414,7 +1416,7 @@ public class StorageGroupProcessor {
unsealedSeqFiles.addAll(unsealedUnseqFiles);
} else {
seqResources = getFileResourceListForQuery(tsFileManagement.getTsFileList(true),
- upgradeSeqFileList, deviceId, measurementId, context, timeFilter, true);
+ upgradeSeqFileList, deviceId, measurementId, context, timeFilter, true);
unseqResources = getFileResourceListForQuery(tsFileManagement.getTsFileList(false),
upgradeUnseqFileList, deviceId, measurementId, context, timeFilter, false);
}
@@ -1894,16 +1896,16 @@ public class StorageGroupProcessor {
updateLatestTimeMap(newTsFileResource);
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- DeviceTimeIndexer deviceTimeIndexer = null;
+ TimeIndexer timeIndexer = null;
if (newTsFileResource.isSeq()) {
- deviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(newTsFileResource.getStorageGroupName());
+ timeIndexer = IndexerManager.getInstance().getSeqIndexer(newTsFileResource.getStorageGroupName());
} else {
- deviceTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(newTsFileResource.getStorageGroupName());
+ timeIndexer = IndexerManager.getInstance().getUnseqIndexer(newTsFileResource.getStorageGroupName());
}
- deviceTimeIndexer.addIndexForDevices(newTsFileResource.getDeviceToIndexMap(), newTsFileResource.getStartTimes(),
- newTsFileResource.getEndTimes(), newTsFileResource.getTsFilePath());
+ timeIndexer.addIndexForPaths(newTsFileResource.getDeviceToIndexMap(), newTsFileResource.getStartTimes(),
+ newTsFileResource.getEndTimes(), newTsFileResource.getTsFilePath());
}
- } catch (DiskSpaceInsufficientException e) {
+ } catch (DiskSpaceInsufficientException | IllegalPathException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
tsfileToBeInserted.getAbsolutePath(), tsfileToBeInserted.getParentFile().getName());
@@ -1964,14 +1966,20 @@ public class StorageGroupProcessor {
}
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- DeviceTimeIndexer deviceTimeIndexer = null;
- if (newTsFileResource.isSeq()) {
- deviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(newTsFileResource.getStorageGroupName());
- } else {
- deviceTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(newTsFileResource.getStorageGroupName());
+ try {
+ TimeIndexer timeIndexer = null;
+ if (newTsFileResource.isSeq()) {
+ timeIndexer = IndexerManager.getInstance().getSeqIndexer(newTsFileResource.getStorageGroupName());
+ } else {
+ timeIndexer = IndexerManager.getInstance().getUnseqIndexer(newTsFileResource.getStorageGroupName());
+ }
+ timeIndexer.addIndexForPaths(newTsFileResource.getDeviceToIndexMap(), newTsFileResource.getStartTimes(),
+ newTsFileResource.getEndTimes(), newTsFileResource.getTsFilePath());
+ } catch (IllegalPathException e) {
+ logger.error("Fail to get DeviceTimeIndexer for storage group {}, err:{}", newTsFileResource.getStorageGroupName(), e.getMessage());
+ IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
+ throw new LoadFileException(e);
}
- deviceTimeIndexer.addIndexForDevices(newTsFileResource.getDeviceToIndexMap(), newTsFileResource.getStartTimes(),
- newTsFileResource.getEndTimes(), newTsFileResource.getTsFilePath());
}
// update latest time map
@@ -2375,17 +2383,21 @@ public class StorageGroupProcessor {
try {
tsFileResourceToBeDeleted.remove();
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
- DeviceTimeIndexer deviceTimeIndexer = null;
+ try {
+ TimeIndexer timeIndexer = null;
if (tsFileResourceToBeDeleted.isSeq()) {
- deviceTimeIndexer = IndexerManager.getInstance().getSeqIndexer(tsFileResourceToBeDeleted.getStorageGroupName());
+ timeIndexer = IndexerManager.getInstance().getSeqIndexer(tsFileResourceToBeDeleted.getStorageGroupName());
} else {
- deviceTimeIndexer = IndexerManager.getInstance().getUnseqIndexer(tsFileResourceToBeDeleted.getStorageGroupName());
+ timeIndexer = IndexerManager.getInstance().getUnseqIndexer(tsFileResourceToBeDeleted.getStorageGroupName());
}
- if (deviceTimeIndexer != null) {
- deviceTimeIndexer.deleteIndexForDevices(tsFileResourceToBeDeleted.getDeviceToIndexMap(),
+ if (timeIndexer != null) {
+ timeIndexer.deleteIndexForPaths(tsFileResourceToBeDeleted.getDeviceToIndexMap(),
tsFileResourceToBeDeleted.getStartTimes(), tsFileResourceToBeDeleted.getEndTimes(),
tsFileResourceToBeDeleted.getTsFilePath());
}
+ } catch (IllegalPathException e) {
+ return false;
+ }
}
logger.info("Delete tsfile {} successfully.", tsFileResourceToBeDeleted.getTsFile());
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 8711bd5..d44c281 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -50,7 +50,11 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTi
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
+<<<<<<< HEAD
import org.apache.iotdb.db.exception.metadata.MetadataException;
+=======
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+>>>>>>> fix conflict
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -59,8 +63,8 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;
-import org.apache.iotdb.db.timeIndex.device.DeviceTimeIndexer;
import org.apache.iotdb.db.timeIndex.IndexerManager;
+import org.apache.iotdb.db.timeIndex.TimeIndexer;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.db.writelog.WALFlushListener;
@@ -816,12 +820,18 @@ public class TsFileProcessor {
long closeStartTime = System.currentTimeMillis();
if (IoTDBDescriptor.getInstance().getConfig().isEnableDeviceIndexer()) {
// update device index
- if (sequence) {
- DeviceTimeIndexer seqIndexer = IndexerManager.getInstance().getSeqIndexer(storageGroupName);
- seqIndexer.addIndexForDevices(tsFileResource.deviceToIndex, tsFileResource.startTimes, tsFileResource.endTimes, tsFileResource.getTsFilePath());
- } else {
- DeviceTimeIndexer unseqIndexer = IndexerManager.getInstance().getUnseqIndexer(storageGroupName);
- unseqIndexer.addIndexForDevices(tsFileResource.deviceToIndex, tsFileResource.startTimes, tsFileResource.endTimes, tsFileResource.getTsFilePath());
+ try {
+ TimeIndexer timeIndexer = null;
+ if (sequence) {
+ timeIndexer = IndexerManager.getInstance().getSeqIndexer(storageGroupName);
+ } else {
+ timeIndexer = IndexerManager.getInstance().getUnseqIndexer(storageGroupName);
+ }
+ timeIndexer.addIndexForPaths(tsFileResource.deviceToIndex, tsFileResource.startTimes,
+ tsFileResource.endTimes, tsFileResource.getTsFilePath());
+ } catch (IllegalPathException e) {
+ logger.error("Failed to endFile {} for storage group {}, err:{}", tsFileResource.getTsFile(),
+ tsFileResource.getStorageGroupName(), e.getMessage());
}
}
tsFileResource.serialize();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 66d42d9..ca2b819 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -46,7 +46,6 @@ import org.apache.iotdb.db.engine.upgrade.UpgradeTask;
import org.apache.iotdb.db.exception.PartitionViolationException;
import org.apache.iotdb.db.rescon.CachedStringPool;
import org.apache.iotdb.db.service.UpgradeSevice;
-import org.apache.iotdb.db.timeIndex.device.DeviceTimeIndexer;
import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -167,7 +166,6 @@ public class TsFileResource {
*/
private long minPlanIndex = Long.MAX_VALUE;
- private DeviceTimeIndexer deviceTimeIndexer;
private String storageGroupName;
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/IndexerManager.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/IndexerManager.java
index 504dfd4..eb09ab8 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/IndexerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/IndexerManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.timeIndex;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.timeIndex.device.DeviceTimeIndexer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +31,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class IndexerManager {
private String indexerFilePath;
- private Map<PartialPath, DeviceTimeIndexer> seqIndexers;
- private Map<PartialPath, DeviceTimeIndexer> unseqIndexers;
+ private Map<PartialPath, TimeIndexer> seqIndexers;
+ private Map<PartialPath, TimeIndexer> unseqIndexers;
private ReentrantReadWriteLock lock;
private static final Logger logger = LoggerFactory.getLogger(IndexerManager.class);
@@ -69,31 +68,43 @@ public class IndexerManager {
return true;
}
- public void addSeqIndexer(PartialPath storageGroup, DeviceTimeIndexer deviceTimeIndexer) {
+ public void addSeqIndexer(PartialPath storageGroup, TimeIndexer TimeIndexer) {
lock.writeLock().lock();
- seqIndexers.put(storageGroup, deviceTimeIndexer);
- lock.writeLock().unlock();
+ try {
+ seqIndexers.put(storageGroup, TimeIndexer);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
- public void addUnseqIndexer(PartialPath storageGroup, DeviceTimeIndexer deviceTimeIndexer) {
+ public void addUnseqIndexer(PartialPath storageGroup, TimeIndexer TimeIndexer) {
lock.writeLock().lock();
- unseqIndexers.put(storageGroup, deviceTimeIndexer);
- lock.writeLock().unlock();
+ try {
+ unseqIndexers.put(storageGroup, TimeIndexer);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
public void deleteSeqIndexer(PartialPath storageGroup) {
lock.writeLock().lock();
- seqIndexers.remove(storageGroup);
- lock.writeLock().unlock();
+ try {
+ seqIndexers.remove(storageGroup);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
public void deleteUnseqIndexer(PartialPath storageGroup) {
lock.writeLock().lock();
- unseqIndexers.remove(storageGroup);
- lock.writeLock().unlock();
+ try {
+ unseqIndexers.remove(storageGroup);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
- public DeviceTimeIndexer getSeqIndexer(PartialPath storageGroup) {
+ public TimeIndexer getSeqIndexer(PartialPath storageGroup) {
lock.readLock().lock();
try {
return seqIndexers.get(storageGroup);
@@ -102,13 +113,13 @@ public class IndexerManager {
}
}
- public DeviceTimeIndexer getSeqIndexer(String storageGroup) {
+ public TimeIndexer getSeqIndexer(String storageGroup) throws IllegalPathException {
PartialPath sgName;
try {
sgName = new PartialPath(storageGroup);
} catch (IllegalPathException e) {
- logger.warn("Fail to get DeviceTimeIndexer for storage group {}, err:{}", storageGroup, e.getMessage());
- return null;
+ logger.warn("Fail to get TimeIndexer for storage group {}, err:{}", storageGroup, e.getMessage());
+ throw e;
}
lock.readLock().lock();
try {
@@ -118,7 +129,7 @@ public class IndexerManager {
}
}
- public DeviceTimeIndexer getUnseqIndexer(PartialPath storageGroup) {
+ public TimeIndexer getUnseqIndexer(PartialPath storageGroup) {
lock.readLock().lock();
try {
return unseqIndexers.get(storageGroup);
@@ -127,13 +138,13 @@ public class IndexerManager {
}
}
- public DeviceTimeIndexer getUnseqIndexer(String storageGroup) {
+ public TimeIndexer getUnseqIndexer(String storageGroup) throws IllegalPathException {
PartialPath sgName;
try {
sgName = new PartialPath(storageGroup);
} catch (IllegalPathException e) {
- logger.warn("Fail to get DeviceTimeIndexer for storage group {}, err:{}", storageGroup, e.getMessage());
- return null;
+ logger.warn("Fail to get TimeIndexer for storage group {}, err:{}", storageGroup, e.getMessage());
+ throw e;
}
lock.readLock().lock();
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceIndex.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndex.java
similarity index 80%
rename from server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceIndex.java
rename to server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndex.java
index bafa0c1..261033d 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceIndex.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndex.java
@@ -16,25 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.timeIndex.device;
+package org.apache.iotdb.db.timeIndex;
import org.apache.iotdb.db.metadata.PartialPath;
/**
- * Device Index, like [(deviceId, startTime, endTime, TsFilePath)] to accelerate query
+ * Time Index, like deviceIndexer: [(deviceId, startTime, endTime, TsFilePath)] to accelerate query
*/
-public class DeviceIndex {
- private PartialPath[] deviceIds;
+public class TimeIndex {
+ private PartialPath[] paths;
private long[] startTimes;
private long[] endTimes;
private String tsFilePath;
- public PartialPath[] getDeviceIds() {
- return deviceIds;
+ public PartialPath[] getPaths() {
+ return paths;
}
- public void setDeviceIds(PartialPath[] deviceIds) {
- this.deviceIds = deviceIds;
+ public void setPaths(PartialPath[] paths) {
+ this.paths = paths;
}
public long[] getStartTimes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndexer.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndexer.java
index a2d9d17..36e9297 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndexer.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/TimeIndexer.java
@@ -1,8 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.timeIndex;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.timeIndex.device.UpdateIndexsParam;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
@@ -55,7 +72,7 @@ public interface TimeIndexer {
* @param tsFilePath
* @return
*/
- public boolean addIndexForPaths(Map<PartialPath, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath);
+ public boolean addIndexForPaths(Map<String, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath);
/**
* delete one index for the path
@@ -85,14 +102,14 @@ public interface TimeIndexer {
* @param tsFilePath
* @return
*/
- public boolean deleteIndexForDevices(Map<PartialPath, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath);
+ public boolean deleteIndexForPaths(Map<String, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath);
/**
* after merge, we should keep the index is updated in consistency
* @param updateIndexsParam
* @return whether success
*/
- public boolean updateIndexForDevices(UpdateIndexsParam updateIndexsParam);
+ public boolean updateIndexForPaths(UpdateIndexsParam updateIndexsParam);
/**
* found the related tsFile(only cover sealed tsfile) for one deviceId
@@ -100,5 +117,5 @@ public interface TimeIndexer {
* @param timeFilter
* @return whether success
*/
- public List<TsFileResource> filterByOneDevice(PartialPath path, Filter timeFilter);
+ public List<TsFileResource> filterByPath(PartialPath path, Filter timeFilter);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/UpdateIndexsParam.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/UpdateIndexsParam.java
similarity index 74%
rename from server/src/main/java/org/apache/iotdb/db/timeIndex/device/UpdateIndexsParam.java
rename to server/src/main/java/org/apache/iotdb/db/timeIndex/UpdateIndexsParam.java
index e99fc3f..e7b14b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/UpdateIndexsParam.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/UpdateIndexsParam.java
@@ -16,28 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.timeIndex.device;
+package org.apache.iotdb.db.timeIndex;
/**
- * to atomic update indexs for device
+ * to atomic update indexs for path
*/
public class UpdateIndexsParam {
- private DeviceIndex[] oldIndexs;
- private DeviceIndex newIndex;
+ private TimeIndex[] oldIndexs;
+ private TimeIndex newIndex;
- public DeviceIndex[] getOldIndexs() {
+ public TimeIndex[] getOldIndexs() {
return oldIndexs;
}
- public void setOldIndexs(DeviceIndex[] oldIndexs) {
+ public void setOldIndexs(TimeIndex[] oldIndexs) {
this.oldIndexs = oldIndexs;
}
- public DeviceIndex getNewIndex() {
+ public TimeIndex getNewIndex() {
return newIndex;
}
- public void setNewIndex(DeviceIndex newIndex) {
+ public void setNewIndex(TimeIndex newIndex) {
this.newIndex = newIndex;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceTimeIndexer.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceTimeIndexer.java
deleted file mode 100644
index d846d0d..0000000
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/DeviceTimeIndexer.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.timeIndex.device;
-
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * manage the index [deviceId, (startTime, endTime, TsFilePath)] for all devices
- */
-public interface DeviceTimeIndexer {
- /**
- * init the Indexer when IoTDB start
- * @return whether success
- */
- public boolean init();
-
- /**
- * may do some prepared work before operation
- * @return whether success
- */
- public boolean begin();
-
- /**
- * may do some resource release after operation
- * @return whether success
- */
- public boolean end();
-
- /**
- * add one index for the deviceId
- * @param deviceId
- * @param startTime
- * @param endTime
- * @param tsFilePath
- * @return whether success
- */
- public boolean addIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath);
-
- /**
- * add one index for the deviceId
- * @param deviceId
- * @param startTime
- * @param endTime
- * @param tsFilePath
- * @return
- */
- public boolean addIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath);
-
- /**
- * add all indexs for a flushed tsFile
- * @param deviceIds
- * @param startTimes
- * @param endTimes
- * @param tsFilePath
- * @return
- */
- public boolean addIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath);
-
- /**
- * delete one index for the deviceId
- * @param deviceId
- * @param startTime
- * @param endTime
- * @param tsFilePath
- * @return whether success
- */
- public boolean deleteIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath);
-
- /**
- * delete one index for the deviceId
- * @param deviceId
- * @param startTime
- * @param endTime
- * @param tsFilePath
- * @return
- */
- public boolean deleteIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath);
-
- /**
- * delete all index for one deleted tsfile
- * @param deviceIds
- * @param startTimes
- * @param endTimes
- * @param tsFilePath
- * @return
- */
- public boolean deleteIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath);
-
- /**
- * after merge, we should keep the index is updated in consistency
- * @param updateIndexsParam
- * @return whether success
- */
- public boolean updateIndexForDevices(UpdateIndexsParam updateIndexsParam);
-
- /**
- * found the related tsFile(only cover sealed tsfile) for one deviceId
- * @param deviceId
- * @param timeFilter
- * @return whether success
- */
- public List<TsFileResource> filterByOneDevice(PartialPath deviceId, Filter timeFilter);
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/LoadAllTimeIndexer.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/device/LoadAllTimeIndexer.java
index c5dab56..7cd51f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/LoadAllTimeIndexer.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/device/LoadAllTimeIndexer.java
@@ -20,12 +20,14 @@ package org.apache.iotdb.db.timeIndex.device;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.timeIndex.TimeIndexer;
+import org.apache.iotdb.db.timeIndex.UpdateIndexsParam;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
import java.util.Map;
-public class LoadAllTimeIndexer implements DeviceTimeIndexer {
+public class LoadAllTimeIndexer implements TimeIndexer {
public LoadAllTimeIndexer() {
@@ -47,42 +49,42 @@ public class LoadAllTimeIndexer implements DeviceTimeIndexer {
}
@Override
- public boolean addIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean addIndexForPath(PartialPath path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean addIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean addIndexForPath(String path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean addIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath) {
+ public boolean addIndexForPaths(Map<String, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean deleteIndexForPath(PartialPath path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean deleteIndexForPath(String path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath) {
+ public boolean deleteIndexForPaths(Map<String, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath) {
return false;
}
@Override
- public boolean updateIndexForDevices(UpdateIndexsParam updateIndexsParam) {
+ public boolean updateIndexForPaths(UpdateIndexsParam updateIndexsParam) {
return false;
}
@Override
- public List<TsFileResource> filterByOneDevice(PartialPath deviceId, Filter timeFilter) {
+ public List<TsFileResource> filterByPath(PartialPath path, Filter timeFilter) {
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/RocksDBDeviceTimeIndexer.java b/server/src/main/java/org/apache/iotdb/db/timeIndex/device/RocksDBDeviceTimeIndexer.java
index 62fa0cf..9635232 100644
--- a/server/src/main/java/org/apache/iotdb/db/timeIndex/device/RocksDBDeviceTimeIndexer.java
+++ b/server/src/main/java/org/apache/iotdb/db/timeIndex/device/RocksDBDeviceTimeIndexer.java
@@ -20,12 +20,14 @@ package org.apache.iotdb.db.timeIndex.device;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.timeIndex.TimeIndexer;
+import org.apache.iotdb.db.timeIndex.UpdateIndexsParam;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import java.util.List;
import java.util.Map;
-public class RocksDBDeviceTimeIndexer implements DeviceTimeIndexer {
+public class RocksDBDeviceTimeIndexer implements TimeIndexer {
@Override
public boolean init() {
return false;
@@ -42,42 +44,42 @@ public class RocksDBDeviceTimeIndexer implements DeviceTimeIndexer {
}
@Override
- public boolean addIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean addIndexForPath(PartialPath path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean addIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean addIndexForPath(String path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean addIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath) {
+ public boolean addIndexForPaths(Map<String, Integer> paths, long[] startTimes, long[] endTimes, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevice(PartialPath deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean deleteIndexForPath(PartialPath path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevice(String deviceId, long startTime, long endTime, String tsFilePath) {
+ public boolean deleteIndexForPath(String path, long startTime, long endTime, String tsFilePath) {
return false;
}
@Override
- public boolean deleteIndexForDevices(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath) {
+ public boolean deleteIndexForPaths(Map<String, Integer> deviceIds, long[] startTimes, long[] endTimes, String tsFilePath) {
return false;
}
@Override
- public boolean updateIndexForDevices(UpdateIndexsParam updateIndexsParam) {
+ public boolean updateIndexForPaths(UpdateIndexsParam updateIndexsParam) {
return false;
}
@Override
- public List<TsFileResource> filterByOneDevice(PartialPath deviceId, Filter timeFilter) {
+ public List<TsFileResource> filterByPath(PartialPath path, Filter timeFilter) {
return null;
}
}