You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/05/25 12:25:08 UTC

[iotdb] branch master updated: [IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ea3badc24e8 [IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)
ea3badc24e8 is described below

commit ea3badc24e858de8372c96b356e7e89632a227f9
Author: YuFengLiu <38...@users.noreply.github.com>
AuthorDate: Thu May 25 20:25:02 2023 +0800

    [IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)
---
 example/session/pom.xml                            |   6 +
 .../org/apache/iotdb/DataMigrationExample.java     | 142 +++++++++++----------
 .../org/apache/iotdb/isession/SessionDataSet.java  |   8 ++
 3 files changed, 89 insertions(+), 67 deletions(-)

diff --git a/example/session/pom.xml b/example/session/pom.xml
index 6bf75ac6d61..5c8280d92ad 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -39,5 +39,11 @@
             <artifactId>iotdb-session</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>node-commons</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
index a3b8bf44ff1..ed503e6068e 100644
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
@@ -18,14 +18,13 @@
  */
 package org.apache.iotdb;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.isession.SessionDataSet.DataIterator;
 import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
@@ -39,7 +38,7 @@ import java.util.concurrent.Future;
 
 /**
  * Migrate all data belonging to a path from one IoTDB to another IoTDB. Each thread migrates one
- * series, the concurrent thread can be configured by concurrency
+ * device, the number of concurrent threads can be configured by CONCURRENCY
  *
  * <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
  * with 6668 port
@@ -51,53 +50,49 @@ public class DataMigrationExample {
   // used to write data into the destination IoTDB
   private static SessionPool writerPool;
   // concurrent thread of loading timeseries data
-  private static int concurrency = 5;
+  private static final int CONCURRENCY = 5;
 
   public static void main(String[] args)
       throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
           InterruptedException {
 
-    ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1);
+    ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENCY + 1);
 
-    String path = "root";
+    String path = "root.**";
 
     if (args.length != 0) {
       path = args[0];
     }
 
-    readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency);
-    writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency);
+    readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", CONCURRENCY);
+    writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", CONCURRENCY);
 
-    SessionDataSetWrapper schemaDataSet =
-        readerPool.executeQueryStatement("count timeseries " + path);
-    DataIterator schemaIter = schemaDataSet.iterator();
+    SessionDataSetWrapper deviceDataSet = readerPool.executeQueryStatement("count devices " + path);
+    DataIterator deviceIter = deviceDataSet.iterator();
     int total;
-    if (schemaIter.next()) {
-      total = schemaIter.getInt(1);
-      System.out.println("Total timeseries: " + total);
+    if (deviceIter.next()) {
+      total = deviceIter.getInt(1);
+      System.out.println("Total devices: " + total);
     } else {
-      System.out.println("Can not get timeseries schema");
+      System.out.println("Can not get devices schema");
       System.exit(1);
     }
-    readerPool.closeResultSet(schemaDataSet);
+    readerPool.closeResultSet(deviceDataSet);
 
-    schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path);
-    schemaIter = schemaDataSet.iterator();
+    deviceDataSet = readerPool.executeQueryStatement("show devices " + path);
+    deviceIter = deviceDataSet.iterator();
 
-    List<Future> futureList = new ArrayList<>();
+    List<Future<Void>> futureList = new ArrayList<>();
     int count = 0;
-    while (schemaIter.next()) {
+    while (deviceIter.next()) {
       count++;
-      Path currentPath = new Path(schemaIter.getString("Timeseries"), true);
-      Future future =
-          executorService.submit(
-              new LoadThread(
-                  count, currentPath, TSDataType.valueOf(schemaIter.getString("DataType"))));
+      Future<Void> future =
+          executorService.submit(new LoadThread(count, deviceIter.getString("Device")));
       futureList.add(future);
     }
-    readerPool.closeResultSet(schemaDataSet);
+    readerPool.closeResultSet(deviceDataSet);
 
-    for (Future future : futureList) {
+    for (Future<Void> future : futureList) {
       future.get();
     }
     executorService.shutdown();
@@ -109,57 +104,65 @@ public class DataMigrationExample {
   static class LoadThread implements Callable<Void> {
 
     String device;
-    String measurement;
-    Path series;
-    TSDataType dataType;
     Tablet tablet;
     int i;
 
-    public LoadThread(int i, Path series, TSDataType dataType) {
+    public LoadThread(int i, String device) {
       this.i = i;
-      this.device = series.getDevice();
-      this.measurement = series.getMeasurement();
-      this.dataType = dataType;
-      this.series = series;
+      this.device = device;
     }
 
     @Override
     public Void call() {
-
-      List<MeasurementSchema> schemaList = new ArrayList<>();
-      schemaList.add(new MeasurementSchema(measurement, dataType));
-      tablet = new Tablet(device, schemaList, 300000);
       SessionDataSetWrapper dataSet = null;
-
+      long startTime = System.currentTimeMillis();
       try {
-
-        dataSet =
-            readerPool.executeQueryStatement(
-                String.format("select %s from %s", measurement, device));
-
+        dataSet = readerPool.executeQueryStatement(String.format("select * from %s", device));
         DataIterator dataIter = dataSet.iterator();
+        List<String> columnNameList = dataIter.getColumnNameList();
+        List<String> columnTypeList = dataIter.getColumnTypeList();
+        List<MeasurementSchema> schemaList = new ArrayList<>();
+        for (int j = 1; j < columnNameList.size(); j++) {
+          PartialPath currentPath = new PartialPath(columnNameList.get(j));
+          schemaList.add(
+              new MeasurementSchema(
+                  currentPath.getMeasurement(), TSDataType.valueOf(columnTypeList.get(j))));
+        }
+        tablet = new Tablet(device, schemaList, 300000);
         while (dataIter.next()) {
           int row = tablet.rowSize++;
           tablet.timestamps[row] = dataIter.getLong(1);
-          switch (dataType) {
-            case BOOLEAN:
-              ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
-              break;
-            case INT32:
-              ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
-              break;
-            case INT64:
-              ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
-              break;
-            case FLOAT:
-              ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
-              break;
-            case DOUBLE:
-              ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
-              break;
-            case TEXT:
-              ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
-              break;
+          for (int j = 0; j < schemaList.size(); ++j) {
+            if (dataIter.isNull(j + 2)) {
+              tablet.addValue(schemaList.get(j).getMeasurementId(), row, null);
+              continue;
+            }
+            switch (schemaList.get(j).getType()) {
+              case BOOLEAN:
+                tablet.addValue(
+                    schemaList.get(j).getMeasurementId(), row, dataIter.getBoolean(j + 2));
+                break;
+              case INT32:
+                tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getInt(j + 2));
+                break;
+              case INT64:
+                tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getLong(j + 2));
+                break;
+              case FLOAT:
+                tablet.addValue(
+                    schemaList.get(j).getMeasurementId(), row, dataIter.getFloat(j + 2));
+                break;
+              case DOUBLE:
+                tablet.addValue(
+                    schemaList.get(j).getMeasurementId(), row, dataIter.getDouble(j + 2));
+                break;
+              case TEXT:
+                tablet.addValue(
+                    schemaList.get(j).getMeasurementId(), row, dataIter.getString(j + 2));
+                break;
+              default:
+                System.out.println("Migration of this type of data is not supported");
+            }
           }
           if (tablet.rowSize == tablet.getMaxRowNumber()) {
             writerPool.insertTablet(tablet, true);
@@ -173,13 +176,18 @@ public class DataMigrationExample {
 
       } catch (Exception e) {
         System.out.println(
-            "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
+            "Loading the " + i + "-th device: " + device + " failed " + e.getMessage());
         return null;
       } finally {
-        readerPool.closeResultSet(dataSet);
+        if (dataSet != null) {
+          readerPool.closeResultSet(dataSet);
+        }
+        long endTime = System.currentTimeMillis();
+        long totalTime = endTime - startTime;
+        System.out.println("migrate device :" + device + " using " + totalTime + " ms");
       }
 
-      System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
+      System.out.println("Loading the " + i + "-th device: " + device + " success");
       return null;
     }
   }
diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
index b843b4aa656..b5dab86b1c7 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
+++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
@@ -314,5 +314,13 @@ public class SessionDataSet implements AutoCloseable {
     public int findColumn(String columnName) {
       return ioTDBRpcDataSet.findColumn(columnName);
     }
+
+    public List<String> getColumnNameList() {
+      return ioTDBRpcDataSet.columnNameList;
+    }
+
+    public List<String> getColumnTypeList() {
+      return ioTDBRpcDataSet.columnTypeList;
+    }
   }
 }