You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2021/03/14 09:01:32 UTC

[iotdb] branch insertTablet created (now f39bc42)

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a change to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f39bc42  [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries

This branch includes the following new commits:

     new 2e667d3  Merge remote-tracking branch 'origin/Vector' into Vector
     new bbcadd9  Merge remote-tracking branch 'origin/Vector' into Vector
     new f39bc42  [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 02/03: Merge remote-tracking branch 'origin/Vector' into Vector

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bbcadd9d614f31c1b1327b251aa220903bb018fa
Merge: 2e667d3 0867605
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Mar 14 16:46:06 2021 +0800

    Merge remote-tracking branch 'origin/Vector' into Vector

 .github/workflows/sonar-coveralls.yml              |  14 +-
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  35 +++--
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  59 ++++-----
 .../db/engine/memtable/IWritableMemChunk.java      |   2 +-
 .../db/engine/memtable/PrimitiveMemTable.java      |   5 +
 .../iotdb/db/engine/memtable/WritableMemChunk.java |   6 +-
 .../org/apache/iotdb/db/metadata/MManager.java     | 140 ++++++++++++++------
 .../java/org/apache/iotdb/db/metadata/MTree.java   |  84 ++++++------
 .../iotdb/db/metadata/logfile/MLogWriter.java      |  32 +++--
 .../org/apache/iotdb/db/metadata/mnode/MNode.java  |  21 ++-
 .../iotdb/db/metadata/template/Template.java       |  35 ++++-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 100 +++++++-------
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  27 ++--
 .../db/qp/physical/crud/CreateTemplatePlan.java    | 146 ++++++++++++++++++++-
 .../db/qp/physical/crud/SetDeviceTemplatePlan.java |  55 +++++++-
 .../physical/sys/CreateAlignedTimeSeriesPlan.java  |  17 +--
 .../apache/iotdb/db/rescon/TVListAllocator.java    |  13 +-
 .../iotdb/db/utils/datastructure/VectorTVList.java |  12 +-
 .../db/engine/memtable/MemTableFlushTaskTest.java  |  40 +++++-
 .../db/engine/memtable/MemTableTestUtils.java      |  58 ++++++++
 .../db/engine/memtable/PrimitiveMemTableTest.java  | 124 +++++++++--------
 .../iotdb/db/metadata/MManagerBasicTest.java       |  85 +++++++++---
 .../iotdb/db/metadata/MManagerImproveTest.java     |  18 ++-
 .../write/schema/VectorMeasurementSchema.java      |   2 +-
 24 files changed, 810 insertions(+), 320 deletions(-)


[iotdb] 01/03: Merge remote-tracking branch 'origin/Vector' into Vector

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2e667d326430b4cdba3f6d3097c41a49b3a47cb5
Merge: 14f8046 a9d6687
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Mar 14 12:18:06 2021 +0800

    Merge remote-tracking branch 'origin/Vector' into Vector

 .github/workflows/client.yml                       |  30 ++---
 antlr/pom.xml                                      |  19 ++++
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlBase.g4   |   2 +-
 client-cpp/README.md                               |   6 +-
 .../resources/conf/iotdb-cluster.properties        |  10 +-
 .../java/org/apache/iotdb/cluster/ClusterMain.java |   7 ++
 .../cluster/client/async/AsyncClientPool.java      |  10 +-
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  10 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  15 +++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  11 ++
 .../iotdb/cluster/coordinator/Coordinator.java     |  27 ++---
 .../apache/iotdb/cluster/metadata/CMManager.java   |  40 +++----
 .../cluster/query/ClusterPhysicalGenerator.java    |  20 +++-
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |   4 +
 docs/UserGuide/Operation Manual/Kill Query.md      |   2 +-
 .../Operation Manual/UDF User Defined Function.md  |   2 +-
 docs/UserGuide/Server/Cluster Setup.md             |   4 +-
 docs/zh/UserGuide/Operation Manual/Kill Query.md   |   2 +-
 .../Operation Manual/UDF User Defined Function.md  |   2 +-
 docs/zh/UserGuide/Server/Cluster Setup.md          |   4 +-
 example/client-cpp-example/README.md               |   2 +-
 hive-connector/pom.xml                             |  12 +-
 .../resources/conf/iotdb-engine.properties         |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  10 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   4 +-
 .../db/engine/compaction/TsFileManagement.java     |   8 ++
 .../level/LevelCompactionTsFileManagement.java     |  31 ++++++
 .../engine/compaction/utils/CompactionUtils.java   |   3 +
 .../iotdb/db/engine/flush/MemTableFlushTask.java   |  94 ++++++----------
 .../db/engine/storagegroup/TsFileProcessor.java    |  11 +-
 .../org/apache/iotdb/db/metadata/MManager.java     |  10 ++
 .../iotdb/db/metadata/template/Template.java       |  70 ++++++++++++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  26 +++++
 .../org/apache/iotdb/db/qp/logical/Operator.java   |   4 +-
 .../db/qp/physical/crud/CreateTemplatePlan.java    |  78 +++++++++++++
 .../db/qp/physical/crud/SetDeviceTemplatePlan.java |  23 ++++
 .../iotdb/db/query/control/QueryTimeManager.java   |   2 +-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  79 ++++++++++++-
 .../iotdb/db/utils/datastructure/VectorTVList.java | 122 ++++++++++++++++++++-
 .../iotdb/db/integration/IoTDBQueryDemoIT.java     |   8 +-
 .../java/org/apache/iotdb/session/Session.java     |  54 +++++++++
 .../apache/iotdb/session/SessionConnection.java    |  40 +++++++
 .../java/org/apache/iotdb/session/SessionUT.java   |  47 ++++++++
 thrift/pom.xml                                     |  19 ++++
 thrift/src/main/thrift/rpc.thrift                  |  19 ++++
 .../file/metadata/statistics/Statistics.java       |   2 +
 .../iotdb/tsfile/read/TsFileSequenceReader.java    |  15 ++-
 ...easurementChunkMetadataListMapIteratorTest.java |  65 +++++++++--
 .../zeppelin/iotdb/IoTDBInterpreterTest.java       |   2 +-
 49 files changed, 911 insertions(+), 178 deletions(-)


[iotdb] 03/03: [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries

Posted by su...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch insertTablet
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f39bc420b766758e842dbead0bf1584142f2793c
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Sun Mar 14 16:57:54 2021 +0800

    [IOTDB-1228] Refactor PlanExecutor.insertTablet method to support aligned timeseries
---
 .../org/apache/iotdb/db/metadata/MManager.java     | 123 ++++++++++++++++-----
 .../org/apache/iotdb/db/metadata/MetaUtils.java    |   3 +-
 .../iotdb/db/qp/physical/InsertTabletPlanTest.java |  51 +++++++++
 .../write/schema/VectorMeasurementSchema.java      |   5 +-
 4 files changed, 150 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 2bca513..6ddafbf 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -92,7 +92,6 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.RandomDeleteCache;
 import org.apache.iotdb.db.utils.SchemaUtils;
 import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.exception.cache.CacheException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -107,6 +106,34 @@ import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
@@ -1928,9 +1955,10 @@ public class MManager {
     Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
 
     // 2. get schema of each measurement
-    // if do not has measurement
+    // handle the case where a measurement does not exist yet
     MeasurementMNode measurementMNode;
-    TSDataType dataType;
+    int loc = 0;
+
     for (int i = 0; i < measurementList.length; i++) {
       try {
         MNode child = getMNode(deviceMNode.left, measurementList[i]);
@@ -1943,27 +1971,70 @@ public class MManager {
             throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
           } else {
             // child is null or child is type of MNode
-            dataType = getTypeInLoc(plan, i);
-            // create it, may concurrent created by multiple thread
-            internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
-            measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+            // get dataType of plan; only InsertRowPlan and InsertTabletPlan are supported
+            if (plan instanceof InsertRowPlan) {
+              TSDataType dataType = plan.getDataTypes()[i];
+              // create it, may concurrent created by multiple thread
+              internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
+              measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+            } else if (plan instanceof InsertTabletPlan) {
+              List<TSDataType> dataTypes = new ArrayList<>();
+              List<String> measurements =
+                  Arrays.asList(measurementList[i].replace("(", "").replace(")", "").split(","));
+              if (measurements.size() == 1) {
+                internalCreateTimeseries(
+                    deviceId.concatNode(measurementList[i]), plan.getDataTypes()[loc]);
+                measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
+              } else {
+                for (int j = 0; j < measurements.size(); j++) {
+                  dataTypes.add(plan.getDataTypes()[loc]);
+                  loc++;
+                }
+                internalAlignedCreateTimeseries(deviceId, measurements, dataTypes);
+                measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurements.get(0));
+              }
+            } else {
+              throw new MetadataException(
+                  String.format(
+                      "Only support insertRow and insertTablet, plan is [%s]",
+                      plan.getOperatorType()));
+            }
           }
         }
 
         // check type is match
+        boolean mismatch = false;
         TSDataType insertDataType = null;
         if (plan instanceof InsertRowPlan) {
           if (!((InsertRowPlan) plan).isNeedInferType()) {
-            // only when InsertRowPlan's values is object[], we should check type
-            insertDataType = getTypeInLoc(plan, i);
+            // check the type only when InsertRowPlan's values list is object[]
+            insertDataType = plan.getDataTypes()[i];
           } else {
             insertDataType = measurementMNode.getSchema().getType();
           }
+          mismatch = measurementMNode.getSchema().getType() != insertDataType;
         } else if (plan instanceof InsertTabletPlan) {
-          insertDataType = getTypeInLoc(plan, i);
+          int measurementSize = measurementList[i].split(",").length;
+          loc -= measurementSize;
+          if (measurementSize == 1) {
+            insertDataType = measurementMNode.getSchema().getType();
+            mismatch = measurementMNode.getSchema().getType() != insertDataType;
+          } else {
+            for (int j = 0; j < measurementSize; j++) {
+              TSDataType dataTypeInNode =
+                  measurementMNode.getSchema().getValueTSDataTypeList().get(j);
+              insertDataType = plan.getDataTypes()[loc];
+              if (dataTypeInNode != insertDataType) {
+                mismatch = true;
+                insertDataType = measurementMNode.getSchema().getType();
+                break;
+              }
+              loc++;
+            }
+          }
         }
 
-        if (measurementMNode.getSchema().getType() != insertDataType) {
+        if (mismatch) {
           logger.warn(
               "DataType mismatch, Insert measurement {} type {}, metadata tree type {}",
               measurementList[i],
@@ -1985,7 +2056,6 @@ public class MManager {
 
         // set measurementName instead of alias
         measurementList[i] = measurementMNode.getName();
-
       } catch (MetadataException e) {
         logger.warn(
             "meet error when check {}.{}, message: {}",
@@ -2008,7 +2078,7 @@ public class MManager {
     return deviceMNode.getChild(measurementName);
   }
 
-  /** create timeseries with ignore PathAlreadyExistException */
+  /** create timeseries ignoring PathAlreadyExistException */
   private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
       throws MetadataException {
     createTimeseries(
@@ -2019,21 +2089,20 @@ public class MManager {
         Collections.emptyMap());
   }
 
-  /** get dataType of plan, in loc measurements only support InsertRowPlan and InsertTabletPlan */
-  private TSDataType getTypeInLoc(InsertPlan plan, int loc) throws MetadataException {
-    TSDataType dataType;
-    if (plan instanceof InsertRowPlan) {
-      InsertRowPlan tPlan = (InsertRowPlan) plan;
-      dataType =
-          TypeInferenceUtils.getPredictedDataType(tPlan.getValues()[loc], tPlan.isNeedInferType());
-    } else if (plan instanceof InsertTabletPlan) {
-      dataType = (plan).getDataTypes()[loc];
-    } else {
-      throw new MetadataException(
-          String.format(
-              "Only support insert and insertTablet, plan is [%s]", plan.getOperatorType()));
+  /** create aligned timeseries ignoring PathAlreadyExistException */
+  private void internalAlignedCreateTimeseries(
+      PartialPath devicePath, List<String> measurements, List<TSDataType> dataTypes)
+      throws MetadataException {
+    List<TSEncoding> encodings = new ArrayList<>();
+    for (TSDataType dataType : dataTypes) {
+      encodings.add(getDefaultEncoding(dataType));
     }
-    return dataType;
+    createAlignedTimeSeries(
+        devicePath,
+        measurements,
+        dataTypes,
+        encodings,
+        TSFileDescriptor.getInstance().getConfig().getCompressor());
   }
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
index a02f5be..fab9114 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MetaUtils.java
@@ -115,7 +115,8 @@ public class MetaUtils {
    */
   public static List<String> getMeasurementsInPartialPath(PartialPath fullPath) {
     if (fullPath.getMeasurement().contains("(") && fullPath.getMeasurement().contains(",")) {
-      return (Arrays.asList(fullPath.getMeasurement().split("\\(")[1].split("\\)")[0].split(",")));
+      return (Arrays.asList(
+          fullPath.getMeasurement().replace("(", "").replace(")", "").split(",")));
     } else {
       return Arrays.asList(fullPath.getMeasurement());
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 52b5b64..1246eea 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -106,4 +106,55 @@ public class InsertTabletPlanTest {
       Assert.assertEquals(6, record.getFields().size());
     }
   }
+
+  @Test
+  public void testInsertTabletPlanWithAlignedTimeseries()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    long[] times = new long[] {110L, 111L, 112L, 113L};
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    Object[] columns = new Object[6];
+    columns[0] = new double[4];
+    columns[1] = new float[4];
+    columns[2] = new long[4];
+    columns[3] = new int[4];
+    columns[4] = new boolean[4];
+    columns[5] = new Binary[4];
+
+    for (int r = 0; r < 4; r++) {
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+      ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+    }
+
+    InsertTabletPlan tabletPlan =
+        new InsertTabletPlan(
+            new PartialPath("root.isp.d1"),
+            new String[] {"(s1,s2,s3)", "(s4,s5)", "s6"},
+            dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insertTablet(tabletPlan);
+
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(6, dataSet.getPaths().size());
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(6, record.getFields().size());
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 0411be9..def8386 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -318,10 +318,7 @@ public class VectorMeasurementSchema
           TSDataType.deserialize(types[i]).toString(),
           ",",
           TSEncoding.deserialize(encodings[i]).toString());
-      sc.addTail("]");
-      if (i != measurements.length - 1) {
-        sc.addTail(", ");
-      }
+      sc.addTail("],");
     }
     sc.addTail(CompressionType.deserialize(compressor).toString());
     return sc.toString();