You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/09/07 06:58:58 UTC

[iotdb] branch master updated: [IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3877)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ba988ee  [IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3877)
ba988ee is described below

commit ba988eec2778370df629913a30baae3f66abba8c
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Sep 7 14:58:34 2021 +0800

    [IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3877)
    
    * Support InsertRowsOfOneDevicePlan in cluster
    
    * Add deserializing InsertRowsOfOneDevicePlan
    
    * Discard client-go updating
    
    * Add error handling and unit test
    
    * Fix small issues by review comments
    
    * Fix small issues
---
 .../iotdb/cluster/query/ClusterPlanRouter.java     | 36 ++++++++++
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  4 ++
 .../physical/crud/InsertRowsOfOneDevicePlan.java   | 80 +++++++++++++++++++++-
 .../qp/physical/InsertRowsOfOneDevicePlanTest.java | 75 ++++++++++++++++++++
 .../test/java/org/apache/iotdb/db/sql/Cases.java   | 62 +++++++++++++++++
 5 files changed, 254 insertions(+), 3 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index cc06568..f34597a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
@@ -134,6 +135,8 @@ public class ClusterPlanRouter {
       return splitAndRoutePlan((CreateAlignedTimeSeriesPlan) plan);
     } else if (plan instanceof InsertRowPlan) {
       return splitAndRoutePlan((InsertRowPlan) plan);
+    } else if (plan instanceof InsertRowsOfOneDevicePlan) {
+      return splitAndRoutePlan((InsertRowsOfOneDevicePlan) plan);
     } else if (plan instanceof AlterTimeSeriesPlan) {
       return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
     } else if (plan instanceof CreateMultiTimeSeriesPlan) {
@@ -504,4 +507,37 @@ public class ClusterPlanRouter {
     subPlan.setIndexes(new ArrayList<>());
     return subPlan;
   }
+
+  /**
+   * @param plan InsertRowsOfOneDevicePlan
+   * @return key is InsertRowsOfOneDevicePlan, value is the partition group the plan belongs to. All
+   *     InsertRowPlans in InsertRowsOfOneDevicePlan belong to one same storage group.
+   */
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsOfOneDevicePlan plan)
+      throws MetadataException {
+    Map<PhysicalPlan, PartitionGroup> result = new HashMap<>();
+    Map<PartitionGroup, List<InsertRowPlan>> groupPlanMap = new HashMap<>();
+    Map<PartitionGroup, List<Integer>> groupPlanIndexMap = new HashMap<>();
+    PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getPrefixPath());
+    for (int i = 0; i < plan.getRowPlans().length; i++) {
+      InsertRowPlan p = plan.getRowPlans()[i];
+      PartitionGroup group = partitionTable.route(storageGroup.getFullPath(), p.getTime());
+      List<InsertRowPlan> groupedPlans =
+          groupPlanMap.computeIfAbsent(group, k -> new ArrayList<>());
+      List<Integer> groupedPlanIndex =
+          groupPlanIndexMap.computeIfAbsent(group, k -> new ArrayList<>());
+      groupedPlans.add(p);
+      groupedPlanIndex.add(plan.getRowPlanIndexList()[i]);
+    }
+
+    for (Entry<PartitionGroup, List<InsertRowPlan>> entry : groupPlanMap.entrySet()) {
+      PhysicalPlan reducedPlan =
+          new InsertRowsOfOneDevicePlan(
+              plan.getPrefixPath(),
+              entry.getValue().toArray(new InsertRowPlan[0]),
+              groupPlanIndexMap.get(entry.getKey()).stream().mapToInt(i -> i).toArray());
+      result.put(reducedPlan, entry.getKey());
+    }
+    return result;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index e25cb2b..a8d3e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
@@ -360,6 +361,9 @@ public abstract class PhysicalPlan {
         case BATCH_INSERT_ROWS:
           plan = new InsertRowsPlan();
           break;
+        case BATCH_INSERT_ONE_DEVICE:
+          plan = new InsertRowsOfOneDevicePlan();
+          break;
         case CREATE_TRIGGER:
           plan = new CreateTriggerPlan();
           break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 7fc2e55..64a8c11 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -29,7 +29,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -37,18 +38,42 @@ import java.util.Set;
 
 public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
 
+  /**
+   * This class has some duplicated codes with InsertRowsPlan, they should be refined in the future
+   */
   boolean[] isExecuted;
+
   private InsertRowPlan[] rowPlans;
 
+  /**
+   * Suppose there is an InsertRowsOfOneDevicePlan, which contains 5 InsertRowPlans,
+   * rowPlans={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3, InsertRowPlan_4},
+   * then the rowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the InsertRowsOfOneDevicePlan
+   * is split into two InsertRowsOfOneDevicePlans according to the time partition in cluster
+   * version, suppose that the InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {InsertRowPlan_0,
+   * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {0, 3,
+   * 4}; InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {InsertRowPlan_1, InsertRowPlan_2} then
+   * InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {1, 2} respectively;
+   */
+  private int[] rowPlanIndexList;
+
+  /** record the result of insert rows */
+  private Map<Integer, TSStatus> results = new HashMap<>();
+
+  public InsertRowsOfOneDevicePlan() {
+    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+  }
+
   public InsertRowsOfOneDevicePlan(
       PartialPath deviceId,
       Long[] insertTimes,
       List<List<String>> measurements,
       ByteBuffer[] insertValues)
       throws QueryProcessException {
-    super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+    this();
     this.prefixPath = deviceId;
     rowPlans = new InsertRowPlan[insertTimes.length];
+    rowPlanIndexList = new int[insertTimes.length];
     for (int i = 0; i < insertTimes.length; i++) {
       rowPlans[i] =
           new InsertRowPlan(
@@ -67,9 +92,22 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
                 + ", time:"
                 + insertTimes[i]);
       }
+      rowPlanIndexList[i] = i;
     }
   }
 
