You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/09/23 08:14:54 UTC
[iotdb] branch master updated: [IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ceafab1 [IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)
ceafab1 is described below
commit ceafab1436c4aef888ac49d1160caad0eea7b03d
Author: yschengzi <87...@users.noreply.github.com>
AuthorDate: Thu Sep 23 16:14:30 2021 +0800
[IOTDB-1694] change name for loading dir and add an IT for loading dir (#3985)
---
.../level/LevelCompactionTsFileManagement.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 25 ++--
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 32 ++++-
...IoTDBLoadExternalTsFileWithTimePartitionIT.java | 137 +++++++++++++++++++--
4 files changed, 166 insertions(+), 30 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index cc4566c..03c14ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -193,7 +193,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
List<TsFileResource> result = new ArrayList<>();
if (sequence) {
List<SortedSet<TsFileResource>> sequenceTsFileList =
- sequenceTsFileResources.get(timePartition);
+ sequenceTsFileResources.getOrDefault(timePartition, new ArrayList<>());
for (int i = sequenceTsFileList.size() - 1; i >= 0; i--) {
result.addAll(sequenceTsFileList.get(i));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 88c296d..54cb225 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -2407,9 +2407,10 @@ public class StorageGroupProcessor {
long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck();
writeLock("loadNewTsFile");
try {
- List<TsFileResource> sequenceList = tsFileManagement.getTsFileList(true);
+ List<TsFileResource> sequenceList =
+ tsFileManagement.getTsFileListByTimePartition(true, newFilePartitionId);
- int insertPos = findInsertionPosition(newTsFileResource, newFilePartitionId, sequenceList);
+ int insertPos = findInsertionPosition(newTsFileResource, sequenceList);
String newFileName, renameInfo;
LoadTsFileType tsFileType;
@@ -2447,6 +2448,7 @@ public class StorageGroupProcessor {
updateLatestTimeMap(newTsFileResource);
long partitionNum = newTsFileResource.getTimePartition();
updatePartitionFileVersion(partitionNum, newTsFileResource.getVersion());
+ logger.info("TsFile {} is successfully loaded in {} list.", newFileName, renameInfo);
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -2486,21 +2488,13 @@ public class StorageGroupProcessor {
* file can be inserted between [i, i+1]
*/
private int findInsertionPosition(
- TsFileResource newTsFileResource,
- long newFilePartitionId,
- List<TsFileResource> sequenceList) {
+ TsFileResource newTsFileResource, List<TsFileResource> sequenceList) {
int insertPos = -1;
// find the position where the new file should be inserted
for (int i = 0; i < sequenceList.size(); i++) {
TsFileResource localFile = sequenceList.get(i);
- long localPartitionId = Long.parseLong(localFile.getTsFile().getParentFile().getName());
-
- if (newFilePartitionId > localPartitionId) {
- insertPos = i;
- continue;
- }
if (!localFile.isClosed() && localFile.getProcessor() != null) {
// we cannot compare two files by TsFileResource unless they are both closed
@@ -2531,8 +2525,10 @@ public class StorageGroupProcessor {
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
- for (String device : fileA.getDevices()) {
- if (!fileB.getDevices().contains(device)) {
+ Set<String> fileADevices = fileA.getDevices();
+ Set<String> fileBDevices = fileB.getDevices();
+ for (String device : fileADevices) {
+ if (!fileBDevices.contains(device)) {
continue;
}
long startTimeA = fileA.getStartTime(device);
@@ -2665,6 +2661,7 @@ public class StorageGroupProcessor {
private String getFileNameForSequenceLoadingFile(
int insertIndex, TsFileResource newTsFileResource, List<TsFileResource> sequenceList)
throws LoadFileException {
+ int sequenceListLength = sequenceList.size();
long timePartitionId = newTsFileResource.getTimePartition();
long preTime, subsequenceTime;
@@ -2674,7 +2671,7 @@ public class StorageGroupProcessor {
String preName = sequenceList.get(insertIndex).getTsFile().getName();
preTime = Long.parseLong(preName.split(FILE_NAME_SEPARATOR)[0]);
}
- if (insertIndex == tsFileManagement.size(true) - 1) {
+ if (insertIndex == sequenceListLength - 1) {
subsequenceTime = preTime + ((System.currentTimeMillis() - preTime) << 1);
} else {
String subsequenceName = sequenceList.get(insertIndex + 1).getTsFile().getName();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 162dda6..4933329 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -198,6 +198,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TASK_NAME;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
@@ -1062,19 +1063,40 @@ public class PlanExecutor implements IPlanExecutor {
String.format("File path %s doesn't exists.", file.getPath()));
}
if (file.isDirectory()) {
- recursionFileDir(file, plan);
+ loadDir(file, plan);
} else {
loadFile(file, plan);
}
}
- private void recursionFileDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
+ private void loadDir(File curFile, OperateFilePlan plan) throws QueryProcessException {
File[] files = curFile.listFiles();
+ long[] establishTime = new long[files.length];
+ List<Integer> tsfiles = new ArrayList<>();
+
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ if (!file.isDirectory()) {
+ String fileName = file.getName();
+ if (fileName.endsWith(TSFILE_SUFFIX)) {
+ establishTime[i] = Long.parseLong(fileName.split(FILE_NAME_SEPARATOR)[0]);
+ tsfiles.add(i);
+ }
+ }
+ }
+ Collections.sort(
+ tsfiles,
+ (o1, o2) -> {
+ if (establishTime[o1] == establishTime[o2]) return 0;
+ return establishTime[o1] < establishTime[o2] ? -1 : 1;
+ });
+ for (Integer i : tsfiles) {
+ loadFile(files[i], plan);
+ }
+
for (File file : files) {
if (file.isDirectory()) {
- recursionFileDir(file, plan);
- } else {
- loadFile(file, plan);
+ loadDir(file, plan);
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
index d991123..80e0642 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLoadExternalTsFileWithTimePartitionIT.java
@@ -46,10 +46,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.Arrays;
public class IoTDBLoadExternalTsFileWithTimePartitionIT {
@@ -58,8 +55,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
String tempDir = "temp";
String STORAGE_GROUP = "root.ln";
- String[] devices = new String[] {"d1", "d2"};
- String[] measurements = new String[] {"s1", "s2"};
+ String[] devices = new String[] {"d1", "d2", "d3"};
+ String[] measurements = new String[] {"s1", "s2", "s3"};
// generate several tsFiles, with timestamp from startTime(inclusive) to endTime(exclusive)
long startTime = 0;
@@ -69,6 +66,8 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
long recordTimeGap = 1000;
+ long[] deviceDataPointNumber = new long[devices.length];
+
int originalTsFileNum = 0;
boolean originalIsEnablePartition;
@@ -86,8 +85,6 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
StorageEngine.setEnablePartition(true);
StorageEngine.setTimePartitionInterval(timePartition);
-
- prepareData();
}
@After
@@ -106,7 +103,7 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
/** get the name of tsfile given counter */
String getName(int counter) {
- return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0.tsfile";
+ return tempDir + File.separator + System.currentTimeMillis() + "-" + counter + "-0-0.tsfile";
}
/** write a record, given timestamp */
@@ -171,10 +168,11 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
}
@Test
- public void loadTsFileWithTimePartition() {
+ public void loadTsFileWithTimePartitionTest() {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+ prepareData();
statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
@@ -205,6 +203,125 @@ public class IoTDBLoadExternalTsFileWithTimePartitionIT {
}
} catch (SQLException | IllegalPathException throwables) {
throwables.printStackTrace();
+ Assert.fail();
+ }
+ }
+
+ void writeDataWithDifferentDevice(TsFileWriter tsFileWriter, long timestamp, int counter)
+ throws IOException, WriteProcessException {
+ int mod = (counter % 6);
+ if (mod < 3) {
+ TSRecord tsRecord = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[mod]);
+ deviceDataPointNumber[mod] += 1;
+ for (String measurement : measurements) {
+ DataPoint dPoint = new LongDataPoint(measurement, 10000);
+ tsRecord.addTuple(dPoint);
+ }
+ tsFileWriter.write(tsRecord);
+ } else {
+ for (int i = 2; i <= devices.length; i++) {
+ for (int j = 1; j < i; j++) {
+ if (i + j == mod) {
+ TSRecord tsRecord1 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[i - 1]);
+ TSRecord tsRecord2 = new TSRecord(timestamp, STORAGE_GROUP + DOT + devices[j - 1]);
+ deviceDataPointNumber[i - 1] += 1;
+ deviceDataPointNumber[j - 1] += 1;
+ for (String measurement : measurements) {
+ DataPoint dataPoint1 = new LongDataPoint(measurement, 100);
+ DataPoint dataPoint2 = new LongDataPoint(measurement, 10000);
+ tsRecord1.addTuple(dataPoint1);
+ tsRecord2.addTuple(dataPoint2);
+ }
+ tsFileWriter.write(tsRecord1);
+ tsFileWriter.write(tsRecord2);
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ void prepareDataWithDifferentDevice() {
+ startTime = 0;
+ endTime = 100_000;
+ recordTimeGap = 10;
+
+ long tsfileMaxTime = 1000;
+ File dir = new File(tempDir);
+ if (dir.exists()) {
+ FileUtils.deleteDirectory(dir);
+ }
+ dir.mkdir();
+ try {
+ File f;
+ TsFileWriter tsFileWriter = null;
+ int counter = 0;
+ for (long timestamp = startTime; timestamp < endTime; timestamp += recordTimeGap) {
+ if (timestamp % tsfileMaxTime == 0) {
+ if (tsFileWriter != null) {
+ tsFileWriter.flushAllChunkGroups();
+ tsFileWriter.close();
+ counter++;
+ }
+ String path = getName(counter);
+ f = FSFactoryProducer.getFSFactory().getFile(path);
+ tsFileWriter = new TsFileWriter(new TsFileIOWriter(f));
+ register(tsFileWriter);
+ }
+ writeDataWithDifferentDevice(tsFileWriter, timestamp, counter);
+ }
+ tsFileWriter.flushAllChunkGroups();
+ tsFileWriter.close();
+ originalTsFileNum = counter + 1;
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test
+ public void loadTsFileWithDifferentDevice() {
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+ prepareDataWithDifferentDevice();
+
+ statement.execute(String.format("load \"%s\"", new File(tempDir).getAbsolutePath()));
+
+ String dataDir = config.getDataDirs()[0];
+ // sequence/logical_sg/virtual_sg/time_partitions
+ File f =
+ new File(
+ dataDir,
+ new PartialPath("sequence") + File.separator + "root.ln" + File.separator + "0");
+ Assert.assertEquals((endTime - startTime) / (timePartition), f.list().length);
+
+ int totalPartitionsNum = (int) ((endTime - startTime) / (timePartition));
+ int[] splitTsFilePartitions = new int[totalPartitionsNum];
+ for (int i = 0; i < splitTsFilePartitions.length; i++) {
+ splitTsFilePartitions[i] = Integer.parseInt(f.list()[i]);
+ }
+ Arrays.sort(splitTsFilePartitions);
+
+ for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+ Assert.assertEquals(i, splitTsFilePartitions[i]);
+ }
+
+ // each time partition folder should contain 2 files, the tsfile and the resource file
+ for (int i = 0; i < (endTime - startTime) / (timePartition); i++) {
+ Assert.assertEquals(2, new File(f.getAbsolutePath(), "" + i).list().length);
+ }
+
+ for (int i = 0; i < devices.length; i++) {
+ statement.executeQuery(
+ "select count(" + measurements[0] + ") from " + STORAGE_GROUP + DOT + devices[i]);
+ ResultSet set = statement.getResultSet();
+ set.next();
+ long number = set.getLong(1);
+ Assert.assertEquals(deviceDataPointNumber[i], number);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
}
}
}