You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/12 09:25:46 UTC

[iotdb] branch jira-2000 created (now 302241c)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a change to branch jira-2000
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 302241c  [IOTDB-2000] DataMigrationExample should migrate data device by device

This branch includes the following new commits:

     new 302241c  [IOTDB-2000] DataMigrationExample should migrate data device by device

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: [IOTDB-2000] DataMigrationExample should migrate data device by device

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch jira-2000
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 302241c1cca1b763a85c14510dc3ed1edf1269ef
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 12 17:22:24 2021 +0800

    [IOTDB-2000] DataMigrationExample should migrate data device by device
---
 .../org/apache/iotdb/DataMigrationExample.java     | 107 ++++++++++++++-------
 1 file changed, 70 insertions(+), 37 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
index 2959f1e..ee48154 100644
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
@@ -87,14 +88,30 @@ public class DataMigrationExample {
 
     List<Future> futureList = new ArrayList<>();
     int count = 0;
+    int seriesNumInOneTask = 0;
+    String currentDevice = "";
+    List<String> measurementsInCurrentDevice = new ArrayList<>();
+    List<TSDataType> dataTypesInCurrentDevice = new ArrayList<>();
     while (schemaIter.next()) {
-      count++;
       Path currentPath = new Path(schemaIter.getString("timeseries"), true);
-      Future future =
-          executorService.submit(
-              new LoadThread(
-                  count, currentPath, TSDataType.valueOf(schemaIter.getString("dataType"))));
-      futureList.add(future);
+      if (!currentDevice.equals(currentPath.getDevice())) {
+        if (!currentDevice.equals("") || seriesNumInOneTask > 300) {
+          count++;
+          Future future =
+            executorService.submit(
+                new LoadThread(
+                    count, currentDevice, measurementsInCurrentDevice, dataTypesInCurrentDevice));
+          futureList.add(future);
+          seriesNumInOneTask = 0;
+        }
+        seriesNumInOneTask++;
+        currentDevice = currentPath.getDevice();
+        measurementsInCurrentDevice = new ArrayList<>();
+        dataTypesInCurrentDevice = new ArrayList<>();
+      }
+      measurementsInCurrentDevice.add(currentPath.getMeasurement());
+      dataTypesInCurrentDevice.add(TSDataType.valueOf(schemaIter.getString("dataType")));
+
     }
     readerPool.closeResultSet(schemaDataSet);
 
@@ -110,57 +127,73 @@ public class DataMigrationExample {
   static class LoadThread implements Callable<Void> {
 
     String device;
-    String measurement;
-    Path series;
-    TSDataType dataType;
+    List<String> measurements;
+    List<TSDataType> dataTypes;
     Tablet tablet;
     int i;
 
-    public LoadThread(int i, Path series, TSDataType dataType) {
+    public LoadThread(int i, String device, List<String> measurements, List<TSDataType> dataTypes) {
       this.i = i;
-      this.device = series.getDevice();
-      this.measurement = series.getMeasurement();
-      this.dataType = dataType;
-      this.series = series;
+      this.device = device;
+      this.measurements = measurements;
+      this.dataTypes = dataTypes;
     }
 
     @Override
     public Void call() {
 
       List<IMeasurementSchema> schemaList = new ArrayList<>();
-      schemaList.add(new UnaryMeasurementSchema(measurement, dataType));
+      StringBuffer measurementsString = new StringBuffer();
+      for (int i = 0; i < measurements.size(); i++) {
+        schemaList.add(new UnaryMeasurementSchema(measurements.get(i), dataTypes.get(i)));
+        measurementsString.append(measurements.get(i));
+        if (i != measurements.size() - 1) {
+           measurementsString.append(", ");
+        }
+      }
       tablet = new Tablet(device, schemaList, 300000);
+      tablet.bitMaps = new BitMap[schemaList.size()];
+      for (int i = 0; i < measurements.size(); i++) {
+        tablet.bitMaps[i] = new BitMap(tablet.getMaxRowNumber());
+      }
       SessionDataSetWrapper dataSet = null;
 
       try {
 
         dataSet =
             readerPool.executeQueryStatement(
-                String.format("select %s from %s", measurement, device));
+                String.format("select %s from %s", measurementsString.toString(), device));
 
         DataIterator dataIter = dataSet.iterator();
         while (dataIter.next()) {
           int row = tablet.rowSize++;
           tablet.timestamps[row] = dataIter.getLong(1);
-          switch (dataType) {
-            case BOOLEAN:
-              ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
-              break;
-            case INT32:
-              ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
-              break;
-            case INT64:
-              ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
-              break;
-            case FLOAT:
-              ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
-              break;
-            case DOUBLE:
-              ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
-              break;
-            case TEXT:
-              ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
-              break;
+          for (int i = 0; i < measurements.size(); i++) {
+            if (dataIter.isNull(i + 2)) {
+              tablet.bitMaps[i].mark((int) row);
+            }
+            switch (dataTypes.get(i)) {
+              case BOOLEAN:
+                ((boolean[]) tablet.values[i])[row] = dataIter.getBoolean(i + 2);
+                break;
+              case INT32:
+                ((int[]) tablet.values[i])[row] = dataIter.getInt(i + 2);
+                break;
+              case INT64:
+                ((long[]) tablet.values[i])[row] = dataIter.getLong(i + 2);
+                break;
+              case FLOAT:
+                ((float[]) tablet.values[i])[row] = dataIter.getFloat(i + 2);
+                break;
+              case DOUBLE:
+                ((double[]) tablet.values[i])[row] = dataIter.getDouble(i + 2);
+                break;
+              case TEXT:
+                ((Binary[]) tablet.values[i])[row] = new Binary(dataIter.getString(i + 2));
+                break;
+              default:
+                break;
+            }
           }
           if (tablet.rowSize == tablet.getMaxRowNumber()) {
             writerPool.insertTablet(tablet, true);
@@ -174,13 +207,13 @@ public class DataMigrationExample {
 
       } catch (Exception e) {
         System.out.println(
-            "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
+            "Loading the " + i + "-th devices: " + device + " failed " + e.getMessage());
         return null;
       } finally {
         readerPool.closeResultSet(dataSet);
       }
 
-      System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
+      System.out.println("Loading the " + i + "-th devices: " + device + " success");
       return null;
     }
   }