+  /**
+   * This constructor is used for splitting parent InsertRowsOfOneDevicePlan into sub ones. So
+   * there's no need to validate rowPlans.
+   */
+  public InsertRowsOfOneDevicePlan(
+      PartialPath deviceId, InsertRowPlan[] rowPlans, int[] rowPlanIndexList) {
+    this();
+    this.prefixPath = deviceId;
+    this.rowPlans = rowPlans;
+    this.rowPlanIndexList = rowPlanIndexList;
+  }
+
   @Override
   public void checkIntegrity() {}
 
@@ -106,6 +144,9 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
       stream.writeLong(plan.getTime());
       plan.serializeMeasurementsAndValues(stream);
     }
+    for (Integer index : rowPlanIndexList) {
+      stream.writeInt(index);
+    }
   }
 
   @Override
@@ -119,6 +160,9 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
       buffer.putLong(plan.getTime());
       plan.serializeMeasurementsAndValues(buffer);
     }
+    for (Integer index : rowPlanIndexList) {
+      buffer.putInt(index);
+    }
   }
 
   @Override
@@ -131,6 +175,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
       rowPlans[i].setTime(buffer.getLong());
       rowPlans[i].deserializeMeasurementsAndValues(buffer);
     }
+    this.rowPlanIndexList = new int[rowPlans.length];
+    for (int i = 0; i < rowPlans.length; i++) {
+      rowPlanIndexList[i] = buffer.getInt();
+    }
   }
 
   @Override
@@ -183,7 +231,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
   }
 
   public Map<Integer, TSStatus> getResults() {
-    return Collections.emptyMap();
+    return results;
   }
 
   @Override
@@ -191,11 +239,37 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
     return rowPlans.length;
   }
 
