You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2022/01/05 02:17:27 UTC
[iotdb] 01/02: init
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_id_table_flush_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3355a9ecd2dc68871bd2e76f71daee9c10b7a314
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Wed Jan 5 10:14:52 2022 +0800
init
---
.../storagegroup/IDTableFlushTimeManager.java | 212 +++++++++++++++++++
.../storagegroup/VirtualStorageGroupProcessor.java | 78 +++----
.../apache/iotdb/db/metadata/idtable/IDTable.java | 16 ++
.../db/metadata/idtable/IDTableHashmapImpl.java | 39 +++-
.../db/metadata/idtable/entry/DeviceEntry.java | 65 ++++++
.../db/metadata/idtable/entry/DeviceIDFactory.java | 16 +-
.../db/metadata/idtable/IDTableFlushTimeTest.java | 224 +++++++++++++++++++++
7 files changed, 605 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
new file mode 100644
index 0000000..b7ff81b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableFlushTimeManager.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.db.metadata.idtable.IDTable;
+import org.apache.iotdb.db.metadata.idtable.entry.DeviceEntry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * This class manages last time and flush time for sequence and unsequence determination This class
+ * This class is NOT thread safe, caller should ensure synchronization This class not support
+ * upgrade
+ */
+public class IDTableFlushTimeManager implements ILastFlushTimeManager {
+ private static final Logger logger = LoggerFactory.getLogger(LastFlushTimeManager.class);
+
+ IDTable idTable;
+
+ public IDTableFlushTimeManager(IDTable idTable) {
+ this.idTable = idTable;
+ }
+
+ // region set
+ @Override
+ public void setLastTimeAll(long timePartitionId, Map<String, Long> lastTimeMap) {
+ for (Map.Entry<String, Long> entry : lastTimeMap.entrySet()) {
+ idTable.getDeviceEntry(entry.getKey()).putLastTimeMap(timePartitionId, entry.getValue());
+ }
+ }
+
+ @Override
+ public void setLastTime(long timePartitionId, String path, long time) {
+ idTable.getDeviceEntry(path).putLastTimeMap(timePartitionId, time);
+ }
+
+ @Override
+ public void setFlushedTimeAll(long timePartitionId, Map<String, Long> flushedTimeMap) {
+ for (Map.Entry<String, Long> entry : flushedTimeMap.entrySet()) {
+ idTable.getDeviceEntry(entry.getKey()).putFlushTimeMap(timePartitionId, entry.getValue());
+ }
+ }
+
+ @Override
+ public void setFlushedTime(long timePartitionId, String path, long time) {
+ idTable.getDeviceEntry(path).putFlushTimeMap(timePartitionId, time);
+ }
+
+ @Override
+ public void setGlobalFlushedTimeAll(Map<String, Long> globalFlushedTimeMap) {
+ for (Map.Entry<String, Long> entry : globalFlushedTimeMap.entrySet()) {
+ idTable.getDeviceEntry(entry.getKey()).setGlobalFlushTime(entry.getValue());
+ }
+ }
+
+ @Override
+ public void setGlobalFlushedTime(String path, long time) {
+ idTable.getDeviceEntry(path).setGlobalFlushTime(time);
+ }
+
+ // endregion
+
+ // region update
+
+ @Override
+ public void updateLastTime(long timePartitionId, String path, long time) {
+ idTable.getDeviceEntry(path).updateLastTimeMap(timePartitionId, time);
+ }
+
+ @Override
+ public void updateFlushedTime(long timePartitionId, String path, long time) {
+ idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, time);
+ }
+
+ @Override
+ public void updateGlobalFlushedTime(String path, long time) {
+ idTable.getDeviceEntry(path).updateGlobalFlushTime(time);
+ }
+
+ @Override
+ public void updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(
+ long partitionId, String deviceId, long time) {
+ throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
+ }
+
+ // endregion
+
+ // region ensure
+
+ @Override
+ public void ensureLastTimePartition(long timePartitionId) {
+ // do nothing is correct
+ }
+
+ @Override
+ public void ensureFlushedTimePartition(long timePartitionId) {
+ // do nothing is correct
+ }
+
+ @Override
+ public long ensureFlushedTimePartitionAndInit(long timePartitionId, String path, long initTime) {
+ return idTable.getDeviceEntry(path).updateFlushTimeMap(timePartitionId, initTime);
+ }
+
+ // endregion
+
+ // region upgrade support methods
+
+ @Override
+ public void applyNewlyFlushedTimeToFlushedTime() {
+ throw new UnsupportedOperationException("IDTableFlushTimeManager doesn't support upgrade");
+ }
+
+ /**
+ * update latest flush time for partition id
+ *
+ * @param partitionId partition id
+ * @param latestFlushTime lastest flush time
+ * @return true if update latest flush time success
+ */
+ @Override
+ public boolean updateLatestFlushTimeToPartition(long partitionId, long latestFlushTime) {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.putLastTimeMap(partitionId, latestFlushTime);
+ deviceEntry.putFlushTimeMap(partitionId, latestFlushTime);
+ deviceEntry.updateGlobalFlushTime(latestFlushTime);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean updateLatestFlushTime(long partitionId) {
+ boolean updated = false;
+
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ Long lastTime = deviceEntry.getLastTime(partitionId);
+ if (lastTime == null) {
+ continue;
+ }
+
+ updated = true;
+ deviceEntry.putFlushTimeMap(partitionId, lastTime);
+ deviceEntry.updateGlobalFlushTime(lastTime);
+ }
+
+ return updated;
+ }
+
+ // endregion
+
+ // region query
+ @Override
+ public long getFlushedTime(long timePartitionId, String path) {
+ return idTable.getDeviceEntry(path).getFLushTimeWithDefaultValue(timePartitionId);
+ }
+
+ @Override
+ public long getLastTime(long timePartitionId, String path) {
+ return idTable.getDeviceEntry(path).getLastTimeWithDefaultValue(timePartitionId);
+ }
+
+ @Override
+ public long getGlobalFlushedTime(String path) {
+ return idTable.getDeviceEntry(path).getGlobalFlushTime();
+ }
+
+ // endregion
+
+ // region clear
+ @Override
+ public void clearLastTime() {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.clearLastTime();
+ }
+ }
+
+ @Override
+ public void clearFlushedTime() {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.clearFlushTime();
+ }
+ }
+
+ @Override
+ public void clearGlobalFlushedTime() {
+ for (DeviceEntry deviceEntry : idTable.getAllDeviceEntry()) {
+ deviceEntry.setGlobalFlushTime(Long.MIN_VALUE);
+ }
+ }
+ // endregion
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index 12cd817..db351f1 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -18,6 +18,39 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -89,44 +122,9 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.compaction.cross.inplace.task.CrossSpaceMergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.compaction.inner.utils.SizeTieredCompactionLogger.COMPACTION_LOG_NAME;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br>
@@ -262,7 +260,7 @@ public class VirtualStorageGroupProcessor {
// DEFAULT_POOL_TRIM_INTERVAL_MILLIS
private long timeWhenPoolNotEmpty = Long.MAX_VALUE;
- private LastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager();
+ private ILastFlushTimeManager lastFlushTimeManager = new LastFlushTimeManager();
/**
* record the insertWriteLock in SG is being hold by which method, it will be empty string if on
@@ -396,9 +394,12 @@ public class VirtualStorageGroupProcessor {
if (config.isEnableIDTable()) {
try {
idTable = IDTableManager.getInstance().getIDTable(new PartialPath(logicalStorageGroupName));
+ lastFlushTimeManager = new IDTableFlushTimeManager(idTable);
} catch (IllegalPathException e) {
logger.error("failed to create id table");
}
+ } else {
+ lastFlushTimeManager = new LastFlushTimeManager();
}
recover();
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
@@ -3233,4 +3234,9 @@ public class VirtualStorageGroupProcessor {
public IDTable getIdTable() {
return idTable;
}
+
+ @TestOnly
+ public ILastFlushTimeManager getLastFlushTimeManager() {
+ return lastFlushTimeManager;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index 07f4df7..c04e063 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
public interface IDTable {
@@ -139,6 +140,21 @@ public interface IDTable {
void clear() throws IOException;
/**
+ * get device entry from device path
+ *
+ * @param deviceName device name of the time series
+ * @return device entry of the timeseries
+ */
+ public DeviceEntry getDeviceEntry(String deviceName);
+
+ /**
+ * get all device entries
+ *
+ * @return all device entries
+ */
+ public List<DeviceEntry> getAllDeviceEntry();
+
+ /**
* translate query path's device path to device id
*
* @param fullPath full query path
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index a4faf6a..fd3a99f 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** id table belongs to a storage group and mapping timeseries path to it's schema */
@@ -88,7 +90,7 @@ public class IDTableHashmapImpl implements IDTable {
*/
public synchronized void createAlignedTimeseries(CreateAlignedTimeSeriesPlan plan)
throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntry(plan.getPrefixPath(), true);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPrefixPath(), true);
for (int i = 0; i < plan.getMeasurements().size(); i++) {
PartialPath fullPath =
@@ -112,7 +114,7 @@ public class IDTableHashmapImpl implements IDTable {
* @throws MetadataException if the device is aligned, throw it
*/
public synchronized void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException {
- DeviceEntry deviceEntry = getDeviceEntry(plan.getPath().getDevicePath(), false);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(plan.getPath().getDevicePath(), false);
SchemaEntry schemaEntry =
new SchemaEntry(
plan.getDataType(),
@@ -137,7 +139,7 @@ public class IDTableHashmapImpl implements IDTable {
IMeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device entry and check align
- DeviceEntry deviceEntry = getDeviceEntry(devicePath, plan.isAligned());
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(devicePath, plan.isAligned());
// 2. get schema of each measurement
for (int i = 0; i < measurementList.length; i++) {
@@ -222,7 +224,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void registerTrigger(PartialPath fullPath, IMeasurementMNode measurementMNode)
throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUsingTrigger();
}
@@ -237,7 +239,7 @@ public class IDTableHashmapImpl implements IDTable {
public synchronized void deregisterTrigger(
PartialPath fullPath, IMeasurementMNode measurementMNode) throws MetadataException {
boolean isAligned = measurementMNode.getParent().isAligned();
- DeviceEntry deviceEntry = getDeviceEntry(fullPath.getDevicePath(), isAligned);
+ DeviceEntry deviceEntry = getDeviceEntryWithAlignedCheck(fullPath.getDevicePath(), isAligned);
deviceEntry.getSchemaEntry(fullPath.getMeasurement()).setUnUsingTrigger();
}
@@ -279,6 +281,31 @@ public class IDTableHashmapImpl implements IDTable {
}
/**
+ * get device entry from device path
+ *
+ * @param deviceName device name of the time series
+ * @return device entry of the timeseries
+ */
+ @Override
+ public DeviceEntry getDeviceEntry(String deviceName) {
+ IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
+ int slot = calculateSlot(deviceID);
+
+ // reuse device entry in map
+ return idTables[slot].get(deviceID);
+ }
+
+ @Override
+ public List<DeviceEntry> getAllDeviceEntry() {
+ List<DeviceEntry> res = new ArrayList<>();
+ for (int i = 0; i < NUM_OF_SLOTS; i++) {
+ res.addAll(idTables[i].values());
+ }
+
+ return res;
+ }
+
+ /**
* check whether a time series is exist if exist, check the type consistency if not exist, call
* MManager to create it
*
@@ -345,7 +372,7 @@ public class IDTableHashmapImpl implements IDTable {
* @param isAligned whether the insert plan is aligned
* @return device entry of the timeseries
*/
- private DeviceEntry getDeviceEntry(PartialPath deviceName, boolean isAligned)
+ private DeviceEntry getDeviceEntryWithAlignedCheck(PartialPath deviceName, boolean isAligned)
throws MetadataException {
IDeviceID deviceID = DeviceIDFactory.getInstance().getDeviceID(deviceName);
int slot = calculateSlot(deviceID);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
index eb7041c..c4f0369 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceEntry.java
@@ -32,9 +32,18 @@ public class DeviceEntry {
boolean isAligned;
+ // for manages flush time
+ Map<Long, Long> lastTimeMap;
+
+ Map<Long, Long> flushTimeMap;
+
+ long globalFlushTime = Long.MIN_VALUE;
+
public DeviceEntry(IDeviceID deviceID) {
this.deviceID = deviceID;
measurementMap = new HashMap<>();
+ lastTimeMap = new HashMap<>();
+ flushTimeMap = new HashMap<>();
}
/**
@@ -78,4 +87,60 @@ public class DeviceEntry {
public IDeviceID getDeviceID() {
return deviceID;
}
+
+ // region support flush time
+ public void putLastTimeMap(long timePartition, long lastTime) {
+ lastTimeMap.put(timePartition, lastTime);
+ }
+
+ public void putFlushTimeMap(long timePartition, long flushTime) {
+ flushTimeMap.put(timePartition, flushTime);
+ }
+
+ public long updateLastTimeMap(long timePartition, long lastTime) {
+ return lastTimeMap.compute(
+ timePartition, (k, v) -> v == null ? lastTime : Math.max(v, lastTime));
+ }
+
+ public long updateFlushTimeMap(long timePartition, long flushTime) {
+ return flushTimeMap.compute(
+ timePartition, (k, v) -> v == null ? flushTime : Math.max(v, flushTime));
+ }
+
+ public void updateGlobalFlushTime(long flushTime) {
+ globalFlushTime = Math.max(globalFlushTime, flushTime);
+ }
+
+ public void setGlobalFlushTime(long globalFlushTime) {
+ this.globalFlushTime = globalFlushTime;
+ }
+
+ public Long getLastTime(long timePartition) {
+ return lastTimeMap.get(timePartition);
+ }
+
+ public Long getFlushTime(long timePartition) {
+ return flushTimeMap.get(timePartition);
+ }
+
+ public Long getLastTimeWithDefaultValue(long timePartition) {
+ return lastTimeMap.getOrDefault(timePartition, Long.MIN_VALUE);
+ }
+
+ public Long getFLushTimeWithDefaultValue(long timePartition) {
+ return flushTimeMap.getOrDefault(timePartition, Long.MIN_VALUE);
+ }
+
+ public long getGlobalFlushTime() {
+ return globalFlushTime;
+ }
+
+ public void clearLastTime() {
+ lastTimeMap.clear();
+ }
+
+ public void clearFlushTime() {
+ flushTimeMap.clear();
+ }
+ // endregion
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
index ba47c35..c4a9676 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/entry/DeviceIDFactory.java
@@ -27,7 +27,7 @@ import java.util.function.Function;
/** factory to build device id according to configured algorithm */
public class DeviceIDFactory {
- Function<PartialPath, IDeviceID> getDeviceIDFunction;
+ Function<String, IDeviceID> getDeviceIDFunction;
// region DeviceIDFactory Singleton
private static class DeviceIDFactoryHolder {
@@ -54,9 +54,9 @@ public class DeviceIDFactory {
.getConfig()
.getDeviceIDTransformationMethod()
.equals("SHA256")) {
- getDeviceIDFunction = partialPath -> new SHA256DeviceID(partialPath.toString());
+ getDeviceIDFunction = SHA256DeviceID::new;
} else {
- getDeviceIDFunction = partialPath -> new PlainDeviceID(partialPath.toString());
+ getDeviceIDFunction = PlainDeviceID::new;
}
}
// endregion
@@ -68,6 +68,16 @@ public class DeviceIDFactory {
* @return device id of the timeseries
*/
public IDeviceID getDeviceID(PartialPath devicePath) {
+ return getDeviceIDFunction.apply(devicePath.toString());
+ }
+
+ /**
+ * get device id by full path
+ *
+ * @param devicePath device path of the timeseries
+ * @return device id of the timeseries
+ */
+ public IDeviceID getDeviceID(String devicePath) {
return getDeviceIDFunction.apply(devicePath);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
new file mode 100644
index 0000000..32590f6
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/idtable/IDTableFlushTimeTest.java
@@ -0,0 +1,224 @@
+/*
+ *Licensed to the Apache Software Foundation(ASF)under one
+ *or more contributor license agreements.See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.The ASF licenses this file
+ *to you under the Apache License,Version2.0(the
+ *"License");you may not use this file except in compliance
+ *with the License.You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS"BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *KIND,either express or implied.See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+
+package org.apache.iotdb.db.metadata.idtable;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IDTableFlushTimeTest {
+ private PlanExecutor executor = new PlanExecutor();
+
+ private final Planner processor = new Planner();
+
+ private boolean isEnableIDTable = false;
+
+ private String originalDeviceIDTransformationMethod = null;
+
+ private boolean isEnableIDTableLogFile = false;
+
+ public IDTableFlushTimeTest() throws QueryProcessException {}
+
+ @Before
+ public void before() {
+ IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+ isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+ originalDeviceIDTransformationMethod =
+ IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+ isEnableIDTableLogFile = IoTDBDescriptor.getInstance().getConfig().isEnableIDTableLogFile();
+
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(true);
+ IoTDBDescriptor.getInstance().getConfig().setDeviceIDTransformationMethod("SHA256");
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(true);
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void clean() throws IOException, StorageEngineException {
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTable(isEnableIDTable);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+ IoTDBDescriptor.getInstance().getConfig().setEnableIDTableLogFile(isEnableIDTableLogFile);
+ EnvironmentUtils.cleanEnv();
+ }
+
+ @Test
+ public void testSequenceInsert()
+ throws MetadataException, QueryProcessException, StorageEngineException {
+ insertData(0);
+ insertData(10);
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+
+ insertData(20);
+
+ VirtualStorageGroupProcessor storageGroupProcessor =
+ StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ assertEquals(2, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(0, storageGroupProcessor.getUnSequenceFileList().size());
+ }
+
+ @Test
+ public void testUnSequenceInsert()
+ throws MetadataException, QueryProcessException, StorageEngineException {
+ insertData(100);
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+
+ insertData(20);
+
+ VirtualStorageGroupProcessor storageGroupProcessor =
+ StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ assertEquals(1, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(1, storageGroupProcessor.getUnSequenceFileList().size());
+ }
+
+ @Test
+ public void testSequenceAndUnSequenceInsert()
+ throws MetadataException, QueryProcessException, StorageEngineException {
+ // sequence
+ insertData(100);
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+
+ // sequence
+ insertData(120);
+ executor.processNonQuery(flushPlan);
+
+ // unsequence
+ insertData(20);
+ // sequence
+ insertData(130);
+ executor.processNonQuery(flushPlan);
+
+ // sequence
+ insertData(150);
+ // unsequence
+ insertData(90);
+
+ VirtualStorageGroupProcessor storageGroupProcessor =
+ StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+ assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
+ assertEquals(2, storageGroupProcessor.getUnSequenceFileList().size());
+ assertEquals(1, storageGroupProcessor.getWorkSequenceTsFileProcessors().size());
+ assertEquals(1, storageGroupProcessor.getWorkUnsequenceTsFileProcessors().size());
+ }
+
+ @Test
+ public void testDeletePartition()
+ throws MetadataException, QueryProcessException, StorageEngineException {
+ insertData(100);
+ PhysicalPlan flushPlan = processor.parseSQLToPhysicalPlan("flush");
+ executor.processNonQuery(flushPlan);
+ insertData(20);
+ insertData(120);
+
+ VirtualStorageGroupProcessor storageGroupProcessor =
+ StorageEngine.getInstance().getProcessor(new PartialPath("root.isp.d1"));
+
+ assertEquals(
+ 103L, storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
+ assertEquals(
+ 123L, storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
+ assertEquals(
+ 103L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
+
+ // delete time partition
+ Set<Long> deletedPartition = new HashSet<>();
+ deletedPartition.add(0L);
+ DeletePartitionPlan deletePartitionPlan =
+ new DeletePartitionPlan(new PartialPath("root.isp"), deletedPartition);
+ executor.processNonQuery(deletePartitionPlan);
+
+ assertEquals(
+ Long.MIN_VALUE,
+ storageGroupProcessor.getLastFlushTimeManager().getFlushedTime(0L, "root.isp.d1"));
+ assertEquals(
+ Long.MIN_VALUE,
+ storageGroupProcessor.getLastFlushTimeManager().getLastTime(0L, "root.isp.d1"));
+ assertEquals(
+ 123L, storageGroupProcessor.getLastFlushTimeManager().getGlobalFlushedTime("root.isp.d1"));
+ }
+
+ private void insertData(long initTime) throws IllegalPathException, QueryProcessException {
+
+ long[] times = new long[] {initTime, initTime + 1, initTime + 2, initTime + 3};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[6];
+ columns[0] = new double[4];
+ columns[1] = new float[4];
+ columns[2] = new long[4];
+ columns[3] = new int[4];
+ columns[4] = new boolean[4];
+ columns[5] = new Binary[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 10.0 + r;
+ ((float[]) columns[1])[r] = 20 + r;
+ ((long[]) columns[2])[r] = 100000 + r;
+ ((int[]) columns[3])[r] = 1000 + r;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("mm" + r);
+ }
+
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.isp.d1"),
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+
+ PlanExecutor executor = new PlanExecutor();
+ executor.insertTablet(tabletPlan);
+ }
+}