You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/02/25 13:00:01 UTC

[iotdb] branch master updated: [IOTDB-2604] batch size is invalid in import-csv tool (#5115)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 95bb194  [IOTDB-2604] batch size is invalid in import-csv tool (#5115)
95bb194 is described below

commit 95bb194dd95d868fddc2b82ab2fe9dbb541f2c4f
Author: Xuan Ronaldo <xu...@qq.com>
AuthorDate: Fri Feb 25 20:59:24 2022 +0800

    [IOTDB-2604] batch size is invalid in import-csv tool (#5115)
---
 .../main/java/org/apache/iotdb/tool/ImportCsv.java | 31 +++++++++++++++++++---
 docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md   |  4 +++
 .../zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md |  8 ++++--
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
index 55e9199..02e4c7c 100644
--- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
+++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java
@@ -51,6 +51,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -72,6 +73,9 @@ public class ImportCsv extends AbstractCsvTool {
   private static final String FAILED_FILE_ARGS = "fd";
   private static final String FAILED_FILE_NAME = "failed file directory";
 
+  private static final String BATCH_POINT_SIZE_ARGS = "batch";
+  private static final String BATCH_POINT_SIZE_NAME = "batch point size";
+
   private static final String ALIGNED_ARGS = "aligned";
   private static final String ALIGNED_NAME = "use the aligned interface";
 
@@ -87,7 +91,7 @@ public class ImportCsv extends AbstractCsvTool {
   private static String timeColumn = "Time";
   private static String deviceColumn = "Device";
 
-  private static final int BATCH_SIZE = 10000;
+  private static int batchPointSize = 100_000;
 
   /**
    * create the commandline options.
@@ -141,6 +145,14 @@ public class ImportCsv extends AbstractCsvTool {
             .build();
     options.addOption(opTimeZone);
 
+    Option opBatchPointSize =
+        Option.builder(BATCH_POINT_SIZE_ARGS)
+            .argName(BATCH_POINT_SIZE_NAME)
+            .hasArg()
+            .desc("100000 (optional)")
+            .build();
+    options.addOption(opBatchPointSize);
+
     return options;
   }
 
@@ -152,6 +164,9 @@ public class ImportCsv extends AbstractCsvTool {
   private static void parseSpecialParams(CommandLine commandLine) {
     timeZoneID = commandLine.getOptionValue(TIME_ZONE_ARGS);
     targetPath = commandLine.getOptionValue(FILE_ARGS);
+    if (commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS) != null) {
+      batchPointSize = Integer.parseInt(commandLine.getOptionValue(BATCH_POINT_SIZE_ARGS));
+    }
     if (commandLine.getOptionValue(FAILED_FILE_ARGS) != null) {
       failedFileDirectory = commandLine.getOptionValue(FAILED_FILE_ARGS);
       File file = new File(failedFileDirectory);
@@ -330,6 +345,7 @@ public class ImportCsv extends AbstractCsvTool {
 
     AtomicReference<SimpleDateFormat> timeFormatter = new AtomicReference<>(null);
     AtomicReference<Boolean> hasStarted = new AtomicReference<>(false);
+    AtomicInteger pointSize = new AtomicInteger(0);
 
     ArrayList<List<Object>> failedRecords = new ArrayList<>();
 
@@ -338,8 +354,9 @@ public class ImportCsv extends AbstractCsvTool {
           if (!hasStarted.get()) {
             hasStarted.set(true);
             timeFormatter.set(formatterInit(record.get(0)));
-          } else if ((record.getRecordNumber() - 1) % BATCH_SIZE == 0) {
+          } else if (pointSize.get() >= batchPointSize) {
             writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
+            pointSize.set(0);
           }
 
           boolean isFail = false;
@@ -378,6 +395,7 @@ public class ImportCsv extends AbstractCsvTool {
                     measurements.add(headerNameMap.get(header).replace(deviceId + '.', ""));
                     types.add(type);
                     values.add(valueTrans);
+                    pointSize.getAndIncrement();
                   }
                 }
               }
@@ -406,6 +424,7 @@ public class ImportCsv extends AbstractCsvTool {
         });
     if (!deviceIds.isEmpty()) {
       writeAndEmptyDataSet(deviceIds, times, typesList, valuesList, measurementsList, 3);
+      pointSize.set(0);
     }
 
     if (!failedRecords.isEmpty()) {
@@ -442,6 +461,8 @@ public class ImportCsv extends AbstractCsvTool {
     List<List<Object>> valuesList = new ArrayList<>();
     List<List<String>> measurementsList = new ArrayList<>();
 
+    AtomicInteger pointSize = new AtomicInteger(0);
+
     ArrayList<List<Object>> failedRecords = new ArrayList<>();
 
     records.forEach(
@@ -455,10 +476,12 @@ public class ImportCsv extends AbstractCsvTool {
             writeAndEmptyDataSet(
                 deviceName.get(), times, typesList, valuesList, measurementsList, 3);
             deviceName.set(record.get(1));
-          } else if (record.getRecordNumber() - 1 % BATCH_SIZE == 0 && times.size() != 0) {
+            pointSize.set(0);
+          } else if (pointSize.get() >= batchPointSize) {
             // insert a batch
             writeAndEmptyDataSet(
                 deviceName.get(), times, typesList, valuesList, measurementsList, 3);
+            pointSize.set(0);
           }
 
           // the data of the record
@@ -508,6 +531,7 @@ public class ImportCsv extends AbstractCsvTool {
                   values.add(valueTrans);
                   measurements.add(headerNameMap.get(measurement));
                   types.add(type);
+                  pointSize.getAndIncrement();
                 }
               }
             }
@@ -534,6 +558,7 @@ public class ImportCsv extends AbstractCsvTool {
         });
     if (times.size() != 0) {
       writeAndEmptyDataSet(deviceName.get(), times, typesList, valuesList, measurementsList, 3);
+      pointSize.set(0);
     }
     if (!failedRecords.isEmpty()) {
       writeCsvFile(headerNames, failedRecords, failedFilePath);
diff --git a/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md b/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
index 08bad7d..d774ca3 100644
--- a/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
+++ b/docs/UserGuide/Write-And-Delete-Data/CSV-Tool.md
@@ -191,6 +191,10 @@ Description:
   - whether to use the aligned interface? The option `false` is default.
   - example: `-aligned true`
 
+* `-batch`:
+  - specifies the number of points in each batch. If the program throws the exception `org.apache.thrift.transport.TTransportException: Frame size larger than protect max size`, you can lower this parameter as appropriate.
+  - example: `-batch 100000` (`100000` is the default value).
+
 ### Example
 
 ```sh
diff --git a/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md b/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
index a34437c..80244ba 100644
--- a/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
+++ b/docs/zh/UserGuide/Write-And-Delete-Data/CSV-Tool.md
@@ -185,11 +185,15 @@ Time,Device,str(TEXT),int(INT32)
 
 * `-fd`:
   - 指定一个目录来存放保存失败的行的文件,如果你没有指定这个参数,失败的文件将会被保存到源数据的目录中,然后文件名是源文件名加上`.failed`的后缀。
-  - example: `-fd ./failed/`
+  - 例如: `-fd ./failed/`
 
 * `-aligned`:
   - 是否使用`aligned`接口? 默认参数为`false`。
-  - example: `-aligned true`
+  - 例如: `-aligned true`
+
+* `-batch`:
+  - 用于指定每一批插入的数据的点数。如果程序报了`org.apache.thrift.transport.TTransportException: Frame size larger than protect max size`这个错的话,就可以适当地调低这个参数。
+  - 例如: `-batch 100000`,`100000`是默认值。
 
 ### 运行示例