You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/14 09:01:35 UTC

[iotdb] 03/03: [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f39bc420b766758e842dbead0bf1584142f2793c
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Mar 14 16:57:54 2021 +0800

    [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 123 ++++++++++++++++-----
 .../org/apache/iotdb/db/metadata/MetaUtils.java    |   3 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |  51 +++++++++
 .../write/schema/VectorMeasurementSchema.java      |   5 +-
 4 files changed, 150 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 2bca513..6ddafbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -92,7 +92,6 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.RandomDeleteCache;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.cache.CacheException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -107,6 +106,34 @@ import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1928,9 +1955,10 @@ public class MManager {
     Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
 
     // 2. get schema of each measurement
-    // if do not has measurement
+    // if do not have measurement
     MeasurementMNode measurementMNode;
-    TSDataType dataType;
+    int loc = 0;
+
     for (int i = 0; i < measurementList.length; i++) {
       try {
         MNode child = getMNode(deviceMNode.left, measurementList[i]);
@@ -1943,27 +1971,70 @@ public class MManager {
             throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
           } else {
             // child is null or child is type of MNode
-            dataType = getTypeInLoc(plan, i);
-            // create it, may concurrent created by multiple thread
-            internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
-            measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+            // get the data type from the plan; only InsertRowPlan and InsertTabletPlan are supported
+            if (plan instanceof InsertRowPlan) {
+              TSDataType dataType = plan.getDataTypes()[i];
+              // create it; it may be created concurrently by multiple threads
+              internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
+              measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+            } else if (plan instanceof InsertTabletPlan) {
+              List<TSDataType> dataTypes = new ArrayList<>();
+              List<String> measurements =
+                  Arrays.asList(measurementList[i].replace("(", "").replace(")", "").split(","));
+              if (measurements.size() == 1) {
+                internalCreateTimeseries(
+                    deviceId.concatNode(measurementList[i]), plan.getDataTypes()[loc]);
+                measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+              } else {
+                for (int j = 0; j < measurements.size(); j++) {
+                  dataTypes.add(plan.getDataTypes()[loc]);
+                  loc++;
+                }
+                internalAlignedCreateTimeseries(deviceId, measurements, dataTypes);
+                measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurements.get(0));
+              }
+            } else {
+              throw new MetadataException(
+                  String.format(
+                      "Only support insertRow and insertTablet, plan is [%s]",
+                      plan.getOperatorType()));
+            }
           }
         }
 
         // check type is match
+        boolean mismatch = false;
         TSDataType insertDataType = null;
         if (plan instanceof InsertRowPlan) {
           if (!((InsertRowPlan) plan).isNeedInferType()) {
-            // only when InsertRowPlan's values is object[], we should check type
-            insertDataType = getTypeInLoc(plan, i);
+            // only when InsertRowPlan's values list is object[], we should check type
+            insertDataType = plan.getDataTypes()[i];
           } else {
             insertDataType = measurementMNode.getSchema().getType();
           }
+          mismatch = measurementMNode.getSchema().getType() != insertDataType;
         } else if (plan instanceof InsertTabletPlan) {
-          insertDataType = getTypeInLoc(plan, i);
+          int measurementSize = measurementList[i].split(",").length;
+          loc -= measurementSize;
+          if (measurementSize == 1) {
+            insertDataType = measurementMNode.getSchema().getType();
+            mismatch = measurementMNode.getSchema().getType() != insertDataType;
+          } else {
+            for (int j = 0; j < measurementSize; j++) {
+              TSDataType dataTypeInNode =
+                  measurementMNode.getSchema().getValueTSDataTypeList().get(j);
+              insertDataType = plan.getDataTypes()[loc];
+              if (dataTypeInNode != insertDataType) {
+                mismatch = true;
+                insertDataType = measurementMNode.getSchema().getType();
+                break;
+              }
+              loc++;
+            }
+          }
         }
 
-        if (measurementMNode.getSchema().getType() != insertDataType) {
+        if (mismatch) {
           logger.warn(
               "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
               measurementList[i],
@@ -1985,7 +2056,6 @@ public class MManager {
 
         // set measurementName instead of alias
         measurementList[i] = measurementMNode.getName();
-
       } catch (MetadataException e) {
         logger.warn(
             "meet error when check {}.{}, message: {}",
@@ -2008,7 +2078,7 @@ public class MManager {
     return deviceMNode.getChild(measurementName);
   }
 
-  /** create timeseries with ignore PathAlreadyExistException */
+  /** create timeseries ignoring PathAlreadyExistException */
   private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
       throws MetadataException {
     createTimeseries(
@@ -2019,21 +2089,20 @@ public class MManager {
         Collections.emptyMap());
   }
 
-  /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */
-  private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
-    TSDataType dataType;
-    if (plan instanceof InsertRowPlan) {
-      InsertRowPlan tPlan = (InsertRowPlan) plan;
-      dataType =
-          TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
-    } else if (plan instanceof InsertTabletPlan) {
-      dataType = (plan).getDataTypes()[loc];
-    } else {
-      throw new MetadataException(
-          String.format(
-              "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+  /** create aligned timeseries ignoring PathAlreadyExistException */
+  private void internalAlignedCreateTimeseries(
+      PartialPath devicePath, List<String> measurements, List<TSDataType> dataTypes)
+      throws MetadataException {
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (TSDataType dataType : dataTypes) {
+      encodings.add(getDefaultEncoding(dataType));
     }
-    return dataType;
+    createAlignedTimeSeries(
+        devicePath,
+        measurements,
+        dataTypes,
+        encodings,
+        TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
index a02f5be..fab9114 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
@@ -115,7 +115,8 @@ public class MetaUtils {
    */
   public static List<String> getMeasurementsInPartialPath(PartialPath fullPath) {
     if (fullPath.getMeasurement().contains("(") && fullPath.getMeasurement().contains(",")) {
-      return (Arrays.asList(fullPath.getMeasurement().split("\\(")[1].split("\\)")[0].split(",")));
+      return (Arrays.asList(
+          fullPath.getMeasurement().replace("(", "").replace(")", "").split(",")));
     } else {
       return Arrays.asList(fullPath.getMeasurement());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 52b5b64..1246eea 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -106,4 +106,55 @@ public class InsertTabletPlanTest {
       Assert.assertEquals(6, record.getFields().size());
     }
   }
+
+  @Test
+  public void testInsertTabletPlanWithAlignedTimeseries()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    long[] times = new long[] {110L, 111L, 112L, 113L}; // four row timestamps
+    List<Integer> dataTypes = new ArrayList<>(); // TSDataType ordinals, one per value column
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    Object[] columns = new Object[6]; // one array per column, in the same order as dataTypes
+    columns[0] = new double[4];
+    columns[1] = new float[4];
+    columns[2] = new long[4];
+    columns[3] = new int[4];
+    columns[4] = new boolean[4];
+    columns[5] = new Binary[4];
+
+    for (int r = 0; r < 4; r++) { // fill all four rows of every column
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+      ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+    }
+
+    InsertTabletPlan tabletPlan =
+        new InsertTabletPlan(
+            new PartialPath("root.isp.d1"),
+            new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"}, // 3 + 2 + 1 = 6 value columns
+            dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insertTablet(tabletPlan); // no series are created beforehand in this test body
+
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(6, dataSet.getPaths().size()); // one result path per inserted column
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(6, record.getFields().size()); // every row carries all six fields
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 0411be9..def8386 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -318,10 +318,7 @@ public class VectorMeasurementSchema
           TSDataType.deserialize(types[i]).toString(),
           ",",
           TSEncoding.deserialize(encodings[i]).toString());
-      sc.addTail("]");
-      if (i != measurements.length - 1) {
-        sc.addTail(", ");
-      }
+      sc.addTail("],");
     }
     sc.addTail(CompressionType.deserialize(compressor).toString());
     return sc.toString();