You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/09/07 06:58:58 UTC
[iotdb] branch master updated: [IOTDB-1600] Support
InsertRowsOfOneDevicePlan in cluster (#3877)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba988ee [IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3877)
ba988ee is described below
commit ba988eec2778370df629913a30baae3f66abba8c
Author: BaiJian <er...@hotmail.com>
AuthorDate: Tue Sep 7 14:58:34 2021 +0800
[IOTDB-1600] Support InsertRowsOfOneDevicePlan in cluster (#3877)
* Support InsertRowsOfOneDevicePlan in cluster
* Add deserializing InsertRowsOfOneDevicePlan
* Discard client-go updating
* Add error handling and unit test
* Fix small issues by review comments
* Fix small issues
---
.../iotdb/cluster/query/ClusterPlanRouter.java | 36 ++++++++++
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 4 ++
.../physical/crud/InsertRowsOfOneDevicePlan.java | 80 +++++++++++++++++++++-
.../qp/physical/InsertRowsOfOneDevicePlanTest.java | 75 ++++++++++++++++++++
.../test/java/org/apache/iotdb/db/sql/Cases.java | 62 +++++++++++++++++
5 files changed, 254 insertions(+), 3 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
index cc06568..f34597a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
@@ -134,6 +135,8 @@ public class ClusterPlanRouter {
return splitAndRoutePlan((CreateAlignedTimeSeriesPlan) plan);
} else if (plan instanceof InsertRowPlan) {
return splitAndRoutePlan((InsertRowPlan) plan);
+ } else if (plan instanceof InsertRowsOfOneDevicePlan) {
+ return splitAndRoutePlan((InsertRowsOfOneDevicePlan) plan);
} else if (plan instanceof AlterTimeSeriesPlan) {
return splitAndRoutePlan((AlterTimeSeriesPlan) plan);
} else if (plan instanceof CreateMultiTimeSeriesPlan) {
@@ -504,4 +507,37 @@ public class ClusterPlanRouter {
subPlan.setIndexes(new ArrayList<>());
return subPlan;
}
+
+  /**
+   * Splits the plan into one InsertRowsOfOneDevicePlan per partition group. All rows share the
+   * storage group resolved from the plan's prefix path; each row is routed by its own time.
+   *
+   * @param plan InsertRowsOfOneDevicePlan
+   * @return key is a sub InsertRowsOfOneDevicePlan, value is the partition group it was routed to
+   */
+  private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowsOfOneDevicePlan plan)
+      throws MetadataException {
+    PartialPath storageGroup = getMManager().getStorageGroupPath(plan.getPrefixPath());
+    InsertRowPlan[] rows = plan.getRowPlans();
+    int[] originalIndexes = plan.getRowPlanIndexList();
+    Map<PartitionGroup, List<InsertRowPlan>> rowsByGroup = new HashMap<>();
+    Map<PartitionGroup, List<Integer>> indexesByGroup = new HashMap<>();
+    for (int pos = 0; pos < rows.length; pos++) {
+      PartitionGroup target =
+          partitionTable.route(storageGroup.getFullPath(), rows[pos].getTime());
+      rowsByGroup.computeIfAbsent(target, g -> new ArrayList<>()).add(rows[pos]);
+      indexesByGroup.computeIfAbsent(target, g -> new ArrayList<>()).add(originalIndexes[pos]);
+    }
+
+    Map<PhysicalPlan, PartitionGroup> routed = new HashMap<>();
+    rowsByGroup.forEach(
+        (group, groupRows) ->
+            routed.put(
+                new InsertRowsOfOneDevicePlan(
+                    plan.getPrefixPath(),
+                    groupRows.toArray(new InsertRowPlan[0]),
+                    indexesByGroup.get(group).stream().mapToInt(Integer::intValue).toArray()),
+                group));
+    return routed;
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index e25cb2b..a8d3e31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan;
@@ -360,6 +361,9 @@ public abstract class PhysicalPlan {
case BATCH_INSERT_ROWS:
plan = new InsertRowsPlan();
break;
+ case BATCH_INSERT_ONE_DEVICE:
+ plan = new InsertRowsOfOneDevicePlan();
+ break;
case CREATE_TRIGGER:
plan = new CreateTriggerPlan();
break;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
index 7fc2e55..64a8c11 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowsOfOneDevicePlan.java
@@ -29,7 +29,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -37,18 +38,42 @@ import java.util.Set;
public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
+ /**
+ * This class duplicates some code from InsertRowsPlan; the two should be refactored in the future.
+ */
boolean[] isExecuted;
+
private InsertRowPlan[] rowPlans;
+ /**
+ * Suppose there is an InsertRowsOfOneDevicePlan, which contains 5 InsertRowPlans,
+ * rowPlans={InsertRowPlan_0, InsertRowPlan_1, InsertRowPlan_2, InsertRowPlan_3, InsertRowPlan_4},
+ * then the rowPlanIndexList={0, 1, 2, 3, 4} respectively. But when the InsertRowsOfOneDevicePlan
+ * is split into two InsertRowsOfOneDevicePlans according to the time partition in cluster
+ * version, suppose that InsertRowsOfOneDevicePlan_1's rowPlans = {InsertRowPlan_0,
+ * InsertRowPlan_3, InsertRowPlan_4}, then InsertRowsOfOneDevicePlan_1's rowPlanIndexList = {0, 3,
+ * 4}; and if InsertRowsOfOneDevicePlan_2's rowPlans = {InsertRowPlan_1, InsertRowPlan_2}, then
+ * InsertRowsOfOneDevicePlan_2's rowPlanIndexList = {1, 2}.
+ */
+ private int[] rowPlanIndexList;
+
+ /** record the result of insert rows */
+ private Map<Integer, TSStatus> results = new HashMap<>();
+
+ public InsertRowsOfOneDevicePlan() { // no-arg form; PhysicalPlan's BATCH_INSERT_ONE_DEVICE case uses it
+ super(OperatorType.BATCH_INSERT_ONE_DEVICE); // only the operator type is passed up; no rows are set here
+ }
+
public InsertRowsOfOneDevicePlan(
PartialPath deviceId,
Long[] insertTimes,
List<List<String>> measurements,
ByteBuffer[] insertValues)
throws QueryProcessException {
- super(OperatorType.BATCH_INSERT_ONE_DEVICE);
+ this();
this.prefixPath = deviceId;
rowPlans = new InsertRowPlan[insertTimes.length];
+ rowPlanIndexList = new int[insertTimes.length];
for (int i = 0; i < insertTimes.length; i++) {
rowPlans[i] =
new InsertRowPlan(
@@ -67,9 +92,22 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
+ ", time:"
+ insertTimes[i]);
}
+ rowPlanIndexList[i] = i;
}
}
+ /**
+ * Builds a sub-plan when a parent InsertRowsOfOneDevicePlan is split into smaller ones. The given
+ * rowPlans are stored without validation; the caller is expected to pass already-checked rows.
+ */
+ public InsertRowsOfOneDevicePlan(
+ PartialPath deviceId, InsertRowPlan[] rowPlans, int[] rowPlanIndexList) {
+ this();
+ this.prefixPath = deviceId;
+ this.rowPlans = rowPlans; // kept by reference, not copied
+ this.rowPlanIndexList = rowPlanIndexList; // entry i is the index recorded for rowPlans[i]
+ }
+
@Override
public void checkIntegrity() {}
@@ -106,6 +144,9 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
stream.writeLong(plan.getTime());
plan.serializeMeasurementsAndValues(stream);
}
+ for (Integer index : rowPlanIndexList) {
+ stream.writeInt(index);
+ }
}
@Override
@@ -119,6 +160,9 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
buffer.putLong(plan.getTime());
plan.serializeMeasurementsAndValues(buffer);
}
+ for (Integer index : rowPlanIndexList) {
+ buffer.putInt(index);
+ }
}
@Override
@@ -131,6 +175,10 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
rowPlans[i].setTime(buffer.getLong());
rowPlans[i].deserializeMeasurementsAndValues(buffer);
}
+ this.rowPlanIndexList = new int[rowPlans.length];
+ for (int i = 0; i < rowPlans.length; i++) {
+ rowPlanIndexList[i] = buffer.getInt();
+ }
}
@Override
@@ -183,7 +231,7 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
}
public Map<Integer, TSStatus> getResults() {
- return Collections.emptyMap();
+ return results;
}
@Override
@@ -191,11 +239,37 @@ public class InsertRowsOfOneDevicePlan extends InsertPlan implements BatchPlan {
return rowPlans.length;
}
+ public int[] getRowPlanIndexList() {
+ return rowPlanIndexList; // returns the internal array itself, not a copy
+ }
+
@Override
public void unsetIsExecuted(int i) {
if (isExecuted == null) {
isExecuted = new boolean[getBatchSize()];
}
isExecuted[i] = false;
+ if (rowPlanIndexList != null && rowPlanIndexList.length > 0) {
+ results.remove(rowPlanIndexList[i]); // also forget the status stored under this row's index entry
+ } else {
+ results.remove(i); // no index list available: use the position within this batch as the key
+ }
+ }
+
+  @Override
+  public boolean equals(Object o) { // prefix path may be unset, so it is compared null-safely
+    return o instanceof InsertRowsOfOneDevicePlan
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlanIndexList, this.rowPlanIndexList)
+        && Arrays.equals(((InsertRowsOfOneDevicePlan) o).rowPlans, this.rowPlans)
+        && ((InsertRowsOfOneDevicePlan) o).results.equals(this.results)
+        && java.util.Objects.equals(((InsertRowsOfOneDevicePlan) o).getPrefixPath(), getPrefixPath());
+  }
+
+  @Override
+  public int hashCode() {
+    // Arrays.hashCode yields 0 for a null array, so no explicit null guard is needed for the arrays.
+    int rowsHash = Arrays.hashCode(rowPlans);
+    int indexHash = Arrays.hashCode(rowPlanIndexList);
+    return 31 * (31 * rowsHash + indexHash) + (results == null ? 0 : results.hashCode());
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
new file mode 100644
index 0000000..37a4204
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowsOfOneDevicePlanTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class InsertRowsOfOneDevicePlanTest {
+
+  @Test
+  public void testSerializable() throws IllegalPathException, IOException {
+    PartialPath devicePath = new PartialPath("root.sg.d");
+    InsertRowPlan firstRow =
+        new InsertRowPlan(
+            devicePath,
+            1000L,
+            new String[] {"s1", "s2", "s3"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64},
+            new String[] {"1.0", "2", "300"},
+            true);
+    InsertRowPlan secondRow =
+        new InsertRowPlan(
+            devicePath,
+            2000L,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.TEXT},
+            new String[] {"2.0", "abc"},
+            true);
+    InsertRowsOfOneDevicePlan expected =
+        new InsertRowsOfOneDevicePlan(
+            devicePath, new InsertRowPlan[] {firstRow, secondRow}, new int[] {0, 1});
+    // Round trip through the DataOutputStream-based serializer.
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(byteStream);
+    expected.serialize(dataStream);
+    dataStream.flush();
+    ByteBuffer streamBytes = ByteBuffer.wrap(byteStream.toByteArray());
+    InsertRowsOfOneDevicePlan decoded =
+        (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(streamBytes);
+    Assert.assertEquals(expected, decoded);
+
+    // Round trip through the ByteBuffer-based serializer, backed by a fixed 1024-byte array.
+    byte[] backing = new byte[1024];
+    expected.serialize(ByteBuffer.wrap(backing));
+    decoded = (InsertRowsOfOneDevicePlan) PhysicalPlan.Factory.create(ByteBuffer.wrap(backing));
+    Assert.assertEquals(expected, decoded);
+  }
+}
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index 2f2b8dc..cd48873 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -43,6 +43,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -193,6 +194,67 @@ public abstract class Cases {
Assert.assertFalse(resultSet.next());
resultSet.close();
}
+
+ // test https://issues.apache.org/jira/browse/IOTDB-1600
+ try {
+ // Target data of device "root.ln.wf01.d_1600"
+ // Time s1 s2 s3
+ // 1000 1.0 2.0 null
+ // 2000 null 3.0 4.0
+ // 3000 5.0 6.0 7.0
+ String testDevice = "root.ln.wf01.d_1600";
+ session.createTimeseries(
+ testDevice + ".s1", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+ session.createTimeseries(
+ testDevice + ".s2", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+ session.createTimeseries(
+ testDevice + ".s3", TSDataType.DOUBLE, TSEncoding.GORILLA, CompressionType.SNAPPY);
+ List<Long> insertTimes = Arrays.asList(1000L, 2000L, 3000L);
+ List<List<String>> insertedMeasurements =
+ Arrays.asList(
+ Arrays.asList("s1", "s2"),
+ Arrays.asList("s2", "s3"),
+ Arrays.asList("s1", "s2", "s3"));
+ List<List<TSDataType>> insertedDataTypes =
+ Arrays.asList(
+ Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE),
+ Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE),
+ Arrays.asList(TSDataType.DOUBLE, TSDataType.DOUBLE, TSDataType.DOUBLE));
+ List<List<Object>> insertedValues =
+ Arrays.asList(
+ Arrays.asList(1.0D, 2.0D),
+ Arrays.asList(3.0D, 4.0D),
+ Arrays.asList(5.0D, 6.0D, 7.0D));
+ session.insertRecordsOfOneDevice(
+ testDevice, insertTimes, insertedMeasurements, insertedDataTypes, insertedValues);
+ final double E = 0.00001;
+ for (Statement readStatement : readStatements) {
+ resultSet = readStatement.executeQuery("select s1, s2, s3 from " + testDevice);
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(1000L, resultSet.getLong("Time"));
+ Assert.assertEquals(1.0D, resultSet.getDouble(testDevice + ".s1"), E);
+ Assert.assertEquals(2.0D, resultSet.getDouble(testDevice + ".s2"), E);
+ Assert.assertNull(resultSet.getObject(testDevice + ".s3"));
+
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(2000L, resultSet.getLong("Time"));
+ Assert.assertNull(resultSet.getObject(testDevice + ".s1"));
+ Assert.assertEquals(3.0D, resultSet.getDouble(testDevice + ".s2"), E);
+ Assert.assertEquals(4.0D, resultSet.getDouble(testDevice + ".s3"), E);
+
+ Assert.assertTrue(resultSet.next());
+ Assert.assertEquals(3000L, resultSet.getLong("Time"));
+ Assert.assertEquals(5.0D, resultSet.getDouble(testDevice + ".s1"), E);
+ Assert.assertEquals(6.0D, resultSet.getDouble(testDevice + ".s2"), E);
+ Assert.assertEquals(7.0D, resultSet.getDouble(testDevice + ".s3"), E);
+
+ Assert.assertFalse(resultSet.next());
+ resultSet.close();
+ }
+ } catch (IoTDBConnectionException | StatementExecutionException e) {
+ e.printStackTrace();
+ fail();
+ }
}
// test https://issues.apache.org/jira/browse/IOTDB-1266