You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xu...@apache.org on 2022/03/14 08:12:06 UTC
[iotdb] 01/02: fix compaction bug
This is an automated email from the ASF dual-hosted git repository.
xuekaifeng pushed a commit to branch xkf_fix_idtable_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cfc121917f9204c4c87bc788fc10e8896936bf87
Author: 151250176 <15...@smail.nju.edu.cn>
AuthorDate: Mon Mar 14 15:51:55 2022 +0800
fix compaction bug
---
.../integration/IoTDBCompactionITWithIDTable.java | 352 +++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBConfigCheck.java | 40 ++-
.../db/engine/compaction/CompactionUtils.java | 19 +-
.../manage/CrossSpaceCompactionResource.java | 6 -
.../inner/utils/InnerSpaceCompactionUtils.java | 8 +-
.../apache/iotdb/db/metadata/idtable/IDTable.java | 10 +
.../db/metadata/idtable/IDTableHashmapImpl.java | 28 ++
.../iotdb/db/metadata/idtable/IDTableManager.java | 21 ++
8 files changed, 461 insertions(+), 23 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java
new file mode 100644
index 0000000..90d1701
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionITWithIDTable.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterTest;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category({LocalStandaloneTest.class, ClusterTest.class})
+public class IoTDBCompactionITWithIDTable {
+
+ // Registered under this class so that log lines are attributed to the ID-table variant.
+ private static final Logger logger = LoggerFactory.getLogger(IoTDBCompactionITWithIDTable.class);
+ // Partition interval read in setUp() before it is overridden; written back by tearDown().
+ private long prevPartitionInterval;
+
+ // ID-table switch read in setUp() before it is overridden; written back by tearDown().
+ private static boolean isEnableIDTable = false;
+
+ // Device ID transformation method read in setUp(); written back by tearDown().
+ private static String originalDeviceIDTransformationMethod = null;
+
+ /**
+  * Remembers the current partition interval and ID-table settings, then switches the test
+  * configuration to a partition interval of 1 with the ID table enabled and "SHA256" as the
+  * device ID transformation method before the test environment is initialised.
+  */
+ @Before
+ public void setUp() throws Exception {
+   // snapshot the values that tearDown() writes back
+   prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+   isEnableIDTable = IoTDBDescriptor.getInstance().getConfig().isEnableIDTable();
+   originalDeviceIDTransformationMethod =
+       IoTDBDescriptor.getInstance().getConfig().getDeviceIDTransformationMethod();
+
+   // overrides shared by every test in this class
+   ConfigFactory.getConfig().setPartitionInterval(1);
+   ConfigFactory.getConfig().setEnableIDTable(true);
+   ConfigFactory.getConfig().setDeviceIDTransformationMethod("SHA256");
+
+   EnvFactory.getEnv().initBeforeTest();
+ }
+
+ /**
+ * Writes back the ID-table settings captured in setUp(), cleans the test environment and then
+ * writes back the partition interval captured in setUp().
+ */
+ @After
+ public void tearDown() throws Exception {
+ // ID-table settings are restored before the environment is cleaned
+ ConfigFactory.getConfig().setEnableIDTable(isEnableIDTable);
+ ConfigFactory.getConfig().setDeviceIDTransformationMethod(originalDeviceIDTransformationMethod);
+
+ EnvFactory.getEnv().cleanAfterTest();
+ // the partition interval is restored only after the cleanup
+ ConfigFactory.getConfig().setPartitionInterval(prevPartitionInterval);
+ }
+
+ /**
+  * Inserts three flushed batches for s1, the last of which rewrites timestamp 2 and adds
+  * timestamp 3, then checks that a full scan returns five rows with value 3 at timestamp 2 and
+  * value == timestamp everywhere else.
+  */
+ @Test
+ public void testOverlap() throws SQLException {
+   logger.info("test...");
+   // one entry per flushed batch; each point is {timestamp, s1}
+   final int[][][] batches = {{{1, 1}, {2, 2}}, {{5, 5}, {6, 6}}, {{2, 3}, {3, 3}}};
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     statement.execute("SET STORAGE GROUP TO root.mergeTest");
+     try {
+       statement.execute("CREATE TIMESERIES root.mergeTest.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+     } catch (SQLException ignored) {
+       // ignore
+     }
+
+     for (int[][] batch : batches) {
+       for (int[] point : batch) {
+         statement.execute(
+             String.format(
+                 "INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", point[0], point[1]));
+       }
+       statement.execute("FLUSH");
+     }
+
+     try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+       int rows = 0;
+       while (resultSet.next()) {
+         long time = resultSet.getLong("Time");
+         long s1 = resultSet.getLong("root.mergeTest.s1");
+         // timestamp 2 was written twice; the later value 3 must win
+         long expected = time == 2 ? 3 : time;
+         assertEquals(expected, s1);
+         rows++;
+       }
+       assertEquals(5, rows);
+     }
+   }
+ }
+
+ /**
+  * Runs ten rounds. Round i inserts timestamps i*10+1 .. (i+1)*10 with values time+1/+2/+3,
+  * flushes, inserts the same timestamps again with values time+10/+20/+30, flushes, pauses for
+  * one second and then checks that a full scan returns (i+1)*10 rows that all carry the second
+  * set of values.
+  */
+ @Test
+ public void test() throws SQLException {
+   logger.info("test...");
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     statement.execute("SET STORAGE GROUP TO root.mergeTest");
+     for (int i = 1; i <= 3; i++) {
+       try {
+         statement.execute(
+             "CREATE TIMESERIES root.mergeTest.s"
+                 + i
+                 + " WITH DATATYPE=INT64,"
+                 + "ENCODING=PLAIN");
+       } catch (SQLException e) {
+         // best effort: a failure to create the series is deliberately ignored
+       }
+     }
+
+     for (int i = 0; i < 10; i++) {
+       logger.info("Running the {} round merge", i);
+       // first write of this round's timestamps
+       for (int j = i * 10 + 1; j <= (i + 1) * 10; j++) {
+         statement.addBatch(
+             String.format(
+                 "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+                 j, j + 1, j + 2, j + 3));
+       }
+       statement.executeBatch();
+       statement.execute("FLUSH");
+       // second write of the same timestamps with different values
+       for (int j = i * 10 + 1; j <= (i + 1) * 10; j++) {
+         statement.addBatch(
+             String.format(
+                 "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+                 j, j + 10, j + 20, j + 30));
+       }
+       statement.executeBatch();
+       statement.execute("FLUSH");
+       try {
+         Thread.sleep(1000);
+       } catch (InterruptedException e) {
+         // restore the interrupt flag so the test runner can still observe the interruption
+         Thread.currentThread().interrupt();
+       }
+
+       int cnt;
+       try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+         cnt = 0;
+         while (resultSet.next()) {
+           long time = resultSet.getLong("Time");
+           long s1 = resultSet.getLong("root.mergeTest.s1");
+           long s2 = resultSet.getLong("root.mergeTest.s2");
+           long s3 = resultSet.getLong("root.mergeTest.s3");
+           assertEquals(time + 10, s1);
+           assertEquals(time + 20, s2);
+           assertEquals(time + 30, s3);
+           cnt++;
+         }
+       }
+       assertEquals((i + 1) * 10, cnt);
+     }
+   }
+ }
+
+ /**
+  * Covers data written in inverted order: timestamps [10, 20) and [20, 30) are inserted and
+  * flushed with values time+1/+2/+3, then [20, 30) and [10, 20) are inserted and flushed again
+  * with values time+10/+20/+30. After MERGE and a fixed pause, a full scan must return the 20
+  * timestamps 10..29 in ascending order, all carrying the second set of values.
+  */
+ @Test
+ public void testInvertedOrder() {
+   logger.info("testInvertedOrder...");
+   // case: seq data and unseq data are written in reverted order
+   // e.g.: write 1. seq [10, 20), 2. seq [20, 30), 3. unseq [20, 30), 4. unseq [10, 20)
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     statement.execute("SET STORAGE GROUP TO root.mergeTest");
+     for (int i = 1; i <= 3; i++) {
+       try {
+         statement.execute(
+             "CREATE TIMESERIES root.mergeTest.s"
+                 + i
+                 + " WITH DATATYPE=INT64,"
+                 + "ENCODING=PLAIN");
+       } catch (SQLException e) {
+         // best effort: a failure to create the series is deliberately ignored
+       }
+     }
+
+     for (int j = 10; j < 20; j++) {
+       statement.addBatch(
+           String.format(
+               "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+               j, j + 1, j + 2, j + 3));
+     }
+     statement.executeBatch();
+     statement.execute("FLUSH");
+     for (int j = 20; j < 30; j++) {
+       statement.addBatch(
+           String.format(
+               "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+               j, j + 1, j + 2, j + 3));
+     }
+     statement.executeBatch();
+     statement.execute("FLUSH");
+
+     // second pass: the later range first, then the earlier one
+     for (int j = 20; j < 30; j++) {
+       statement.addBatch(
+           String.format(
+               "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+               j, j + 10, j + 20, j + 30));
+     }
+     statement.executeBatch();
+     statement.execute("FLUSH");
+     for (int j = 10; j < 20; j++) {
+       statement.addBatch(
+           String.format(
+               "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+               j, j + 10, j + 20, j + 30));
+     }
+     statement.executeBatch();
+     statement.execute("FLUSH");
+
+     statement.execute("MERGE");
+     try {
+       Thread.sleep(2000);
+     } catch (InterruptedException e) {
+       // restore the interrupt flag so the test runner can still observe the interruption
+       Thread.currentThread().interrupt();
+     }
+
+     int cnt;
+     try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+       cnt = 0;
+       while (resultSet.next()) {
+         long time = resultSet.getLong("Time");
+         long s1 = resultSet.getLong("root.mergeTest.s1");
+         long s2 = resultSet.getLong("root.mergeTest.s2");
+         long s3 = resultSet.getLong("root.mergeTest.s3");
+         // rows must come back in ascending time order starting at 10
+         assertEquals(cnt + 10, time);
+         assertEquals(time + 10, s1);
+         assertEquals(time + 20, s2);
+         assertEquals(time + 30, s3);
+         cnt++;
+       }
+     }
+     assertEquals(20, cnt);
+   } catch (SQLException e) {
+     // report through the class logger (with stack trace) and fail the test
+     logger.error("testInvertedOrder failed with an SQLException", e);
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * For k = 0..6 and i = 0..9 this writes one flushed batch of 100 timestamps starting at
+  * i*1000+300+k*100 (values time+1/+2/+3) followed by one flushed batch of 100 timestamps
+  * starting at i*1000+k*100 (values time+10/+20/+30). It then triggers MERGE, waits until
+  * CompactionTaskManager.currentTaskNum is no longer positive (failing after 240 seconds) and
+  * expects a full scan to return the 10000 consecutive timestamps 0..9999, where rows with
+  * time % 1000 < 700 carry the +10/+20/+30 values and all other rows the +1/+2/+3 values.
+  */
+ @Test
+ public void testCrossPartition() throws SQLException {
+   logger.info("testCrossPartition...");
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     statement.execute("SET STORAGE GROUP TO root.mergeTest");
+     for (int i = 1; i <= 3; i++) {
+       try {
+         statement.execute(
+             "CREATE TIMESERIES root.mergeTest.s"
+                 + i
+                 + " WITH DATATYPE=INT64,"
+                 + "ENCODING=PLAIN");
+       } catch (SQLException e) {
+         // best effort: a failure to create the series is deliberately ignored
+       }
+     }
+
+     // file in partition
+     for (int k = 0; k < 7; k++) {
+       // partition num
+       for (int i = 0; i < 10; i++) {
+         // sequence files
+         for (int j = i * 1000 + 300 + k * 100; j <= i * 1000 + 399 + k * 100; j++) {
+           statement.execute(
+               String.format(
+                   "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+                   j, j + 1, j + 2, j + 3));
+         }
+         statement.execute("FLUSH");
+         // unsequence files
+         for (int j = i * 1000 + k * 100; j <= i * 1000 + 99 + k * 100; j++) {
+           statement.execute(
+               String.format(
+                   "INSERT INTO root.mergeTest(timestamp,s1,s2,s3) VALUES (%d,%d,%d,%d)",
+                   j, j + 10, j + 20, j + 30));
+         }
+         statement.execute("FLUSH");
+       }
+     }
+
+     statement.execute("MERGE");
+     try {
+       Thread.sleep(1000);
+     } catch (InterruptedException e) {
+       // restore the interrupt flag; the wait loop below turns it into a test failure
+       Thread.currentThread().interrupt();
+     }
+
+     long totalTime = 0;
+     while (CompactionTaskManager.currentTaskNum.get() > 0) {
+       // wait
+       try {
+         Thread.sleep(1000);
+         totalTime += 1000;
+         if (totalTime > 240_000) {
+           fail("compaction tasks were still running after 240 seconds");
+         }
+       } catch (InterruptedException e) {
+         // With the flag restored every further sleep() would throw immediately, so stop
+         // waiting instead of spinning on the task counter.
+         Thread.currentThread().interrupt();
+         fail("interrupted while waiting for compaction tasks to finish");
+       }
+     }
+     int cnt;
+     try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+       cnt = 0;
+       while (resultSet.next()) {
+         long time = resultSet.getLong("Time");
+         long s1 = resultSet.getLong("root.mergeTest.s1");
+         long s2 = resultSet.getLong("root.mergeTest.s2");
+         long s3 = resultSet.getLong("root.mergeTest.s3");
+         // timestamps must be consecutive starting at 0
+         assertEquals(cnt, time);
+         if (time % 1000 < 700) {
+           // range covered by the second batch of each (k, i) round
+           assertEquals(time + 10, s1);
+           assertEquals(time + 20, s2);
+           assertEquals(time + 30, s3);
+         } else {
+           // range written only by the first batch of each (k, i) round
+           assertEquals(time + 1, s1);
+           assertEquals(time + 2, s2);
+           assertEquals(time + 3, s3);
+         }
+         cnt++;
+       }
+     }
+     assertEquals(10000, cnt);
+   }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
index d4492c7..879c748 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfigCheck.java
@@ -18,17 +18,6 @@
*/
package org.apache.iotdb.db.conf;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
-import org.apache.iotdb.db.exception.ConfigurationException;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -39,6 +28,15 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.ConfigurationException;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class IoTDBConfigCheck {
@@ -89,6 +87,12 @@ public class IoTDBConfigCheck {
private static final String VIRTUAL_STORAGE_GROUP_NUM = "virtual_storage_group_num";
private static String virtualStorageGroupNum = String.valueOf(config.getVirtualStorageGroupNum());
+ private static final String ENABLE_ID_TABLE = "enable_id_table";
+ private static String enableIDTable = String.valueOf(config.isEnableIDTable());
+
+ private static final String ENABLE_ID_TABLE_LOG_FILE = "enable_id_table_log_file";
+ private static String enableIdTableLogFile = String.valueOf(config.isEnableIDTableLogFile());
+
private static final String TIME_ENCODER_KEY = "time_encoder";
private static String timeEncoderValue =
String.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
@@ -149,6 +153,8 @@ public class IoTDBConfigCheck {
systemProperties.put(MAX_DEGREE_OF_INDEX_STRING, maxDegreeOfIndexNode);
systemProperties.put(VIRTUAL_STORAGE_GROUP_NUM, virtualStorageGroupNum);
systemProperties.put(TIME_ENCODER_KEY, timeEncoderValue);
+ systemProperties.put(ENABLE_ID_TABLE, enableIDTable);
+ systemProperties.put(ENABLE_ID_TABLE_LOG_FILE, enableIdTableLogFile);
}
/**
@@ -339,6 +345,18 @@ public class IoTDBConfigCheck {
if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) {
throwException(TIME_ENCODER_KEY, timeEncoderValue);
}
+
+ if (!(properties.getProperty(TIME_ENCODER_KEY).equals(timeEncoderValue))) {
+ throwException(TIME_ENCODER_KEY, timeEncoderValue);
+ }
+
+ if (!(properties.getProperty(ENABLE_ID_TABLE).equals(enableIDTable))) {
+ throwException(ENABLE_ID_TABLE, enableIDTable);
+ }
+
+ if (!(properties.getProperty(ENABLE_ID_TABLE_LOG_FILE).equals(enableIdTableLogFile))) {
+ throwException(ENABLE_ID_TABLE_LOG_FILE, enableIdTableLogFile);
+ }
}
private void throwException(String parameter, Object badValue) throws ConfigurationException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
index 906ead9..8324cdd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.compaction;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.inner.utils.MultiTsFileDeviceIterator;
import org.apache.iotdb.db.engine.compaction.writer.AbstractCompactionWriter;
import org.apache.iotdb.db.engine.compaction.writer.CrossSpaceCompactionWriter;
@@ -32,6 +33,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.path.AlignedPath;
import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -124,10 +126,13 @@ public class CompactionUtils {
Set<String> allMeasurements = alignedMeasurementIterator.getAllMeasurements();
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
for (String measurement : allMeasurements) {
- // TODO: use IDTable
try {
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
+ } else {
+ measurementSchemas.add(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ }
} catch (PathNotExistException e) {
logger.info("A deleted path is skipped: {}", e.getMessage());
}
@@ -173,8 +178,12 @@ public class CompactionUtils {
for (String measurement : allMeasurements) {
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
try {
- measurementSchemas.add(
- IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ measurementSchemas.add(IDTableManager.getInstance().getSeriesSchema(device, measurement));
+ } else {
+ measurementSchemas.add(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device, measurement)));
+ }
} catch (PathNotExistException e) {
logger.info("A deleted path is skipped: {}", e.getMessage());
continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
index a19fef7..7ced2fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/cross/rewrite/manage/CrossSpaceCompactionResource.java
@@ -21,9 +21,7 @@ package org.apache.iotdb.db.engine.compaction.cross.rewrite.manage;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
@@ -111,10 +109,6 @@ public class CrossSpaceCompactionResource {
chunkWriterCache.clear();
}
- public IMeasurementSchema getSchema(PartialPath path) throws MetadataException {
- return IoTDB.metaManager.getSeriesSchema(path);
- }
-
/**
* Construct the a new or get an existing TsFileSequenceReader of a TsFile.
*
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
index 91f5a9b..0fef121 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.idtable.IDTableManager;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.service.IoTDB;
@@ -120,7 +121,12 @@ public class InnerSpaceCompactionUtils {
LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList =
seriesIterator.getMetadataListForCurrentSeries();
try {
- measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableIDTable()) {
+ measurementSchema =
+ IDTableManager.getInstance().getSeriesSchema(device, p.getMeasurement());
+ } else {
+ measurementSchema = IoTDB.metaManager.getSeriesSchema(p);
+ }
} catch (PathNotExistException e) {
logger.info("A deleted path is skipped: {}", e.getMessage());
continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
index 5ab0d47..4c32feb 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTable.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,6 +132,15 @@ public interface IDTable {
public DeviceEntry getDeviceEntry(String deviceName);
 /**
+ * get the measurement schema of one time series from its device and measurement names
+ *
+ * @param deviceName device name of the time series
+ * @param measurementName measurement name of the time series
+ * @return measurement schema of the time series; IDTableHashmapImpl returns null when the device
+ * or the measurement is not in the table, and IDTableManager treats null as "not found here"
+ */
+ public IMeasurementSchema getSeriesSchema(String deviceName, String measurementName);
+
+ /**
* get all device entries
*
* @return all device entries
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
index fb21776..fb7732c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableHashmapImpl.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -275,6 +276,33 @@ public class IDTableHashmapImpl implements IDTable {
return idTables[slot].get(deviceID);
}
+ /**
+  * Builds the measurement schema of one time series from the entries stored in this table.
+  *
+  * @param deviceName device name of the time series
+  * @param measurementName measurement name of the time series
+  * @return a new measurement schema, or {@code null} if the device entry or the measurement's
+  *     schema entry does not exist
+  */
+ @Override
+ public IMeasurementSchema getSeriesSchema(String deviceName, String measurementName) {
+   DeviceEntry device = getDeviceEntry(deviceName);
+   SchemaEntry entry = device == null ? null : device.getSchemaEntry(measurementName);
+   if (entry == null) {
+     // either the device or the measurement is unknown to this table
+     return null;
+   }
+
+   // data type, encoding and compression are taken from the stored schema entry
+   return new MeasurementSchema(
+       measurementName, entry.getTSDataType(), entry.getTSEncoding(), entry.getCompressionType());
+ }
+
@Override
public List<DeviceEntry> getAllDeviceEntry() {
List<DeviceEntry> res = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java
index 02026b9..c1ff2b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/idtable/IDTableManager.java
@@ -21,10 +21,12 @@ package org.apache.iotdb.db.metadata.idtable;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,6 +96,25 @@ public class IDTableManager {
return null;
}
+ /**
+  * Asks each ID table in {@code idTableMap} for the schema of the given time series.
+  *
+  * @param deviceName device name of the time series
+  * @param measurementName measurement name of the time series
+  * @return the first non-null schema returned by an ID table
+  * @throws PathNotExistException if every ID table returns null for the series
+  */
+ public synchronized IMeasurementSchema getSeriesSchema(String deviceName, String measurementName)
+     throws MetadataException {
+   for (IDTable table : idTableMap.values()) {
+     IMeasurementSchema found = table.getSeriesSchema(deviceName, measurementName);
+     if (found != null) {
+       return found;
+     }
+   }
+   // no table knows the series: report the full path built from device and measurement
+   throw new PathNotExistException(new PartialPath(deviceName, measurementName).toString());
+ }
+
/** clear id table map */
public void clear() throws IOException {
for (IDTable idTable : idTableMap.values()) {