You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/07/19 03:56:06 UTC

[iotdb] branch select-into updated: InsertTabletPlanGenerator

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch select-into
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/select-into by this push:
     new ed4628f  InsertTabletPlanGenerator
ed4628f is described below

commit ed4628f8c274f7ed36c95b2f4788139481c785d8
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Mon Jul 19 11:55:32 2021 +0800

    InsertTabletPlanGenerator
---
 .../transporter/InsertTabletPlanGenerator.java     | 148 ++++++++++++++++++---
 .../engine/transporter/SelectIntoTransporter.java  |   7 +-
 .../java/org/apache/iotdb/tsfile/utils/BitMap.java |   5 +
 3 files changed, 138 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
index c631b10..b94112c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/InsertTabletPlanGenerator.java
@@ -19,43 +19,155 @@
 
 package org.apache.iotdb.db.engine.transporter;
 
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BitMap;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class InsertTabletPlanGenerator {
 
-  private final QueryDataSet queryDataSet;
-  private final List<PartialPath> intoPaths;
+  private final String intoDevice;
+  private final List<Integer> intoMeasurementIndexes;
+  private final List<String> intoMeasurementIds;
 
-  private final String device;
-  private final List<Integer> measurementIdIndexes;
+  private final int fetchSize;
 
-  private Tablet tablet;
+  private InsertTabletPlan insertTabletPlan;
+  private int rowCount;
+  private long[] times;
+  private Object[] columns;
+  private BitMap[] bitMaps;
 
-  public InsertTabletPlanGenerator(
-      QueryDataSet queryDataSet, List<PartialPath> intoPaths, String device) {
-    this.queryDataSet = queryDataSet;
-    this.intoPaths = intoPaths;
+  public InsertTabletPlanGenerator(String intoDevice, int fetchSize) {
+    this.intoDevice = intoDevice;
+    // column index of insertTabletPlan -> column index of queryDataSet (intoPaths)
+    intoMeasurementIndexes = new ArrayList<>();
+    intoMeasurementIds = new ArrayList<>();
 
-    this.device = device;
-    measurementIdIndexes = new ArrayList<>();
+    this.fetchSize = fetchSize;
   }
 
-  public void addMeasurementIdIndex(int measurementIdIndex) {
-    measurementIdIndexes.add(measurementIdIndex);
+  public void addMeasurementIdIndex(List<PartialPath> intoPaths, int intoMeasurementIndex) {
+    intoMeasurementIndexes.add(intoMeasurementIndex);
+    intoMeasurementIds.add(intoPaths.get(intoMeasurementIndex).getMeasurement());
   }
 
-  public void constructNewTablet() {}
+  public void constructNewTablet() throws IllegalPathException {
+    insertTabletPlan = new InsertTabletPlan(new PartialPath(intoDevice), intoMeasurementIds);
+    insertTabletPlan.setAligned(false);
 
-  public void collectRowRecord(RowRecord rowRecord) {}
+    rowCount = 0;
+    insertTabletPlan.setRowCount(rowCount);
+
+    times = new long[fetchSize];
+    insertTabletPlan.setTimes(times);
+
+    columns = new Object[intoMeasurementIds.size()];
+    insertTabletPlan.setColumns(columns);
+
+    bitMaps = new BitMap[intoMeasurementIds.size()];
+    for (int i = 0; i < bitMaps.length; ++i) {
+      bitMaps[i] = new BitMap(fetchSize);
+      bitMaps[i].markAll();
+    }
+    insertTabletPlan.setBitMaps(bitMaps);
+  }
+
+  public void collectRowRecord(RowRecord rowRecord) {
+    if (rowCount == 0) {
+      setDataTypes(rowRecord);
+      initColumns();
+    }
+
+    times[rowCount] = rowRecord.getTimestamp();
+
+    for (int i = 0; i < columns.length; ++i) {
+      Field field = rowRecord.getFields().get(intoMeasurementIndexes.get(i));
+
+      // if the field is NULL
+      if (field == null || field.getDataType() == null) {
+        // bit in bitMaps are marked as 1 (NULL) by default
+        continue;
+      }
+
+      bitMaps[i].unmark(rowCount);
+      switch (field.getDataType()) {
+        case INT32:
+          ((int[]) columns[i])[rowCount] = field.getIntV();
+          break;
+        case INT64:
+          ((long[]) columns[i])[rowCount] = field.getLongV();
+          break;
+        case FLOAT:
+          ((float[]) columns[i])[rowCount] = field.getFloatV();
+          break;
+        case DOUBLE:
+          ((double[]) columns[i])[rowCount] = field.getDoubleV();
+          break;
+        case BOOLEAN:
+          ((boolean[]) columns[i])[rowCount] = field.getBoolV();
+          break;
+        case TEXT:
+          ((Binary[]) columns[i])[rowCount] = field.getBinaryV();
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(
+                  "data type %s is not supported when convert data at client",
+                  field.getDataType()));
+      }
+    }
+
+    ++rowCount;
+  }
+
+  private void setDataTypes(RowRecord rowRecord) {
+    List<TSDataType> dataTypes = new ArrayList<>();
+    for (Field field : rowRecord.getFields()) {
+      dataTypes.add(field.getDataType());
+    }
+    insertTabletPlan.setDataTypes(dataTypes.toArray(new TSDataType[0]));
+  }
+
+  private void initColumns() {
+    final TSDataType[] dataTypes = insertTabletPlan.getDataTypes();
+    for (int i = 0; i < dataTypes.length; ++i) {
+      switch (dataTypes[i]) {
+        case BOOLEAN:
+          columns[i] = new boolean[fetchSize];
+          break;
+        case INT32:
+          columns[i] = new int[fetchSize];
+          break;
+        case INT64:
+          columns[i] = new long[fetchSize];
+          break;
+        case FLOAT:
+          columns[i] = new float[fetchSize];
+          break;
+        case DOUBLE:
+          columns[i] = new double[fetchSize];
+          break;
+        case TEXT:
+          columns[i] = new Binary[fetchSize];
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format(
+                  "data type %s is not supported when convert data at client", dataTypes[i]));
+      }
+    }
+  }
 
   public InsertTabletPlan getInsertTabletPlan() {
-    throw new UnsupportedOperationException();
+    return insertTabletPlan;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
index f0d4718..5b5e42c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/transporter/SelectIntoTransporter.java
@@ -92,16 +92,15 @@ public class SelectIntoTransporter {
     for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
       String device = intoPaths.get(i).getDevice();
       if (!deviceTabletGeneratorMap.containsKey(device)) {
-        deviceTabletGeneratorMap.put(
-            device, new InsertTabletPlanGenerator(queryDataSet, intoPaths, device));
+        deviceTabletGeneratorMap.put(device, new InsertTabletPlanGenerator(device, fetchSize));
       }
-      deviceTabletGeneratorMap.get(device).addMeasurementIdIndex(i);
+      deviceTabletGeneratorMap.get(device).addMeasurementIdIndex(intoPaths, i);
     }
     insertTabletPlanGenerators =
         deviceTabletGeneratorMap.values().toArray(new InsertTabletPlanGenerator[0]);
   }
 
-  private void doTransport() throws IOException {
+  private void doTransport() throws IOException, IllegalPathException {
     while (queryDataSet.hasNext()) {
       List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
       for (InsertTabletPlanGenerator insertTabletPlanGenerator : insertTabletPlanGenerators) {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
index 91affdb..78fa5b1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/BitMap.java
@@ -64,6 +64,11 @@ public class BitMap {
     return (bits[position / Byte.SIZE] & BIT_UTIL[position % Byte.SIZE]) != 0;
   }
 
+  /** mark as 1 at all positions. */
+  public void markAll() {
+    Arrays.fill(bits, (byte) 0XFF);
+  }
+
   /** mark as 1 at the given bit position. */
   public void mark(int position) {
     bits[position / Byte.SIZE] |= BIT_UTIL[position % Byte.SIZE];