+  public int[] getRowPlanIndexList() {
+    return rowPlanIndexList;
+  }
+
   @Override
   public void unsetIsExecuted(int i) {
     if (isExecuted == null) {
       isExecuted = new boolean[getBatchSize()];
     }
     isExecuted[i] = false;
+    if (rowPlanIndexList != null && rowPlanIndexList.length > 0) {
+      results.remove(rowPlanIndexList[i]);
+    } else {
+      results.remove(i);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof InsertRowsOfOneDevicePlan
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlanIndexList, this.rowPlanIndexList)
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlans, this.rowPlans)
+        && ((InsertRowsOfOneDevicePlan) o).results.equals(this.results)
+        && ((InsertRowsOfOneDevicePlan) o).getPrefixPath().equals(this.getPrefixPath());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = rowPlans != null ? Arrays.hashCode(rowPlans) : 0;
+    result = 31 * result + (rowPlanIndexList != null ? Arrays.hashCode(rowPlanIndexList) : 0);
+    result = 31 * result + (results != null ? results.hashCode() : 0);
+    return result;
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..37a4204
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+  @Test
+  public void testSerializable() throws IllegalPathException, IOException {
+
+    PartialPath device = new PartialPath("root.sg.d");
+    InsertRowPlan[] rowPlans =
+        new InsertRowPlan[] {
+          new InsertRowPlan(
+              device,
+              1000L,
+              new String[] {"s1", "s2", "s3"},
+              new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+              new String[] {"1.0", "2", "300"},
+              true),
+          new InsertRowPlan(
+              device,
+              2000L,
+              new String[] {"s1", "s4"},
+              new TSDataType[] {TSDataType.DOUBLE, TSDataType.TEXT},
+              new String[] {"2.0", "abc"},
+              true),
+        };
+
+    InsertRowsOfOneDevicePlan p = new InsertRowsOfOneDevicePlan(device, rowPlans, new int[] {0, 1});
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream w = new DataOutputStream(baos);
+    p.serialize(w);
+    w.flush();
+    byte[] res = baos.toByteArray();
+    ByteBuffer buf = ByteBuffer.wrap(res);
+    InsertRowsOfOneDevicePlan p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+    Assert.assertEquals(p, p2);
+    res = new byte[1024];
+    p.serialize(ByteBuffer.wrap(res));
+    buf = ByteBuffer.wrap(res);
+    p2 = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(buf);
+    Assert.assertEquals(p, p2);
+  }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 2f2b8dc..cd48873 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -43,6 +43,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -193,6 +194,67 @@ public abstract class Cases {
       Assert.assertFalse(resultSet.next());
       resultSet.close();
     }
+
+    // test https://issues.apache.org/jira/browse/IOTDB-1600
+    try {
+      // Target data of device "root.ln.wf01.d_1600"
+      // Time s1   s2   s3
+      // 1000 1.0  2.0  null
+      // 2000 null 3.0  4.0
+      // 3000 5.0  6.0  7.0
+      String testDevice = "root.ln.wf01.d_1600";
+      session.createTimeseries(
+          testDevice + ".s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+      session.createTimeseries(
+          testDevice + ".s2", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+      session.createTimeseries(
+          testDevice + ".s3", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+      List<Long> insertTimes = Arrays.asList(1000L, 2000L, 3000L);
+      List<List<String>> insertedMeasurements =
+          Arrays.asList(
+              Arrays.asList("s1", "s2"),
+              Arrays.asList("s2", "s3"),
+              Arrays.asList("s1", "s2", "s3"));
+      List<List<TSDataType>> insertedDataTypes =
+          Arrays.asList(
+              Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE),
+              Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE),
+              Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE));
+      List<List<Object>> insertedValues =
+          Arrays.asList(
+              Arrays.asList(1.0D, 2.0D),
+              Arrays.asList(3.0D, 4.0D),
+              Arrays.asList(5.0D, 6.0D, 7.0D));
+      session.insertRecordsOfOneDevice(
+          testDevice, insertTimes, insertedMeasurements, insertedDataTypes, insertedValues);
+      final double E = 0.00001;
+      for (Statement readStatement : readStatements) {
+        resultSet = readStatement.executeQuery("select s1, s2, s3 from " + testDevice);
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(1000L, resultSet.getLong("Time"));
+        Assert.assertEquals(1.0D, resultSet.getDouble(testDevice + ".s1"), E);
+        Assert.assertEquals(2.0D, resultSet.getDouble(testDevice + ".s2"), E);
+        Assert.assertNull(resultSet.getObject(testDevice + ".s3"));
+
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(2000L, resultSet.getLong("Time"));
+        Assert.assertNull(resultSet.getObject(testDevice + ".s1"));
+        Assert.assertEquals(3.0D, resultSet.getDouble(testDevice + ".s2"), E);
+        Assert.assertEquals(4.0D, resultSet.getDouble(testDevice + ".s3"), E);
+
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(3000L, resultSet.getLong("Time"));
+        Assert.assertEquals(5.0D, resultSet.getDouble(testDevice + ".s1"), E);
+        Assert.assertEquals(6.0D, resultSet.getDouble(testDevice + ".s2"), E);
+        Assert.assertEquals(7.0D, resultSet.getDouble(testDevice + ".s3"), E);
+
+        Assert.assertFalse(resultSet.next());
+        resultSet.close();
+      }
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      e.printStackTrace();
+      fail();
+    }
   }
 
   // test https://issues.apache.org/jira/browse/IOTDB-1266