You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/05/25 12:25:08 UTC
[iotdb] branch master updated: [IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ea3badc24e8 [IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)
ea3badc24e8 is described below
commit ea3badc24e858de8372c96b356e7e89632a227f9
Author: YuFengLiu <38...@users.noreply.github.com>
AuthorDate: Thu May 25 20:25:02 2023 +0800
[IOTDB-5915] Adjust the granularity of data migration example from timeseries to device (#9916)
---
example/session/pom.xml | 6 +
.../org/apache/iotdb/DataMigrationExample.java | 142 +++++++++++----------
.../org/apache/iotdb/isession/SessionDataSet.java | 8 ++
3 files changed, 89 insertions(+), 67 deletions(-)
diff --git a/example/session/pom.xml b/example/session/pom.xml
index 6bf75ac6d61..5c8280d92ad 100644
--- a/example/session/pom.xml
+++ b/example/session/pom.xml
@@ -39,5 +39,11 @@
<artifactId>iotdb-session</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>node-commons</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
index a3b8bf44ff1..ed503e6068e 100644
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
@@ -18,14 +18,13 @@
*/
package org.apache.iotdb;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.isession.SessionDataSet.DataIterator;
import org.apache.iotdb.isession.pool.SessionDataSetWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
@@ -39,7 +38,7 @@ import java.util.concurrent.Future;
/**
* Migrate all data belongs to a path from one IoTDB to another IoTDB Each thread migrate one
- * series, the concurrent thread can be configured by concurrency
+ * series, the concurrent thread can be configured by CONCURRENCY
*
* <p>This example is migrating all timeseries from a local IoTDB with 6667 port to a local IoTDB
* with 6668 port
@@ -51,53 +50,49 @@ public class DataMigrationExample {
// used to write data into the destination IoTDB
private static SessionPool writerPool;
// concurrent thread of loading timeseries data
- private static int concurrency = 5;
+ private static final int CONCURRENCY = 5;
public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException, ExecutionException,
InterruptedException {
- ExecutorService executorService = Executors.newFixedThreadPool(2 * concurrency + 1);
+ ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENCY + 1);
- String path = "root";
+ String path = "root.**";
if (args.length != 0) {
path = args[0];
}
- readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", concurrency);
- writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", concurrency);
+ readerPool = new SessionPool("127.0.0.1", 6667, "root", "root", CONCURRENCY);
+ writerPool = new SessionPool("127.0.0.1", 6668, "root", "root", CONCURRENCY);
- SessionDataSetWrapper schemaDataSet =
- readerPool.executeQueryStatement("count timeseries " + path);
- DataIterator schemaIter = schemaDataSet.iterator();
+ SessionDataSetWrapper deviceDataSet = readerPool.executeQueryStatement("count devices " + path);
+ DataIterator deviceIter = deviceDataSet.iterator();
int total;
- if (schemaIter.next()) {
- total = schemaIter.getInt(1);
- System.out.println("Total timeseries: " + total);
+ if (deviceIter.next()) {
+ total = deviceIter.getInt(1);
+ System.out.println("Total devices: " + total);
} else {
- System.out.println("Can not get timeseries schema");
+ System.out.println("Can not get devices schema");
System.exit(1);
}
- readerPool.closeResultSet(schemaDataSet);
+ readerPool.closeResultSet(deviceDataSet);
- schemaDataSet = readerPool.executeQueryStatement("show timeseries " + path);
- schemaIter = schemaDataSet.iterator();
+ deviceDataSet = readerPool.executeQueryStatement("show devices " + path);
+ deviceIter = deviceDataSet.iterator();
- List<Future> futureList = new ArrayList<>();
+ List<Future<Void>> futureList = new ArrayList<>();
int count = 0;
- while (schemaIter.next()) {
+ while (deviceIter.next()) {
count++;
- Path currentPath = new Path(schemaIter.getString("Timeseries"), true);
- Future future =
- executorService.submit(
- new LoadThread(
- count, currentPath, TSDataType.valueOf(schemaIter.getString("DataType"))));
+ Future<Void> future =
+ executorService.submit(new LoadThread(count, deviceIter.getString("Device")));
futureList.add(future);
}
- readerPool.closeResultSet(schemaDataSet);
+ readerPool.closeResultSet(deviceDataSet);
- for (Future future : futureList) {
+ for (Future<Void> future : futureList) {
future.get();
}
executorService.shutdown();
@@ -109,57 +104,65 @@ public class DataMigrationExample {
static class LoadThread implements Callable<Void> {
String device;
- String measurement;
- Path series;
- TSDataType dataType;
Tablet tablet;
int i;
- public LoadThread(int i, Path series, TSDataType dataType) {
+ public LoadThread(int i, String device) {
this.i = i;
- this.device = series.getDevice();
- this.measurement = series.getMeasurement();
- this.dataType = dataType;
- this.series = series;
+ this.device = device;
}
@Override
public Void call() {
-
- List<MeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new MeasurementSchema(measurement, dataType));
- tablet = new Tablet(device, schemaList, 300000);
SessionDataSetWrapper dataSet = null;
-
+ long startTime = System.currentTimeMillis();
try {
-
- dataSet =
- readerPool.executeQueryStatement(
- String.format("select %s from %s", measurement, device));
-
+ dataSet = readerPool.executeQueryStatement(String.format("select * from %s", device));
DataIterator dataIter = dataSet.iterator();
+ List<String> columnNameList = dataIter.getColumnNameList();
+ List<String> columnTypeList = dataIter.getColumnTypeList();
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int j = 1; j < columnNameList.size(); j++) {
+ PartialPath currentPath = new PartialPath(columnNameList.get(j));
+ schemaList.add(
+ new MeasurementSchema(
+ currentPath.getMeasurement(), TSDataType.valueOf(columnTypeList.get(j))));
+ }
+ tablet = new Tablet(device, schemaList, 300000);
while (dataIter.next()) {
int row = tablet.rowSize++;
tablet.timestamps[row] = dataIter.getLong(1);
- switch (dataType) {
- case BOOLEAN:
- ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
- break;
- case INT32:
- ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
- break;
- case INT64:
- ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
- break;
- case FLOAT:
- ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
- break;
- case DOUBLE:
- ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
- break;
- case TEXT:
- ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
- break;
+ for (int j = 0; j < schemaList.size(); ++j) {
+ if (dataIter.isNull(j + 2)) {
+ tablet.addValue(schemaList.get(j).getMeasurementId(), row, null);
+ continue;
+ }
+ switch (schemaList.get(j).getType()) {
+ case BOOLEAN:
+ tablet.addValue(
+ schemaList.get(j).getMeasurementId(), row, dataIter.getBoolean(j + 2));
+ break;
+ case INT32:
+ tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getInt(j + 2));
+ break;
+ case INT64:
+ tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getLong(j + 2));
+ break;
+ case FLOAT:
+ tablet.addValue(
+ schemaList.get(j).getMeasurementId(), row, dataIter.getFloat(j + 2));
+ break;
+ case DOUBLE:
+ tablet.addValue(
+ schemaList.get(j).getMeasurementId(), row, dataIter.getDouble(j + 2));
+ break;
+ case TEXT:
+ tablet.addValue(
+ schemaList.get(j).getMeasurementId(), row, dataIter.getString(j + 2));
+ break;
+ default:
+ System.out.println("Migration of this type of data is not supported");
+ }
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
writerPool.insertTablet(tablet, true);
@@ -173,13 +176,18 @@ public class DataMigrationExample {
} catch (Exception e) {
System.out.println(
- "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
+ "Loading the " + i + "-th device: " + device + " failed " + e.getMessage());
return null;
} finally {
- readerPool.closeResultSet(dataSet);
+ if (dataSet != null) {
+ readerPool.closeResultSet(dataSet);
+ }
+ long endTime = System.currentTimeMillis();
+ long totalTime = endTime - startTime;
+ System.out.println("migrate device :" + device + " using " + totalTime + " ms");
}
- System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
+ System.out.println("Loading the " + i + "-th device: " + device + " success");
return null;
}
}
diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
index b843b4aa656..b5dab86b1c7 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
+++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionDataSet.java
@@ -314,5 +314,13 @@ public class SessionDataSet implements AutoCloseable {
public int findColumn(String columnName) {
return ioTDBRpcDataSet.findColumn(columnName);
}
+
+ public List<String> getColumnNameList() {
+ return ioTDBRpcDataSet.columnNameList;
+ }
+
+ public List<String> getColumnTypeList() {
+ return ioTDBRpcDataSet.columnTypeList;
+ }
}
}