You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/12 09:25:47 UTC

[iotdb] 01/01: [IOTDB-2000] DataMigrationExample should migrate data device by device

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-2000
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 302241c1cca1b763a85c14510dc3ed1edf1269ef
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 12 17:22:24 2021 +0800

    [IOTDB-2000] DataMigrationExample should migrate data device by device
---
 .../org/apache/iotdb/DataMigrationExample.java     | 107 ++++++++++++++-------
 1 file changed, 70 insertions(+), 37 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
index 2959f1e..ee48154 100644
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
@@ -87,14 +88,30 @@ public class DataMigrationExample {
 
     List<Future> futureList = new ArrayList<>();
     int count = 0;
+    int seriesNumInOneTask = 0;
+    String currentDevice = "";
+    List<String> measurementsInCurrentDevice = new ArrayList<>();
+    List<TSDataType> dataTypesInCurrentDevice = new ArrayList<>();
     while (schemaIter.next()) {
-      count++;
       Path currentPath = new Path(schemaIter.getString("timeseries"), true);
-      Future future =
-          executorService.submit(
-              new LoadThread(
-                  count, currentPath, TSDataType.valueOf(schemaIter.getString("dataType"))));
-      futureList.add(future);
+      if (!currentDevice.equals(currentPath.getDevice())) {
+        if (!currentDevice.equals("") || seriesNumInOneTask > 300) {
+          count++;
+          Future future =
+            executorService.submit(
+                new LoadThread(
+                    count, currentDevice, measurementsInCurrentDevice, dataTypesInCurrentDevice));
+          futureList.add(future);
+          seriesNumInOneTask = 0;
+        }
+        seriesNumInOneTask++;
+        currentDevice = currentPath.getDevice();
+        measurementsInCurrentDevice = new ArrayList<>();
+        dataTypesInCurrentDevice = new ArrayList<>();
+      }
+      measurementsInCurrentDevice.add(currentPath.getMeasurement());
+      dataTypesInCurrentDevice.add(TSDataType.valueOf(schemaIter.getString("dataType")));
+
     }
     readerPool.closeResultSet(schemaDataSet);
 
@@ -110,57 +127,73 @@ public class DataMigrationExample {
   static class LoadThread implements Callable<Void> {
 
     String device;
-    String measurement;
-    Path series;
-    TSDataType dataType;
+    List<String> measurements;
+    List<TSDataType> dataTypes;
     Tablet tablet;
     int i;
 
-    public LoadThread(int i, Path series, TSDataType dataType) {
+    public LoadThread(int i, String device, List<String> measurements, List<TSDataType> dataTypes) {
       this.i = i;
-      this.device = series.getDevice();
-      this.measurement = series.getMeasurement();
-      this.dataType = dataType;
-      this.series = series;
+      this.device = device;
+      this.measurements = measurements;
+      this.dataTypes = dataTypes;
     }
 
     @Override
     public Void call() {
 
       List<IMeasurementSchema> schemaList = new ArrayList<>();
-      schemaList.add(new UnaryMeasurementSchema(measurement, dataType));
+      StringBuffer measurementsString = new StringBuffer();
+      for (int i = 0; i < measurements.size(); i++) {
+        schemaList.add(new UnaryMeasurementSchema(measurements.get(i), dataTypes.get(i)));
+        measurementsString.append(measurements.get(i));
+        if (i != measurements.size() - 1) {
+           measurementsString.append(", ");
+        }
+      }
       tablet = new Tablet(device, schemaList, 300000);
+      tablet.bitMaps = new BitMap[schemaList.size()];
+      for (int i = 0; i < measurements.size(); i++) {
+        tablet.bitMaps[i] = new BitMap(tablet.getMaxRowNumber());
+      }
       SessionDataSetWrapper dataSet = null;
 
       try {
 
         dataSet =
             readerPool.executeQueryStatement(
-                String.format("select %s from %s", measurement, device));
+                String.format("select %s from %s", measurementsString.toString(), device));
 
         DataIterator dataIter = dataSet.iterator();
         while (dataIter.next()) {
           int row = tablet.rowSize++;
           tablet.timestamps[row] = dataIter.getLong(1);
-          switch (dataType) {
-            case BOOLEAN:
-              ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
-              break;
-            case INT32:
-              ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
-              break;
-            case INT64:
-              ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
-              break;
-            case FLOAT:
-              ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
-              break;
-            case DOUBLE:
-              ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
-              break;
-            case TEXT:
-              ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
-              break;
+          for (int i = 0; i < measurements.size(); i++) {
+            if (dataIter.isNull(i + 2)) {
+              tablet.bitMaps[i].mark((int) row);
+            }
+            switch (dataTypes.get(i)) {
+              case BOOLEAN:
+                ((boolean[]) tablet.values[i])[row] = dataIter.getBoolean(i + 2);
+                break;
+              case INT32:
+                ((int[]) tablet.values[i])[row] = dataIter.getInt(i + 2);
+                break;
+              case INT64:
+                ((long[]) tablet.values[i])[row] = dataIter.getLong(i + 2);
+                break;
+              case FLOAT:
+                ((float[]) tablet.values[i])[row] = dataIter.getFloat(i + 2);
+                break;
+              case DOUBLE:
+                ((double[]) tablet.values[i])[row] = dataIter.getDouble(i + 2);
+                break;
+              case TEXT:
+                ((Binary[]) tablet.values[i])[row] = new Binary(dataIter.getString(i + 2));
+                break;
+              default:
+                break;
+            }
           }
           if (tablet.rowSize == tablet.getMaxRowNumber()) {
             writerPool.insertTablet(tablet, true);
@@ -174,13 +207,13 @@ public class DataMigrationExample {
 
       } catch (Exception e) {
         System.out.println(
-            "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
+            "Loading the " + i + "-th devices: " + device + " failed " + e.getMessage());
         return null;
       } finally {
         readerPool.closeResultSet(dataSet);
       }
 
-      System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
+      System.out.println("Loading the " + i + "-th devices: " + device + " success");
       return null;
     }
   }