You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/07 15:26:09 UTC

[incubator-iotdb] 01/01: auto create from InsertTabletPlan

This is an automated email from the ASF dual-hosted git repository.

yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 6ae135174d958d6c6cf60717af722f3ae6e6e899
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Tue Jul 7 23:24:26 2020 +0800

    auto create from InsertTabletPlan
---
 .../cluster/server/member/MetaGroupMember.java     | 32 +++++++++++-----------
 .../iotdb/db/qp/physical/crud/InsertPlan.java      |  2 ++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 22 +++++++++++++++
 3 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d212933..f000c9a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -407,7 +407,6 @@ public class MetaGroupMember extends RaftMember {
   }
 
 
-
   /**
    * Apply the addition of a new node. Register its identifier, add it to the node list and
    * partition table, serialize the partition table and update the DataGroupMembers.
@@ -693,7 +692,6 @@ public class MetaGroupMember extends RaftMember {
   }
 
 
-
   /**
    * @return Whether all nodes' identifier is known.
    */
@@ -722,7 +720,7 @@ public class MetaGroupMember extends RaftMember {
   /**
    * Process the join cluster request of "node". Only proceed when the partition table is ready.
    *
-   * @param node          cannot be the local node
+   * @param node cannot be the local node
    */
   public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus)
       throws AddSelfException, LogExecutionException {
@@ -1432,7 +1430,7 @@ public class MetaGroupMember extends RaftMember {
               setStorageGroupResult.getCode(), storageGroupName)
       );
     }
-    if(plan instanceof InsertRowPlan){
+    if (plan instanceof InsertRowPlan) {
       // try to create timeseries
       boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
       if (!isAutoCreateTimeseriesSuccess) {
@@ -1451,9 +1449,9 @@ public class MetaGroupMember extends RaftMember {
    * @return
    */
   TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
-    InsertRowPlan backup = null;
-    if (plan instanceof InsertRowPlan) {
-      backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
+    InsertPlan backup = null;
+    if (plan instanceof InsertPlan) {
+      backup = (InsertPlan) ((InsertPlan) plan).clone();
     }
     // the error codes from the groups that cannot execute the plan
     TSStatus status;
@@ -1466,7 +1464,7 @@ public class MetaGroupMember extends RaftMember {
         status = forwardToMultipleGroup(planGroupMap);
       }
     }
-    if (plan instanceof InsertRowPlan
+    if (plan instanceof InsertPlan
         && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
         && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
       // try to create timeseries
@@ -1607,7 +1605,7 @@ public class MetaGroupMember extends RaftMember {
    * @param insertPlan, some of the timeseries in it are not created yet
   * @return true if all uncreated timeseries are created
    */
-  boolean autoCreateTimeseries(InsertRowPlan insertPlan) {
+  boolean autoCreateTimeseries(InsertPlan insertPlan) {
     List<String> seriesList = new ArrayList<>();
     String deviceId = insertPlan.getDeviceId();
     String storageGroupName;
@@ -1629,7 +1627,9 @@ public class MetaGroupMember extends RaftMember {
     for (String seriesPath : unregisteredSeriesList) {
       int index = seriesList.indexOf(seriesPath);
       TSDataType dataType = TypeInferenceUtils
-          .getPredictedDataType(insertPlan.getValues()[index], true);
+          .getPredictedDataType(insertPlan instanceof InsertTabletPlan
+              ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0]
+              : ((InsertRowPlan) insertPlan).getValues()[index], true);
       TSEncoding encoding = getDefaultEncoding(dataType);
       CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
       CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
@@ -1824,7 +1824,8 @@ public class MetaGroupMember extends RaftMember {
       if (logger.isDebugEnabled()) {
         logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}",
             name,
-            schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, node,
+            schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1,
+            node,
             request.getHeader());
       }
       results.addAll(schemas);
@@ -2654,16 +2655,16 @@ public class MetaGroupMember extends RaftMember {
   }
 
 
-
   /**
    * Process the request of removing a node from the cluster. Reject the request if partition table
    * is unavailable or the node is not the MetaLeader and it does not know who the leader is.
    * Otherwise (being the MetaLeader), the request will be processed locally and broadcast to every
    * node.
    *
-   * @param node          the node to be removed.
+   * @param node the node to be removed.
    */
-  public long removeNode(Node node) throws PartitionTableUnavailableException, LogExecutionException {
+  public long removeNode(Node node)
+      throws PartitionTableUnavailableException, LogExecutionException {
     if (partitionTable == null) {
       logger.info("Cannot add node now because the partition table is not set");
       throw new PartitionTableUnavailableException(thisNode);
@@ -2675,13 +2676,12 @@ public class MetaGroupMember extends RaftMember {
   }
 
 
-
   /**
    * Process a node removal request locally and broadcast it to the whole cluster. The removal will
    * be rejected if number of nodes will fall below half of the replication number after this
    * operation.
    *
-   * @param node          the node to be removed.
+   * @param node the node to be removed.
    * @return Long.MIN_VALUE if further forwarding is required, or the execution result
    */
   private long processRemoveNodeLocally(Node node)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3b4222c..5c81cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -95,4 +95,6 @@ public abstract class InsertPlan extends PhysicalPlan {
     dataTypes[index] = null;
   }
 
+  @Override
+  public abstract Object clone(); // declared abstract: each concrete InsertPlan subclass must provide its own copy logic
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index f7b59db..dd6b5d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -482,4 +482,26 @@ public class InsertTabletPlan extends InsertPlan {
     columns[index] = null;
   }
 
+  @Override
+  public Object clone() { // builds and returns a new InsertTabletPlan populated from this plan's fields
+    String deviceIdClone = deviceId; // same String reference is reused; no new String is created
+    String[] measurementsClone = new String[this.measurements.length];
+    System.arraycopy(this.measurements, 0, measurementsClone, 0, measurementsClone.length);
+    TSDataType[] typesClone = new TSDataType[this.dataTypes.length];
+    System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length);
+    InsertTabletPlan ret = new InsertTabletPlan(deviceIdClone, measurementsClone);
+
+    ret.setDataTypes(typesClone);
+
+    long[] timesClone = new long[times.length];
+    System.arraycopy(this.times, 0, timesClone, 0, times.length);
+    ret.setTimes(timesClone);
+    Object[] columnsClone = new Object[columns.length];
+    System.arraycopy(this.columns, 0, columnsClone, 0, columns.length); // shallow copy: only the outer array is new, its elements are shared with this plan
+    ret.setColumns(columnsClone);
+
+    ret.setRowCount(rowCount); // NOTE(review): only the fields set above are carried over; confirm no other plan state needs copying
+    return ret;
+  }
+
 }