You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/11/12 09:25:47 UTC
[iotdb] 01/01: [IOTDB-2000] DataMigrationExample should migrate data device by device
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch jira-2000
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 302241c1cca1b763a85c14510dc3ed1edf1269ef
Author: HTHou <hh...@outlook.com>
AuthorDate: Fri Nov 12 17:22:24 2021 +0800
[IOTDB-2000] DataMigrationExample should migrate data device by device
---
.../org/apache/iotdb/DataMigrationExample.java | 107 ++++++++++++++-------
1 file changed, 70 insertions(+), 37 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
index 2959f1e..ee48154 100644
--- a/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/DataMigrationExample.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
@@ -87,14 +88,30 @@ public class DataMigrationExample {
List<Future> futureList = new ArrayList<>();
int count = 0;
+ int seriesNumInOneTask = 0;
+ String currentDevice = "";
+ List<String> measurementsInCurrentDevice = new ArrayList<>();
+ List<TSDataType> dataTypesInCurrentDevice = new ArrayList<>();
while (schemaIter.next()) {
- count++;
Path currentPath = new Path(schemaIter.getString("timeseries"), true);
- Future future =
- executorService.submit(
- new LoadThread(
- count, currentPath, TSDataType.valueOf(schemaIter.getString("dataType"))));
- futureList.add(future);
+ if (!currentDevice.equals(currentPath.getDevice())) {
+ if (!currentDevice.equals("") || seriesNumInOneTask > 300) {
+ count++;
+ Future future =
+ executorService.submit(
+ new LoadThread(
+ count, currentDevice, measurementsInCurrentDevice, dataTypesInCurrentDevice));
+ futureList.add(future);
+ seriesNumInOneTask = 0;
+ }
+ seriesNumInOneTask++;
+ currentDevice = currentPath.getDevice();
+ measurementsInCurrentDevice = new ArrayList<>();
+ dataTypesInCurrentDevice = new ArrayList<>();
+ }
+ measurementsInCurrentDevice.add(currentPath.getMeasurement());
+ dataTypesInCurrentDevice.add(TSDataType.valueOf(schemaIter.getString("dataType")));
+
}
readerPool.closeResultSet(schemaDataSet);
@@ -110,57 +127,73 @@ public class DataMigrationExample {
static class LoadThread implements Callable<Void> {
String device;
- String measurement;
- Path series;
- TSDataType dataType;
+ List<String> measurements;
+ List<TSDataType> dataTypes;
Tablet tablet;
int i;
- public LoadThread(int i, Path series, TSDataType dataType) {
+ public LoadThread(int i, String device, List<String> measurements, List<TSDataType> dataTypes) {
this.i = i;
- this.device = series.getDevice();
- this.measurement = series.getMeasurement();
- this.dataType = dataType;
- this.series = series;
+ this.device = device;
+ this.measurements = measurements;
+ this.dataTypes = dataTypes;
}
@Override
public Void call() {
List<IMeasurementSchema> schemaList = new ArrayList<>();
- schemaList.add(new UnaryMeasurementSchema(measurement, dataType));
+ StringBuffer measurementsString = new StringBuffer();
+ for (int i = 0; i < measurements.size(); i++) {
+ schemaList.add(new UnaryMeasurementSchema(measurements.get(i), dataTypes.get(i)));
+ measurementsString.append(measurements.get(i));
+ if (i != measurements.size() - 1) {
+ measurementsString.append(", ");
+ }
+ }
tablet = new Tablet(device, schemaList, 300000);
+ tablet.bitMaps = new BitMap[schemaList.size()];
+ for (int i = 0; i < measurements.size(); i++) {
+ tablet.bitMaps[i] = new BitMap(tablet.getMaxRowNumber());
+ }
SessionDataSetWrapper dataSet = null;
try {
dataSet =
readerPool.executeQueryStatement(
- String.format("select %s from %s", measurement, device));
+ String.format("select %s from %s", measurementsString.toString(), device));
DataIterator dataIter = dataSet.iterator();
while (dataIter.next()) {
int row = tablet.rowSize++;
tablet.timestamps[row] = dataIter.getLong(1);
- switch (dataType) {
- case BOOLEAN:
- ((boolean[]) tablet.values[0])[row] = dataIter.getBoolean(2);
- break;
- case INT32:
- ((int[]) tablet.values[0])[row] = dataIter.getInt(2);
- break;
- case INT64:
- ((long[]) tablet.values[0])[row] = dataIter.getLong(2);
- break;
- case FLOAT:
- ((float[]) tablet.values[0])[row] = dataIter.getFloat(2);
- break;
- case DOUBLE:
- ((double[]) tablet.values[0])[row] = dataIter.getDouble(2);
- break;
- case TEXT:
- ((Binary[]) tablet.values[0])[row] = new Binary(dataIter.getString(2));
- break;
+ for (int i = 0; i < measurements.size(); i++) {
+ if (dataIter.isNull(i + 2)) {
+ tablet.bitMaps[i].mark((int) row);
+ }
+ switch (dataTypes.get(i)) {
+ case BOOLEAN:
+ ((boolean[]) tablet.values[i])[row] = dataIter.getBoolean(i + 2);
+ break;
+ case INT32:
+ ((int[]) tablet.values[i])[row] = dataIter.getInt(i + 2);
+ break;
+ case INT64:
+ ((long[]) tablet.values[i])[row] = dataIter.getLong(i + 2);
+ break;
+ case FLOAT:
+ ((float[]) tablet.values[i])[row] = dataIter.getFloat(i + 2);
+ break;
+ case DOUBLE:
+ ((double[]) tablet.values[i])[row] = dataIter.getDouble(i + 2);
+ break;
+ case TEXT:
+ ((Binary[]) tablet.values[i])[row] = new Binary(dataIter.getString(i + 2));
+ break;
+ default:
+ break;
+ }
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
writerPool.insertTablet(tablet, true);
@@ -174,13 +207,13 @@ public class DataMigrationExample {
} catch (Exception e) {
System.out.println(
- "Loading the " + i + "-th timeseries: " + series + " failed " + e.getMessage());
+ "Loading the " + i + "-th devices: " + device + " failed " + e.getMessage());
return null;
} finally {
readerPool.closeResultSet(dataSet);
}
- System.out.println("Loading the " + i + "-th timeseries: " + series + " success");
+ System.out.println("Loading the " + i + "-th devices: " + device + " success");
return null;
}
}