You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ma...@apache.org on 2022/10/27 02:16:44 UTC
[iotdb] branch master updated: [IOTDB-4693] Support broken tsfile rewrite (#7677)
This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new baed39decc [IOTDB-4693] Support broken tsfile rewrite (#7677)
baed39decc is described below
commit baed39decc2ac76e77aac3de7507aa401c74654c
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Thu Oct 27 10:16:39 2022 +0800
[IOTDB-4693] Support broken tsfile rewrite (#7677)
---
.../db/integration/IoTDBRewriteTsFileToolIT.java | 523 ++++++++++++++++-
.../java/org/apache/iotdb/RewriteTsFileTool.java | 620 +++++++++++++++------
2 files changed, 932 insertions(+), 211 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
index 00175fb202..012f5b6c92 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBRewriteTsFileToolIT.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.integration;
import org.apache.iotdb.RewriteTsFileTool;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
@@ -36,6 +39,7 @@ import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
public class IoTDBRewriteTsFileToolIT {
@@ -55,6 +59,41 @@ public class IoTDBRewriteTsFileToolIT {
@After
public void tearDown() throws Exception {
EnvironmentUtils.cleanEnv();
+ if (tmpDir != null) { // tmpDir is only assigned by unload(); null means no TsFile was unloaded
+ FileUtils.deleteDirectory(new File(tmpDir));
+ }
+ }
+
+ /**
+  * Unloads every sequence and unsequence TsFile of storage group {@code root.sg} into the
+  * shared {@code tmpDir}.
+  *
+  * @param statement open statement used to issue the {@code unload} commands
+  * @throws IllegalStateException if the target directory cannot be created
+  */
+ public void unload(Statement statement)
+     throws IllegalPathException, SQLException, StorageEngineException {
+   for (TsFileResource resource :
+       StorageEngine.getInstance()
+           .getProcessor(new PartialPath("root.sg"))
+           .getSequenceFileList()) {
+     unloadToTmpDir(statement, resource);
+   }
+   for (TsFileResource resource :
+       StorageEngine.getInstance()
+           .getProcessor(new PartialPath("root.sg"))
+           .getUnSequenceFileList()) {
+     unloadToTmpDir(statement, resource);
+   }
+ }
+
+ /**
+  * Unloads a single TsFile into {@code tmpDir}. The directory is resolved and created from the
+  * first resource seen, whichever list it comes from, so a storage group holding only
+  * unsequence files is no longer unloaded to the literal path "null".
+  */
+ private void unloadToTmpDir(Statement statement, TsFileResource resource) throws SQLException {
+   if (tmpDir == null) {
+     // "tmp" is placed five directory levels above the TsFile itself.
+     String dir =
+         resource
+                 .getTsFile()
+                 .getParentFile()
+                 .getParentFile()
+                 .getParentFile()
+                 .getParentFile()
+                 .getParent()
+             + File.separator
+             + "tmp";
+     File tmpFile = new File(dir);
+     if (!tmpFile.exists() && !tmpFile.mkdirs()) {
+       throw new IllegalStateException("Failed to create directory " + dir);
+     }
+     // Publish the field only once the directory is known to exist.
+     tmpDir = dir;
+   }
+   statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
}
public void prepareTsFiles() throws Exception {
@@ -92,34 +131,7 @@ public class IoTDBRewriteTsFileToolIT {
statement.execute("delete from root.sg.a.s1 where time > 2");
- for (TsFileResource resource :
- StorageEngine.getInstance()
- .getProcessor(new PartialPath("root.sg"))
- .getSequenceFileList()) {
- if (tmpDir == null) {
- tmpDir =
- resource
- .getTsFile()
- .getParentFile()
- .getParentFile()
- .getParentFile()
- .getParentFile()
- .getParent()
- + File.separator
- + "tmp";
- File tmpFile = new File(tmpDir);
- if (!tmpFile.exists()) {
- tmpFile.mkdirs();
- }
- }
- statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
- }
- for (TsFileResource resource :
- StorageEngine.getInstance()
- .getProcessor(new PartialPath("root.sg"))
- .getUnSequenceFileList()) {
- statement.execute(String.format("unload '%s' '%s'", resource.getTsFilePath(), tmpDir));
- }
+ unload(statement);
}
}
@@ -151,4 +163,459 @@ public class IoTDBRewriteTsFileToolIT {
Assert.fail();
}
}
+
+ /**
+  * Inserts five flushed batches of aligned rows (512 timestamps x 5 devices each), unloads the
+  * resulting TsFiles, replays them through {@link RewriteTsFileTool} with read mode "r" and the
+  * "-ig" flag, then checks that every point can be queried back.
+  */
+ @Test
+ public void testAlignedTsFileR() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           statement.execute(
+               String.format(
+                   "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                   deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+         }
+       }
+       // Flush after each batch (presumably so each batch ends up in its own TsFile).
+       statement.execute("FLUSH");
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "r",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources closes the ResultSet even when an assertion below fails.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first, so a failure message reports the right side as "expected".
+           Assert.assertEquals(timestamp + 1, s1Val, 0.001);
+           Assert.assertEquals(timestamp + 2, s2Val, 0.001);
+           Assert.assertEquals(timestamp + 3, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Inserts five flushed batches of aligned rows (512 timestamps x 5 devices each), unloads the
+  * resulting TsFiles, replays them through {@link RewriteTsFileTool} with read mode "s" and the
+  * "-ig" flag, then checks that every point can be queried back.
+  */
+ @Test
+ public void testAlignedTsFileS() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           statement.execute(
+               String.format(
+                   "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                   deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+         }
+       }
+       // Flush after each batch (presumably so each batch ends up in its own TsFile).
+       statement.execute("FLUSH");
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "s",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources closes the ResultSet even when an assertion below fails.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first, so a failure message reports the right side as "expected".
+           Assert.assertEquals(timestamp + 1, s1Val, 0.001);
+           Assert.assertEquals(timestamp + 2, s2Val, 0.001);
+           Assert.assertEquals(timestamp + 3, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Same round trip as the plain aligned test in read mode "r", except that each row leaves one
+  * of s1/s2/s3 unwritten (chosen by {@code timestamp % 3}). After the rewrite the unwritten
+  * column is expected to read back as 0 through {@code getFloat}.
+  */
+ @Test
+ public void testAlignedTsFileWithNullR() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           if (timestamp % 3 == 0) {
+             // s1 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s2, s3) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 2, timestamp + 3));
+           } else if (timestamp % 3 == 1) {
+             // s2 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s1, s3) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 1, timestamp + 3));
+           } else {
+             // s3 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s1, s2) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 1, timestamp + 2));
+           }
+         }
+       }
+       statement.execute("FLUSH");
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "r",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources closes the ResultSet even when an assertion below fails.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first; the column skipped at insert time is expected to be 0.
+           Assert.assertEquals(timestamp % 3 != 0 ? timestamp + 1 : 0, s1Val, 0.001);
+           Assert.assertEquals(timestamp % 3 != 1 ? timestamp + 2 : 0, s2Val, 0.001);
+           Assert.assertEquals(timestamp % 3 != 2 ? timestamp + 3 : 0, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Same round trip as the plain aligned test in read mode "s", except that each row leaves one
+  * of s1/s2/s3 unwritten (chosen by {@code timestamp % 3}). After the rewrite the unwritten
+  * column is expected to read back as 0 through {@code getFloat}.
+  */
+ @Test
+ public void testAlignedTsFileWithNullS() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           if (timestamp % 3 == 0) {
+             // s1 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s2, s3) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 2, timestamp + 3));
+           } else if (timestamp % 3 == 1) {
+             // s2 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s1, s3) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 1, timestamp + 3));
+           } else {
+             // s3 is left out of this row.
+             statement.execute(
+                 String.format(
+                     "insert into root.sg.d%d(time, s1, s2) aligned values(%d, %d, %d)",
+                     deviceIndex, timestamp, timestamp + 1, timestamp + 2));
+           }
+         }
+       }
+       statement.execute("FLUSH");
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "s",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources closes the ResultSet even when an assertion below fails.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first; the column skipped at insert time is expected to be 0.
+           Assert.assertEquals(timestamp % 3 != 0 ? timestamp + 1 : 0, s1Val, 0.001);
+           Assert.assertEquals(timestamp % 3 != 1 ? timestamp + 2 : 0, s2Val, 0.001);
+           Assert.assertEquals(timestamp % 3 != 2 ? timestamp + 3 : 0, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Inserts five flushed batches of aligned rows, deletes the inclusive time range [1024, 2048]
+  * on every device, unloads the TsFiles, replays them through {@link RewriteTsFileTool} with
+  * read mode "r" and the "-ig" flag, then checks that deleted points stay absent and all other
+  * points are intact.
+  */
+ @Test
+ public void testSimpleTsFileWithDeletionR() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           statement.execute(
+               String.format(
+                   "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                   deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+         }
+       }
+       statement.execute("FLUSH");
+     }
+   }
+   long deleteStart = 1024, deleteEnd = 2048;
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+       statement.execute(
+           "delete from root.sg.d"
+               + deviceIndex
+               + ".** where time >= "
+               + deleteStart
+               + " and time <= "
+               + deleteEnd);
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "r",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources also closes the ResultSet on the early "continue" below, which
+         // previously skipped close() for every deleted timestamp.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           if (timestamp >= deleteStart && timestamp <= deleteEnd) {
+             Assert.assertFalse(resultSet.next());
+             continue;
+           }
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first, so a failure message reports the right side as "expected".
+           Assert.assertEquals(timestamp + 1, s1Val, 0.001);
+           Assert.assertEquals(timestamp + 2, s2Val, 0.001);
+           Assert.assertEquals(timestamp + 3, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Inserts five flushed batches of aligned rows, deletes the inclusive time range [1024, 2048]
+  * on every device, unloads the TsFiles, replays them through {@link RewriteTsFileTool} with
+  * read mode "s" and the "-ig" flag, then checks that deleted points stay absent and all other
+  * points are intact.
+  */
+ @Test
+ public void testSimpleTsFileWithDeletionS() throws Exception {
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int fileIndex = 0; fileIndex < 5; fileIndex++) {
+       for (long timestamp = fileIndex * 512, end = fileIndex * 512 + 512;
+           timestamp < end;
+           ++timestamp) {
+         for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+           statement.execute(
+               String.format(
+                   "insert into root.sg.d%d(time, s1, s2, s3) aligned values(%d, %d, %d, %d)",
+                   deviceIndex, timestamp, timestamp + 1, timestamp + 2, timestamp + 3));
+         }
+       }
+       statement.execute("FLUSH");
+     }
+   }
+   long deleteStart = 1024, deleteEnd = 2048;
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+       statement.execute(
+           "delete from root.sg.d"
+               + deviceIndex
+               + ".** where time >= "
+               + deleteStart
+               + " and time <= "
+               + deleteEnd);
+     }
+     unload(statement);
+   }
+   RewriteTsFileTool.main(
+       new String[] {
+         "-h", "127.0.0.1", "-p", "6667", "-u", "root", "-pw", "root", "-f", tmpDir, "-rm", "s",
+         "-ig"
+       });
+   try (Connection connection =
+           DriverManager.getConnection(
+               Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+       Statement statement = connection.createStatement()) {
+     for (long timestamp = 0; timestamp < 512 * 5; ++timestamp) {
+       for (int deviceIndex = 0; deviceIndex < 5; ++deviceIndex) {
+         // try-with-resources also closes the ResultSet on the early "continue" below, which
+         // previously skipped close() for every deleted timestamp.
+         try (ResultSet resultSet =
+             statement.executeQuery(
+                 String.format(
+                     "select s1, s2, s3 from root.sg.d%d where time=%d", deviceIndex, timestamp))) {
+           if (timestamp >= deleteStart && timestamp <= deleteEnd) {
+             Assert.assertFalse(resultSet.next());
+             continue;
+           }
+           Assert.assertTrue(resultSet.next());
+           float s1Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s1");
+           float s2Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s2");
+           float s3Val = resultSet.getFloat("root.sg.d" + deviceIndex + ".s3");
+           // Expected value first, so a failure message reports the right side as "expected".
+           Assert.assertEquals(timestamp + 1, s1Val, 0.001);
+           Assert.assertEquals(timestamp + 2, s2Val, 0.001);
+           Assert.assertEquals(timestamp + 3, s3Val, 0.001);
+           Assert.assertFalse(resultSet.next());
+         }
+       }
+     }
+   }
+ }
+
+ @Test
+ public void testWriteAlignedTsFileWithDeletion() {} // NOTE(review): empty body - asserts nothing, so this test always passes; TODO implement or remove
}
diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
index 4b1d47976d..f403f60bfd 100644
--- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
+++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java
@@ -27,42 +27,38 @@ import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
+import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.NoMeasurementException;
import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
+import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.TsFileAlignedSeriesReaderIterator;
+import org.apache.iotdb.tsfile.read.TsFileCheckStatus;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.IBatchDataIterator;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.controller.CachedChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.controller.IChunkLoader;
-import org.apache.iotdb.tsfile.read.controller.IMetadataQuerier;
-import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
-import org.apache.iotdb.tsfile.read.query.dataset.DataSetWithoutTimeGenerator;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.series.AbstractFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.EmptyFileSeriesReader;
-import org.apache.iotdb.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.read.reader.page.TimePageReader;
+import org.apache.iotdb.tsfile.read.reader.page.ValuePageReader;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -79,12 +75,13 @@ import org.apache.commons.cli.ParseException;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -99,8 +96,7 @@ public class RewriteTsFileTool {
private static String password = "root";
private static String filePath = "";
private static String readMode = "s";
-
- private static Map<String, Set<MeasurementSchema>> device2Measurements;
+ private static boolean ignoreBrokenChunk = false;
public static void main(String[] args) {
Session session = null;
@@ -138,6 +134,7 @@ public class RewriteTsFileTool {
password = getArgOrDefault(commandLine, "pw", password);
filePath = getArgOrDefault(commandLine, "f", filePath);
readMode = getArgOrDefault(commandLine, "rm", readMode);
+ ignoreBrokenChunk = commandLine.hasOption("ig");
} catch (ParseException e) {
System.out.printf("Parse Args Error. %s%n", e.getMessage());
priHelp(options);
@@ -203,6 +200,8 @@ public class RewriteTsFileTool {
.required()
.build();
options.addOption(readModeOpt);
+
+ options.addOption("ig", "ignore-broken chunks");
return options;
}
@@ -231,27 +230,10 @@ public class RewriteTsFileTool {
*/
public static void writeToIoTDB(List<File> files, Session session) {
sortTsFiles(files);
- int size = files.size();
- List<File> unloadTsFiles = new ArrayList<>();
- System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", size);
+ System.out.printf("Collect TsFiles successfully, %d files to be loaded.%n", files.size());
System.out.println("Start Loading TsFiles...");
if (readMode.equals("s")) {
- for (int i = 0; i < size; i++) {
- File file = files.get(i);
- System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
- try {
- seqWriteTsFile(file.getPath(), session);
- } catch (Exception e) {
- System.out.println(
- "------------------------------Error Message------------------------------");
- e.printStackTrace();
- System.out.println(
- "------------------------------End Message------------------------------");
- unloadTsFiles.add(file);
- continue;
- }
- System.out.println("Done");
- }
+ writeTsFileSequentially(files, session);
} else {
try {
reverseWriteTsFile(files, session);
@@ -268,6 +250,28 @@ public class RewriteTsFileTool {
}
}
System.out.println("Finish Loading TsFiles");
+ }
+
+ private static void writeTsFileSequentially(List<File> files, Session session) {
+ int size = files.size();
+ List<File> unloadTsFiles = new ArrayList<>();
+ for (int i = 0; i < files.size(); i++) {
+ File file = files.get(i);
+ System.out.printf("Loading %s(%d/%d)...", file.getPath(), i + 1, size);
+ try {
+ seqWriteSingleTsFile(file.getPath(), session);
+ session.executeNonQueryStatement("FLUSH");
+ } catch (Exception e) {
+ System.out.println(
+ "------------------------------Error Message------------------------------");
+ e.printStackTrace();
+ System.out.println(
+ "------------------------------End Message------------------------------");
+ unloadTsFiles.add(file);
+ continue;
+ }
+ System.out.println("Done");
+ }
System.out.printf(
"Load %d TsFiles successfully, %d TsFiles not loaded.%n",
size - unloadTsFiles.size(), unloadTsFiles.size());
@@ -306,104 +310,31 @@ public class RewriteTsFileTool {
* @param filename the file path to be loaded
* @param session IoTDB session
*/
- public static void seqWriteTsFile(String filename, Session session)
+ public static void seqWriteSingleTsFile(String filename, Session session)
throws IOException, IllegalPathException, IoTDBConnectionException,
StatementExecutionException, NoMeasurementException {
- // parse modifications from .mods
- List<Modification> modifications = null;
- if (FSFactoryProducer.getFSFactory()
- .getFile(filename + ModificationFile.FILE_SUFFIX)
- .exists()) {
- modifications =
- (List<Modification>)
- new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
- }
-
- // read all device and their measurements
- parseDeviceFromTsFile(filename);
try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
- for (Map.Entry<String, Set<MeasurementSchema>> entry : device2Measurements.entrySet()) {
- // collect measurements for device
- boolean isAligned = false;
- String curDevice = entry.getKey();
- List<MeasurementSchema> measurementSchemas = new ArrayList<>();
- ArrayList<Path> paths = new ArrayList<>();
- for (MeasurementSchema measurementSchema : entry.getValue()) {
- if (!measurementSchema.getType().equals(TSDataType.VECTOR)) {
- measurementSchemas.add(measurementSchema);
- } else {
- isAligned = true;
- }
- }
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- paths.add(new Path(curDevice, measurementSchema.getMeasurementId()));
- }
-
- // construct query to this tsfile
- List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
- IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader);
- IChunkLoader chunkLoader = new CachedChunkLoaderImpl(reader);
- for (Path path : paths) {
- List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
- modifyChunkMetadata(isAligned, path, chunkMetadataList, modifications);
- AbstractFileSeriesReader seriesReader;
- if (chunkMetadataList.isEmpty()) {
- seriesReader = new EmptyFileSeriesReader();
- dataTypes.add(metadataQuerier.getDataType(path));
- } else {
- seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
- dataTypes.add(chunkMetadataList.get(0).getDataType());
- }
- readersOfSelectedSeries.add(seriesReader);
- }
-
- // read data from tsfile and construct session to send to IoTDB
- QueryDataSet dataSet =
- new DataSetWithoutTimeGenerator(paths, dataTypes, readersOfSelectedSeries);
- Tablet tablet = new Tablet(curDevice, measurementSchemas, MAX_TABLET_LENGTH);
- tablet.initBitMaps();
- int measurementSize = measurementSchemas.size();
- while (dataSet.hasNext()) {
- RowRecord rowRecord = dataSet.next();
- tablet.addTimestamp(tablet.rowSize, rowRecord.getTimestamp());
- for (int i = 0; i < measurementSize; i++) {
- Field field = rowRecord.getFields().get(i);
- if (field == null) {
- tablet.bitMaps[i].mark(tablet.rowSize);
- } else {
- tablet.addValue(
- measurementSchemas.get(i).getMeasurementId(),
- tablet.rowSize,
- field.getObjectValue(field.getDataType()));
- }
- }
- tablet.rowSize++;
- if (tablet.rowSize == MAX_TABLET_LENGTH) {
- if (isAligned) {
- session.insertAlignedTablet(tablet);
- } else {
- session.insertTablet(tablet);
- }
- tablet.reset();
- }
- }
- if (isAligned) {
- session.insertAlignedTablet(tablet);
- } else {
- session.insertTablet(tablet);
+ if (!ignoreBrokenChunk) {
+ long status = reader.selfCheck(new HashMap<>(), new ArrayList<>(), true);
+ if (status == TsFileCheckStatus.INCOMPATIBLE_FILE
+ || status == TsFileCheckStatus.FILE_EXISTS_MISTAKES) {
+ throw new IOException(
+ String.format(
+ "The file %s is incompatible, cannot rewrite it to IoTDB. If you want to rewrite "
+ + "all the good chunks in the file, retry with option -ignore-broken.",
+ filename));
}
}
- }
- }
-
- private static void parseDeviceFromTsFile(String filename) throws IOException {
- device2Measurements = new HashMap<>();
- try (TsFileSequenceReader reader = new TsFileSequenceReader(filename)) {
reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1);
- String curDevice = null;
+ List<long[]> timeBatch = new ArrayList<>();
+ int pageIndex = 0;
byte marker;
+ String currentDevice = null;
+ boolean isAlignedChunk = false;
+ List<List<TsPrimitiveType>> valueForAlignedSeries = new ArrayList<>();
+ List<Long> timeForAlignedSeries = new ArrayList<>();
+ List<MeasurementSchema> schemaForAlignedSeries = new ArrayList<>();
while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) {
switch (marker) {
case MetaMarker.CHUNK_HEADER:
@@ -413,60 +344,316 @@ public class RewriteTsFileTool {
case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
ChunkHeader header = reader.readChunkHeader(marker);
- MeasurementSchema measurementSchema =
- new MeasurementSchema(
- header.getMeasurementID(),
- header.getDataType(),
- header.getEncodingType(),
- header.getCompressionType());
- device2Measurements
- .computeIfAbsent(curDevice, o -> new HashSet<>())
- .add(measurementSchema);
- reader.position(reader.position() + header.getDataSize());
+ if (header.getDataSize() == 0) {
+ // empty value chunk
+ break;
+ }
+ Decoder defaultTimeDecoder =
+ Decoder.getDecoderByType(
+ TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ Decoder valueDecoder =
+ Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+ int dataSize = header.getDataSize();
+ pageIndex = 0;
+ if (header.getDataType() == TSDataType.VECTOR) {
+ timeBatch.clear();
+ }
+ boolean addSchema = false;
+ List<TsPrimitiveType> valueList = new ArrayList<>();
+ while (dataSize > 0) {
+ valueDecoder.reset();
+ PageHeader pageHeader =
+ reader.readPageHeader(
+ header.getDataType(),
+ (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER);
+ ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType());
+ if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK)
+ == (byte) TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
+ readAndSendTimePage(
+ addSchema,
+ schemaForAlignedSeries,
+ header,
+ pageHeader,
+ pageData,
+ defaultTimeDecoder,
+ timeBatch,
+ pageIndex,
+ timeForAlignedSeries);
+ addSchema = true;
+ isAlignedChunk = true;
+ } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK)
+ == (byte) TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
+ readAndSendValuePage(
+ addSchema,
+ schemaForAlignedSeries,
+ header,
+ pageHeader,
+ pageData,
+ valueDecoder,
+ timeBatch,
+ pageIndex,
+ valueList);
+ addSchema = true;
+ } else { // NonAligned Chunk
+ readAndSendSingleSeriesPage(
+ currentDevice, header, pageData, valueDecoder, defaultTimeDecoder, session);
+ }
+ pageIndex++;
+ dataSize -= pageHeader.getSerializedPageSize();
+ }
+ if (isAlignedChunk && header.getDataType() != TSDataType.VECTOR) {
+ valueForAlignedSeries.add(valueList);
+ }
break;
case MetaMarker.CHUNK_GROUP_HEADER:
+ // get the next chunk group
+ if (isAlignedChunk) {
+ Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+ for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+ tablet.addTimestamp(tablet.rowSize, timeForAlignedSeries.get(i));
+ for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+ if (valueForAlignedSeries.get(j).get(i) == null) {
+ continue;
+ }
+ switch (valueForAlignedSeries.get(j).get(i).getDataType()) {
+ case INT32:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getInt());
+ break;
+ case INT64:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getLong());
+ break;
+ case TEXT:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getStringValue());
+ break;
+ case BOOLEAN:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getBoolean());
+ break;
+ case FLOAT:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getFloat());
+ break;
+ case DOUBLE:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getDouble());
+ break;
+ }
+ }
+ tablet.rowSize++;
+ if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ }
+ if (tablet.rowSize >= 0) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ timeForAlignedSeries.clear();
+ valueForAlignedSeries.clear();
+ schemaForAlignedSeries.clear();
+ }
+ isAlignedChunk = false;
ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader();
- curDevice = chunkGroupHeader.getDeviceID();
+ currentDevice = chunkGroupHeader.getDeviceID();
break;
case MetaMarker.OPERATION_INDEX_RANGE:
reader.readPlanIndex();
break;
default:
- MetaMarker.handleUnexpectedMarker(marker);
+ System.out.printf(
+ "Cannot handle marker %d in position %d, stop reading %s%n",
+ marker, reader.position(), filename);
}
}
+
+ if (isAlignedChunk) {
+ Tablet tablet = new Tablet(currentDevice, schemaForAlignedSeries, MAX_TABLET_LENGTH);
+ for (int i = 0; i < timeForAlignedSeries.size(); ++i) {
+ tablet.addTimestamp(tablet.rowSize, timeForAlignedSeries.get(i));
+ for (int j = 0; j < valueForAlignedSeries.size(); ++j) {
+ if (valueForAlignedSeries.get(j).get(i) == null) {
+ continue;
+ }
+ switch (valueForAlignedSeries.get(j).get(i).getDataType()) {
+ case INT32:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getInt());
+ break;
+ case INT64:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getLong());
+ break;
+ case TEXT:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getStringValue());
+ break;
+ case BOOLEAN:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getBoolean());
+ break;
+ case FLOAT:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getFloat());
+ break;
+ case DOUBLE:
+ tablet.addValue(
+ schemaForAlignedSeries.get(j).getMeasurementId(),
+ tablet.rowSize,
+ valueForAlignedSeries.get(j).get(i).getDouble());
+ break;
+ }
+ }
+ tablet.rowSize++;
+ if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ }
+ if (tablet.rowSize >= 0) {
+ session.insertAlignedTablet(tablet);
+ tablet.reset();
+ }
+ timeForAlignedSeries.clear();
+ valueForAlignedSeries.clear();
+ schemaForAlignedSeries.clear();
+ }
}
+
+ writeModification(filename, session);
}
- private static void modifyChunkMetadata(
- boolean isAligned,
- Path path,
- List<IChunkMetadata> chunkMetadataList,
- List<Modification> modifications)
- throws IllegalPathException {
- if (modifications == null || modifications.isEmpty()) {
- return;
+ private static void readAndSendValuePage(
+ boolean addSchema,
+ List<MeasurementSchema> schemaForAlignedSeries,
+ ChunkHeader header,
+ PageHeader pageHeader,
+ ByteBuffer pageData,
+ Decoder valueDecoder,
+ List<long[]> timeBatch,
+ int pageIndex,
+ List<TsPrimitiveType> valueList) {
+ if (!addSchema && header.getDataType() != TSDataType.VECTOR) {
+ schemaForAlignedSeries.add(
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType()));
}
- List<Modification> measurementModifications = new ArrayList<>();
- Iterator<Modification> modsIterator = modifications.listIterator();
- Deletion currentDeletion;
- while (modsIterator.hasNext()) {
- currentDeletion = (Deletion) modsIterator.next();
- // if deletion path match the chunkPath, then add the deletion to the list
- if (currentDeletion.getPath().matchFullPath(new PartialPath(path.getFullPath()))) {
- measurementModifications.add(currentDeletion);
+ ValuePageReader valuePageReader =
+ new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
+ TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
+ valueList.addAll(Arrays.asList(valueBatch));
+ }
+
+ private static void readAndSendTimePage(
+ boolean addSchema,
+ List<MeasurementSchema> schemaForAlignedSeries,
+ ChunkHeader header,
+ PageHeader pageHeader,
+ ByteBuffer pageData,
+ Decoder defaultTimeDecoder,
+ List<long[]> timeBatch,
+ int pageIndex,
+ List<Long> timeForAlignedSeries)
+ throws IOException {
+ if (!addSchema && header.getDataType() != TSDataType.VECTOR) {
+ schemaForAlignedSeries.add(
+ new MeasurementSchema(
+ header.getMeasurementID(),
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType()));
+ }
+ TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
+ timeBatch.add(timePageReader.getNextTimeBatch());
+ for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
+ timeForAlignedSeries.add(timeBatch.get(pageIndex)[i]);
+ }
+ }
+
+ private static void readAndSendSingleSeriesPage(
+ String currentDevice,
+ ChunkHeader header,
+ ByteBuffer pageData,
+ Decoder valueDecoder,
+ Decoder defaultTimeDecoder,
+ Session session)
+ throws IOException, IoTDBConnectionException, StatementExecutionException {
+ String measurementId = header.getMeasurementID();
+ Tablet tablet =
+ new Tablet(
+ currentDevice,
+ Collections.singletonList(
+ new MeasurementSchema(
+ measurementId,
+ header.getDataType(),
+ header.getEncodingType(),
+ header.getCompressionType())),
+ MAX_TABLET_LENGTH);
+ PageReader pageReader =
+ new PageReader(pageData, header.getDataType(), valueDecoder, defaultTimeDecoder, null);
+ BatchData batchData = pageReader.getAllSatisfiedPageData();
+ while (batchData.hasCurrent()) {
+ tablet.addTimestamp(tablet.rowSize, batchData.currentTime());
+ tablet.addValue(measurementId, tablet.rowSize++, batchData.currentValue());
+ if (tablet.rowSize >= MAX_TABLET_LENGTH) {
+ session.insertTablet(tablet);
+ tablet.reset();
}
+ batchData.next();
}
- if (!isAligned) {
- QueryUtils.modifyChunkMetaData(chunkMetadataList, measurementModifications);
- } else {
- List<AlignedChunkMetadata> alignedChunkMetadataList = new ArrayList<>();
- for (IChunkMetadata chunkMetadata : chunkMetadataList) {
- alignedChunkMetadataList.add((AlignedChunkMetadata) chunkMetadata);
+ if (tablet.rowSize > 0) {
+ session.insertTablet(tablet);
+ tablet.reset();
+ }
+ }
+
+ private static void writeModification(String filename, Session session)
+ throws IoTDBConnectionException, StatementExecutionException {
+ List<Modification> modifications = null;
+ if (FSFactoryProducer.getFSFactory()
+ .getFile(filename + ModificationFile.FILE_SUFFIX)
+ .exists()) {
+ modifications =
+ (List<Modification>)
+ new ModificationFile(filename + ModificationFile.FILE_SUFFIX).getModifications();
+ for (Modification modification : modifications) {
+ session.executeNonQueryStatement(
+ String.format(
+ "delete from %s.%s where time >= %d and time <= %d",
+ modification.getDevice(),
+ modification.getMeasurement(),
+ ((Deletion) modification).getStartTime(),
+ ((Deletion) modification).getEndTime()));
}
- // AlignedChunk only contains one valueChunkMetadata which is measurement with this path
- QueryUtils.modifyAlignedChunkMetaData(
- alignedChunkMetadataList, Collections.singletonList(measurementModifications));
}
}
@@ -486,26 +673,30 @@ public class RewriteTsFileTool {
StatementExecutionException, NoMeasurementException {
List<TsFileResource> resources = new ArrayList<>();
files.forEach(x -> resources.add(new TsFileResource(x)));
- try (MultiTsFileDeviceIterator deviceIterator = new MultiTsFileDeviceIterator(resources)) {
- while (deviceIterator.hasNextDevice()) {
- Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
- String device = devicePair.left;
- boolean isAligned = devicePair.right;
- if (isAligned) {
- try {
- writeAlignedSeries(device, deviceIterator, session);
- } catch (Throwable t) {
- // this is a broken aligned chunk, skip it
- System.out.println("Skip aligned chunk " + device);
- }
- } else {
- MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
- deviceIterator.iterateNotAlignedSeries(device, true);
- while (seriesIterator.hasNextSeries()) {
- writeSingleSeries(device, seriesIterator, session);
+ for (TsFileResource resource : resources) {
+ try (MultiTsFileDeviceIterator deviceIterator =
+ new MultiTsFileDeviceIterator(Collections.singletonList(resource))) {
+ while (deviceIterator.hasNextDevice()) {
+ Pair<String, Boolean> devicePair = deviceIterator.nextDevice();
+ String device = devicePair.left;
+ boolean isAligned = devicePair.right;
+ if (isAligned) {
+ try {
+ writeAlignedSeries(device, deviceIterator, session);
+ } catch (Throwable t) {
+ // this is a broken aligned chunk, skip it
+ System.out.println("Skip aligned chunk " + device);
+ }
+ } else {
+ MultiTsFileDeviceIterator.MeasurementIterator seriesIterator =
+ deviceIterator.iterateNotAlignedSeries(device, true);
+ while (seriesIterator.hasNextSeries()) {
+ writeSingleSeries(device, seriesIterator, session);
+ }
}
}
}
+ writeModification(resource.getTsFile().getAbsolutePath(), session);
}
}
@@ -526,6 +717,7 @@ public class RewriteTsFileTool {
writeSingleChunk(device, p, chunkMetadata, reader, session);
} catch (Throwable t) {
// this is a broken chunk, skip it
+ t.printStackTrace();
System.out.printf("Skip broken chunk in device %s.%s%n", device, p.getMeasurement());
}
}
@@ -554,8 +746,33 @@ public class RewriteTsFileTool {
IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator();
while (batchIterator.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = batchIterator.nextTimeValuePair();
- tablet.timestamps[tablet.rowSize] = timeValuePair.getTimestamp();
- tablet.values[tablet.rowSize++] = timeValuePair.getValue();
+ tablet.addTimestamp(tablet.rowSize, timeValuePair.getTimestamp());
+ switch (timeValuePair.getValue().getDataType()) {
+ case TEXT:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getStringValue());
+ break;
+ case BOOLEAN:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getBoolean());
+ break;
+ case DOUBLE:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getDouble());
+ break;
+ case FLOAT:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getFloat());
+ break;
+ case INT64:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getLong());
+ break;
+ case INT32:
+ tablet.addValue(
+ p.getMeasurement(), tablet.rowSize++, timeValuePair.getValue().getInt());
+ break;
+ }
if (tablet.rowSize >= MAX_TABLET_LENGTH) {
session.insertTablet(tablet);
tablet.reset();
@@ -640,9 +857,46 @@ public class RewriteTsFileTool {
IBatchDataIterator batchDataIterator =
alignedChunkReader.nextPageData().getBatchDataIterator();
while (batchDataIterator.hasNext()) {
+ tablet.addTimestamp(tablet.rowSize, batchDataIterator.currentTime());
TsPrimitiveType[] pointsData = (TsPrimitiveType[]) batchDataIterator.currentValue();
- tablet.timestamps[tablet.rowSize] = batchDataIterator.currentTime();
- tablet.values[tablet.rowSize++] = batchDataIterator.currentValue();
+ for (int i = 0; i < schemaList.size(); ++i) {
+ if (pointsData[i] == null) {
+ continue;
+ }
+ switch (pointsData[i].getDataType()) {
+ case INT32:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getInt());
+ break;
+ case INT64:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getLong());
+ break;
+ case FLOAT:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(), tablet.rowSize, pointsData[i].getFloat());
+ break;
+ case DOUBLE:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(),
+ tablet.rowSize,
+ pointsData[i].getDouble());
+ break;
+ case BOOLEAN:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(),
+ tablet.rowSize,
+ pointsData[i].getBoolean());
+ break;
+ case TEXT:
+ tablet.addValue(
+ schemaList.get(i).getMeasurementId(),
+ tablet.rowSize,
+ pointsData[i].getStringValue());
+ break;
+ }
+ }
+ tablet.rowSize++;
batchDataIterator.next();
if (tablet.rowSize >= MAX_TABLET_LENGTH) {
session.insertAlignedTablet(tablet);