You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/19 03:56:06 UTC
[iotdb] branch select-into updated: InsertTabletPlanGenerator
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch select-into
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/select-into by this push:
new ed4628f InsertTabletPlanGenerator
ed4628f is described below
commit ed4628f8c274f7ed36c95b2f4788139481c785d8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jul 19 11:55:32 2021 +0800
InsertTabletPlanGenerator
---
.../transporter/InsertTabletPlanGenerator.java | 148 ++++++++++++++++++---
.../engine/transporter/SelectIntoTransporter.java | 7 +-
.../java/org/apache/iotdb/tsfile/utils/BitMap.java | 5 +
3 files changed, 138 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
index c631b10..b94112c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
@@ -19,43 +19,155 @@
package org.apache.iotdb.db.engine.transporter;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
import java.util.ArrayList;
import java.util.List;
public class InsertTabletPlanGenerator {
- private final QueryDataSet queryDataSet;
- private final List<PartialPath> intoPaths;
+ private final String intoDevice;
+ private final List<Integer> intoMeasurementIndexes;
+ private final List<String> intoMeasurementIds;
- private final String device;
- private final List<Integer> measurementIdIndexes;
+ private final int fetchSize;
- private Tablet tablet;
+ private InsertTabletPlan insertTabletPlan;
+ private int rowCount;
+ private long[] times;
+ private Object[] columns;
+ private BitMap[] bitMaps;
- public InsertTabletPlanGenerator(
- QueryDataSet queryDataSet, List<PartialPath> intoPaths, String device) {
- this.queryDataSet = queryDataSet;
- this.intoPaths = intoPaths;
+ public InsertTabletPlanGenerator(String intoDevice, int fetchSize) {
+ this.intoDevice = intoDevice;
+ // column index of insertTabletPlan -> column index of queryDataSet (intoPaths)
+ intoMeasurementIndexes = new ArrayList<>();
+ intoMeasurementIds = new ArrayList<>();
- this.device = device;
- measurementIdIndexes = new ArrayList<>();
+ this.fetchSize = fetchSize;
}
- public void addMeasurementIdIndex(int measurementIdIndex) {
- measurementIdIndexes.add(measurementIdIndex);
+ public void addMeasurementIdIndex(List<PartialPath> intoPaths, int intoMeasurementIndex) {
+ intoMeasurementIndexes.add(intoMeasurementIndex);
+ intoMeasurementIds.add(intoPaths.get(intoMeasurementIndex).getMeasurement());
}
- public void constructNewTablet() {}
+ public void constructNewTablet() throws IllegalPathException {
+ insertTabletPlan = new InsertTabletPlan(new PartialPath(intoDevice), intoMeasurementIds);
+ insertTabletPlan.setAligned(false);
- public void collectRowRecord(RowRecord rowRecord) {}
+ rowCount = 0;
+ insertTabletPlan.setRowCount(rowCount);
+
+ times = new long[fetchSize];
+ insertTabletPlan.setTimes(times);
+
+ columns = new Object[intoMeasurementIds.size()];
+ insertTabletPlan.setColumns(columns);
+
+ bitMaps = new BitMap[intoMeasurementIds.size()];
+ for (int i = 0; i < bitMaps.length; ++i) {
+ bitMaps[i] = new BitMap(fetchSize);
+ bitMaps[i].markAll();
+ }
+ insertTabletPlan.setBitMaps(bitMaps);
+ }
+
+ public void collectRowRecord(RowRecord rowRecord) {
+ if (rowCount == 0) {
+ setDataTypes(rowRecord);
+ initColumns();
+ }
+
+ times[rowCount] = rowRecord.getTimestamp();
+
+ for (int i = 0; i < columns.length; ++i) {
+ Field field = rowRecord.getFields().get(intoMeasurementIndexes.get(i));
+
+ // if the field is NULL
+ if (field == null || field.getDataType() == null) {
+ // bit in bitMaps are marked as 1 (NULL) by default
+ continue;
+ }
+
+ bitMaps[i].unmark(rowCount);
+ switch (field.getDataType()) {
+ case INT32:
+ ((int[]) columns[i])[rowCount] = field.getIntV();
+ break;
+ case INT64:
+ ((long[]) columns[i])[rowCount] = field.getLongV();
+ break;
+ case FLOAT:
+ ((float[]) columns[i])[rowCount] = field.getFloatV();
+ break;
+ case DOUBLE:
+ ((double[]) columns[i])[rowCount] = field.getDoubleV();
+ break;
+ case BOOLEAN:
+ ((boolean[]) columns[i])[rowCount] = field.getBoolV();
+ break;
+ case TEXT:
+ ((Binary[]) columns[i])[rowCount] = field.getBinaryV();
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "data type %s is not supported when convert data at client",
+ field.getDataType()));
+ }
+ }
+
+ ++rowCount;
+ }
+
+ private void setDataTypes(RowRecord rowRecord) {
+ List<TSDataType> dataTypes = new ArrayList<>();
+ for (Field field : rowRecord.getFields()) {
+ dataTypes.add(field.getDataType());
+ }
+ insertTabletPlan.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+ }
+
+ private void initColumns() {
+ final TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
+ for (int i = 0; i < dataTypes.length; ++i) {
+ switch (dataTypes[i]) {
+ case BOOLEAN:
+ columns[i] = new boolean[fetchSize];
+ break;
+ case INT32:
+ columns[i] = new int[fetchSize];
+ break;
+ case INT64:
+ columns[i] = new long[fetchSize];
+ break;
+ case FLOAT:
+ columns[i] = new float[fetchSize];
+ break;
+ case DOUBLE:
+ columns[i] = new double[fetchSize];
+ break;
+ case TEXT:
+ columns[i] = new Binary[fetchSize];
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format(
+ "data type %s is not supported when convert data at client", dataTypes[i]));
+ }
+ }
+ }
public InsertTabletPlan getInsertTabletPlan() {
- throw new UnsupportedOperationException();
+ return insertTabletPlan;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
index f0d4718..5b5e42c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
@@ -92,16 +92,15 @@ public class SelectIntoTransporter {
for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
String device = intoPaths.get(i).getDevice();
if (!deviceTabletGeneratorMap.containsKey(device)) {
- deviceTabletGeneratorMap.put(
- device, new InsertTabletPlanGenerator(queryDataSet, intoPaths, device));
+ deviceTabletGeneratorMap.put(device, new InsertTabletPlanGenerator(device, fetchSize));
}
- deviceTabletGeneratorMap.get(device).addMeasurementIdIndex(i);
+ deviceTabletGeneratorMap.get(device).addMeasurementIdIndex(intoPaths, i);
}
insertTabletPlanGenerators =
deviceTabletGeneratorMap.values().toArray(new InsertTabletPlanGenerator[0]);
}
- private void doTransport() throws IOException {
+ private void doTransport() throws IOException, IllegalPathException {
while (queryDataSet.hasNext()) {
List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
for (InsertTabletPlanGenerator insertTabletPlanGenerator : insertTabletPlanGenerators) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
index 91affdb..78fa5b1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
@@ -64,6 +64,11 @@ public class BitMap {
return (bits[position / Byte.SIZE] & BIT_UTIL[position % Byte.SIZE]) != 0;
}
+ /** mark as 1 at all positions. */
+ public void markAll() {
+ Arrays.fill(bits, (byte) 0XFF);
+ }
+
/** mark as 1 at the given bit position. */
public void mark(int position) {
bits[position / Byte.SIZE] |= BIT_UTIL[position % Byte.SIZE];