You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/02/25 13:00:01 UTC
[iotdb] branch master updated: [IOTDB-2604] batch size is invalid in import-csv tool (#5115)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 95bb194 [IOTDB-2604] batch size is invalid in import-csv tool (#5115)
95bb194 is described below
commit 95bb194dd95d868fddc2b82ab2fe9dbb541f2c4f
Author: Xuan Ronaldo <xu...@qq.com>
AuthorDate: Fri Feb 25 20:59:24 2022 +0800
[IOTDB-2604] batch size is invalid in import-csv tool (#5115)
---
.../main/java/org/apache/iotdb/tool/ImportCsv.java | 31 +++++++++++++++++++---
docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md | 4 +++
.../zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md | 8 ++++--
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index 55e9199..02e4c7c 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -51,6 +51,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -72,6 +73,9 @@ public class ImportCsv extends AbstractCsvTool {
private static final String FAILED_FILE_ARGS = "fd";
private static final String FAILED_FILE_NAME = "failed file directory";
+ private static final String BATCH_POINT_SIZE_ARGS = "batch";
+ private static final String BATCH_POINT_SIZE_NAME = "batch point size";
+
private static final String ALIGNED_ARGS = "aligned";
private static final String ALIGNED_NAME = "use the aligned interface";
@@ -87,7 +91,7 @@ public class ImportCsv extends AbstractCsvTool {
private static String timeColumn = "Time";
private static String deviceColumn = "Device";
- private static final int BATCH_SIZE = 10000;
+ private static int batchPointSize = 100_000;
/**
* create the commandline options.
@@ -141,6 +145,14 @@ public class ImportCsv extends AbstractCsvTool {
.build();
options.addOption(opTimeZone);
+ Option opBatchPointSize =
+ Option.builder(BATCH_POINT_SIZE_ARGS)
+ .argName(BATCH_POINT_SIZE_NAME)
+ .hasArg()
+ .desc("100000 (optional)")
+ .build();
+ options.addOption(opBatchPointSize);
+
return options;
}
@@ -152,6 +164,9 @@ public class ImportCsv extends AbstractCsvTool {
private static void parseSpecialParams(CommandLine commandLine) {
timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS);
targetPath = commandLine.getOptionValue(FILE_ARGS);
+ if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
+ batchPointSize = Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
+ }
if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
File file = new File(failedFileDirectory);
@@ -330,6 +345,7 @@ public class ImportCsv extends AbstractCsvTool {
AtomicReference<SimpleDateFormat> timeFormatter = new AtomicReference<>(null);
AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
+ AtomicInteger pointSize = new AtomicInteger(0);
ArrayList<List<Object>> failedRecords = new ArrayList<>();
@@ -338,8 +354,9 @@ public class ImportCsv extends AbstractCsvTool {
if (!hasStarted.get()) {
hasStarted.set(true);
timeFormatter.set(formatterInit(record.get(0)));
- } else if ((record.getRecordNumber() - 1) % BATCH_SIZE == 0) {
+ } else if (pointSize.get() >= batchPointSize) {
writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
+ pointSize.set(0);
}
boolean isFail = false;
@@ -378,6 +395,7 @@ public class ImportCsv extends AbstractCsvTool {
measurements.add(headerNameMap.get(header).replace(deviceId + '.', ""));
types.add(type);
values.add(valueTrans);
+ pointSize.getAndIncrement();
}
}
}
@@ -406,6 +424,7 @@ public class ImportCsv extends AbstractCsvTool {
});
if (!deviceIds.isEmpty()) {
writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
+ pointSize.set(0);
}
if (!failedRecords.isEmpty()) {
@@ -442,6 +461,8 @@ public class ImportCsv extends AbstractCsvTool {
List<List<Object>> valuesList = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
+ AtomicInteger pointSize = new AtomicInteger(0);
+
ArrayList<List<Object>> failedRecords = new ArrayList<>();
records.forEach(
@@ -455,10 +476,12 @@ public class ImportCsv extends AbstractCsvTool {
writeAndEmptyDataSet(
deviceName.get(), times, typesList, valuesList, measurementsList, 3);
deviceName.set(record.get(1));
- } else if (record.getRecordNumber() - 1 % BATCH_SIZE == 0 && times.size() != 0) {
+ pointSize.set(0);
+ } else if (pointSize.get() >= batchPointSize) {
// insert a batch
writeAndEmptyDataSet(
deviceName.get(), times, typesList, valuesList, measurementsList, 3);
+ pointSize.set(0);
}
// the data of the record
@@ -508,6 +531,7 @@ public class ImportCsv extends AbstractCsvTool {
values.add(valueTrans);
measurements.add(headerNameMap.get(measurement));
types.add(type);
+ pointSize.getAndIncrement();
}
}
}
@@ -534,6 +558,7 @@ public class ImportCsv extends AbstractCsvTool {
});
if (times.size() != 0) {
writeAndEmptyDataSet(deviceName.get(), times, typesList, valuesList, measurementsList, 3);
+ pointSize.set(0);
}
if (!failedRecords.isEmpty()) {
writeCsvFile(headerNames, failedRecords, failedFilePath);
diff --git a/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md b/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
index 08bad7d..d774ca3 100644
--- a/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
+++ b/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
@@ -191,6 +191,10 @@ Description:
- whether to use the aligned interface? The option `false` is default.
- example: `-aligned true`
+* `-batch`:
+ - specifies the number of data points in each batch. If the program throws the exception `org.apache.thrift.transport.TTransportException: Frame size larger than protect max size`, you can lower this parameter as appropriate.
+ - example: `-batch 100000`, `100000` is the default value.
+
### Example
```sh
diff --git a/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md b/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
index a34437c..80244ba 100644
--- a/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
+++ b/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
@@ -185,11 +185,15 @@ Time,Device,str(TEXT),int(INT32)
* `-fd`:
- 指定一个目录来存放保存失败的行的文件,如果你没有指定这个参数,失败的文件将会被保存到源数据的目录中,然后文件名是源文件名加上`.failed`的后缀。
- - example: `-fd ./failed/`
+ - 例如: `-fd ./failed/`
* `-aligned`:
- 是否使用`aligned`接口? 默认参数为`false`。
- - example: `-aligned true`
+ - 例如: `-aligned true`
+
+* `-batch`:
+ - 用于指定每一批插入的数据的点数。如果程序报了`org.apache.thrift.transport.TTransportException: Frame size larger than protect max size`这个错的话,就可以适当地调低这个参数。
+ - 例如: `-batch 100000`,`100000`是默认值。
### 运行示例