You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by yu...@apache.org on 2020/07/07 15:26:09 UTC
[incubator-iotdb] 01/01: auto create from InsertTabletPlan
This is an automated email from the ASF dual-hosted git repository.
yuyuankang pushed a commit to branch kyy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 6ae135174d958d6c6cf60717af722f3ae6e6e899
Author: Ring-k <yu...@hotmail.com>
AuthorDate: Tue Jul 7 23:24:26 2020 +0800
auto create from InsertTabletPlan
---
.../cluster/server/member/MetaGroupMember.java | 32 +++++++++++-----------
.../iotdb/db/qp/physical/crud/InsertPlan.java | 2 ++
.../db/qp/physical/crud/InsertTabletPlan.java | 22 +++++++++++++++
3 files changed, 40 insertions(+), 16 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index d212933..f000c9a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -407,7 +407,6 @@ public class MetaGroupMember extends RaftMember {
}
-
/**
* Apply the addition of a new node. Register its identifier, add it to the node list and
* partition table, serialize the partition table and update the DataGroupMembers.
@@ -693,7 +692,6 @@ public class MetaGroupMember extends RaftMember {
}
-
/**
* @return Whether all nodes' identifier is known.
*/
@@ -722,7 +720,7 @@ public class MetaGroupMember extends RaftMember {
/**
* Process the join cluster request of "node". Only proceed when the partition table is ready.
*
- * @param node cannot be the local node
+ * @param node cannot be the local node
*/
public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus)
throws AddSelfException, LogExecutionException {
@@ -1432,7 +1430,7 @@ public class MetaGroupMember extends RaftMember {
setStorageGroupResult.getCode(), storageGroupName)
);
}
- if(plan instanceof InsertRowPlan){
+ if (plan instanceof InsertRowPlan) {
// try to create timeseries
boolean isAutoCreateTimeseriesSuccess = autoCreateTimeseries((InsertRowPlan) plan);
if (!isAutoCreateTimeseriesSuccess) {
@@ -1451,9 +1449,9 @@ public class MetaGroupMember extends RaftMember {
* @return
*/
TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) {
- InsertRowPlan backup = null;
- if (plan instanceof InsertRowPlan) {
- backup = (InsertRowPlan) ((InsertRowPlan) plan).clone();
+ InsertPlan backup = null;
+ if (plan instanceof InsertPlan) {
+ backup = (InsertPlan) ((InsertPlan) plan).clone();
}
// the error codes from the groups that cannot execute the plan
TSStatus status;
@@ -1466,7 +1464,7 @@ public class MetaGroupMember extends RaftMember {
status = forwardToMultipleGroup(planGroupMap);
}
}
- if (plan instanceof InsertRowPlan
+ if (plan instanceof InsertPlan
&& status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode()
&& ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) {
// try to create timeseries
@@ -1607,7 +1605,7 @@ public class MetaGroupMember extends RaftMember {
* @param insertPlan, some of the timeseries in it are not created yet
* @return true if all uncreated timeseries are created
*/
- boolean autoCreateTimeseries(InsertRowPlan insertPlan) {
+ boolean autoCreateTimeseries(InsertPlan insertPlan) {
List<String> seriesList = new ArrayList<>();
String deviceId = insertPlan.getDeviceId();
String storageGroupName;
@@ -1629,7 +1627,9 @@ public class MetaGroupMember extends RaftMember {
for (String seriesPath : unregisteredSeriesList) {
int index = seriesList.indexOf(seriesPath);
TSDataType dataType = TypeInferenceUtils
- .getPredictedDataType(insertPlan.getValues()[index], true);
+ .getPredictedDataType(insertPlan instanceof InsertTabletPlan
+ ? ((Object[]) ((InsertTabletPlan) insertPlan).getColumns()[index])[0]
+ : ((InsertRowPlan) insertPlan).getValues()[index], true);
TSEncoding encoding = getDefaultEncoding(dataType);
CompressionType compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
CreateTimeSeriesPlan createTimeSeriesPlan = new CreateTimeSeriesPlan(new Path(seriesPath),
@@ -1824,7 +1824,8 @@ public class MetaGroupMember extends RaftMember {
if (logger.isDebugEnabled()) {
logger.debug("{}: Pulled {} timeseries schemas of {} and other {} paths from {} of {}",
name,
- schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1, node,
+ schemas.size(), request.getPrefixPaths().get(0), request.getPrefixPaths().size() - 1,
+ node,
request.getHeader());
}
results.addAll(schemas);
@@ -2654,16 +2655,16 @@ public class MetaGroupMember extends RaftMember {
}
-
/**
* Process the request of removing a node from the cluster. Reject the request if partition table
* is unavailable or the node is not the MetaLeader and it does not know who the leader is.
* Otherwise (being the MetaLeader), the request will be processed locally and broadcast to every
* node.
*
- * @param node the node to be removed.
+ * @param node the node to be removed.
*/
- public long removeNode(Node node) throws PartitionTableUnavailableException, LogExecutionException {
+ public long removeNode(Node node)
+ throws PartitionTableUnavailableException, LogExecutionException {
if (partitionTable == null) {
logger.info("Cannot add node now because the partition table is not set");
throw new PartitionTableUnavailableException(thisNode);
@@ -2675,13 +2676,12 @@ public class MetaGroupMember extends RaftMember {
}
-
/**
* Process a node removal request locally and broadcast it to the whole cluster. The removal will
* be rejected if number of nodes will fall below half of the replication number after this
* operation.
*
- * @param node the node to be removed.
+ * @param node the node to be removed.
* @return Long.MIN_VALUE if further forwarding is required, or the execution result
*/
private long processRemoveNodeLocally(Node node)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 3b4222c..5c81cdc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -95,4 +95,6 @@ public abstract class InsertPlan extends PhysicalPlan {
dataTypes[index] = null;
}
+ @Override
+ public abstract Object clone();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index f7b59db..dd6b5d5 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -482,4 +482,26 @@ public class InsertTabletPlan extends InsertPlan {
columns[index] = null;
}
+ @Override
+ public Object clone() { // builds a new InsertTabletPlan from this plan's deviceId, measurements, dataTypes, times, columns and rowCount
+ String deviceIdClone = deviceId; // same String reference is reused; no copy is made
+ String[] measurementsClone = new String[this.measurements.length];
+ System.arraycopy(this.measurements, 0, measurementsClone, 0, measurementsClone.length); // new array, same String elements
+ TSDataType[] typesClone = new TSDataType[this.dataTypes.length]; // dereferences dataTypes, so it must be non-null when clone() is called
+ System.arraycopy(this.dataTypes, 0, typesClone, 0, typesClone.length);
+ InsertTabletPlan ret = new InsertTabletPlan(deviceIdClone, measurementsClone);
+
+ ret.setDataTypes(typesClone);
+
+ long[] timesClone = new long[times.length];
+ System.arraycopy(this.times, 0, timesClone, 0, times.length); // primitive array, so the timestamps are fully duplicated
+ ret.setTimes(timesClone);
+ Object[] columnsClone = new Object[columns.length];
+ System.arraycopy(this.columns, 0, columnsClone, 0, columns.length); // copies element references only: each column object is shared with this plan
+ ret.setColumns(columnsClone);
+
+ ret.setRowCount(rowCount);
+ return ret; // only the fields set above are carried over by this method
+ }
+
}