You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2020/12/05 09:52:55 UTC
[iotdb] branch master updated: [ISSUE-2187] Fix compaction encoding
bug after deserialization merge (#2188)
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e652563 [ISSUE-2187] Fix compaction encoding bug after deserialization merge (#2188)
e652563 is described below
commit e6525637c9956133301408a9db9b45733f6ac23e
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Sat Dec 5 17:52:35 2020 +0800
[ISSUE-2187] Fix compaction encoding bug after deserialization merge (#2188)
* fix compaction bug after writing data via deserialization merge
---
.../engine/compaction/utils/CompactionUtils.java | 21 ++++++---
.../iotdb/db/integration/IoTDBCompactionIT.java | 55 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 9e34b61..0f76436 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -36,6 +36,9 @@ import java.util.TreeMap;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
@@ -47,7 +50,6 @@ import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,8 +143,13 @@ public class CompactionUtils {
if (chunkMetadataList.isEmpty()) {
return maxVersion;
}
- IChunkWriter chunkWriter = new ChunkWriterImpl(
- new MeasurementSchema(entry.getKey(), chunkMetadataList.get(0).getDataType()), true);
+ IChunkWriter chunkWriter;
+ try {
+ chunkWriter = new ChunkWriterImpl(
+ IoTDB.metaManager.getSeriesSchema(new PartialPath(device), entry.getKey()), true);
+ } catch (MetadataException e) {
+ throw new IOException(e);
+ }
for (TimeValuePair timeValuePair : timeValuePairMap.values()) {
writeTVPair(timeValuePair, chunkWriter);
targetResource.updateStartTime(device, timeValuePair.getTimestamp());
@@ -172,11 +179,11 @@ public class CompactionUtils {
}
/**
- * @param targetResource the target resource to be merged to
- * @param tsFileResources the source resource to be merged
- * @param storageGroup the storage group name
+ * @param targetResource the target resource to be merged to
+ * @param tsFileResources the source resource to be merged
+ * @param storageGroup the storage group name
* @param compactionLogger the logger
- * @param devices the devices to be skipped(used by recover)
+ * @param devices the devices to be skipped(used by recover)
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static void merge(TsFileResource targetResource,
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
index d3cc788..17265ce 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBCompactionIT.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Random;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -96,4 +97,58 @@ public class IoTDBCompactionIT {
}
}
+ /**
+  * Regression test for ISSUE-2187: writes 10000 rows (timestamps 1..10000), then
+  * 2400 more rows starting at timestamp 8322 (8322..10721), issuing a FLUSH every
+  * {@code pageSize} rows, and checks that COUNT(s1) is 10721, i.e. the number of
+  * distinct timestamps written.
+  *
+  * <p>Unsequence compaction is switched off for the duration of the test and the
+  * previous setting is restored in a {@code finally} block, so a failing assertion
+  * or an SQL error cannot leak the modified configuration into other tests.
+  *
+  * @throws SQLException if the connection or any statement fails
+  */
+ @Test
+ public void testAppendMergeAfterDeserializeMerge() throws SQLException {
+   boolean prevEnableUnseqCompaction =
+       IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+   IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+   try (Connection connection = DriverManager
+       .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     statement.execute("SET STORAGE GROUP TO root.compactionTest");
+     try {
+       statement.execute("CREATE TIMESERIES root.compactionTest.s1 WITH DATATYPE=INT64");
+     } catch (SQLException ignored) {
+       // Deliberately ignored. NOTE(review): presumably the timeseries may already
+       // exist from an earlier test in this class - confirm.
+     }
+
+     // A FLUSH is issued every pageSize rows (including row 0 of each batch).
+     long pageSize = 100;
+     long timestamp = 1;
+
+     for (long row = 0; row < 10000; row++) {
+       statement.execute(
+           String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+               timestamp, 1));
+       if (row % pageSize == 0) {
+         statement.execute("FLUSH");
+       }
+       timestamp++;
+     }
+
+     // Second batch restarts at 8322, overlapping the tail of the first batch.
+     timestamp = 8322;
+
+     for (long row = 0; row < 2400; row++) {
+       statement.execute(
+           String.format("INSERT INTO root.compactionTest(timestamp,s1) VALUES (%d,%d)",
+               timestamp, 1));
+       if (row % pageSize == 0) {
+         statement.execute("FLUSH");
+       }
+       timestamp++;
+     }
+
+     int cnt = 0;
+     try (ResultSet resultSet =
+         statement.executeQuery("SELECT COUNT(s1) FROM root.compactionTest")) {
+       while (resultSet.next()) {
+         // Timestamps 1..10000 and 8322..10721 cover 10721 distinct values.
+         assertEquals(10721, resultSet.getLong(1));
+         cnt++;
+       }
+     }
+     // The aggregation must return exactly one row.
+     assertEquals(1, cnt);
+   } finally {
+     // Always restore the global setting, even when the test fails.
+     IoTDBDescriptor.getInstance().getConfig()
+         .setEnableUnseqCompaction(prevEnableUnseqCompaction);
+   }
+ }
+
+
}