You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/01 14:04:50 UTC
[incubator-iotdb] 01/01: continue write inside InsertPlan
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch continue_write
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit ba15f610f28624ea425d72b98ef1b1b2d5ae4491
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Jun 1 22:04:30 2020 +0800
continue write inside InsertPlan
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 4 ++
.../engine/storagegroup/StorageGroupProcessor.java | 5 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 36 ++++++++-----
.../iotdb/db/qp/physical/crud/InsertPlan.java | 59 +++++++++++++++++++---
.../iotdb/db/integration/IoTDBSimpleQueryIT.java | 31 ++++++++++++
5 files changed, 113 insertions(+), 22 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5661dda..f5bfe5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -100,6 +100,10 @@ public abstract class AbstractMemTable implements IMemTable {
public void insert(InsertPlan insertPlan) {
for (int i = 0; i < insertPlan.getValues().length; i++) {
+ if (insertPlan.getSchemas()[i] == null) {
+ continue;
+ }
+
Object value = insertPlan.getValues()[i];
memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 80a915a..2f8e977 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -817,13 +817,16 @@ public class StorageGroupProcessor {
node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
+ if (plan.getSchemas()[i] == null) {
+ continue;
+ }
// Update cached last value with high priority
MNode measurementNode = node.getChild(measurementList[i]);
((LeafMNode) measurementNode)
.updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
}
- } catch (MetadataException | QueryProcessException e) {
+ } catch (MetadataException e) {
throw new WriteProcessException(e);
} finally {
if (node != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a3fb017..e393040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -873,28 +873,36 @@ public class PlanExecutor implements IPlanExecutor {
for (int i = 0; i < measurementList.length; i++) {
String measurement = measurementList[i];
- if (!node.hasChild(measurement)) {
- if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
- throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+ try {
+ if (!node.hasChild(measurement)) {
+ if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+ }
+ TSDataType dataType = TypeInferenceUtils
+ .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
+ Path path = new Path(deviceId, measurement);
+ internalCreateTimeseries(path.toString(), dataType);
}
- TSDataType dataType = TypeInferenceUtils
- .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
- Path path = new Path(deviceId, measurement);
- internalCreateTimeseries(path.toString(), dataType);
- }
- LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
- schemas[i] = measurementNode.getSchema();
- // reset measurement to common name instead of alias
- measurementList[i] = measurementNode.getName();
+ LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
+ schemas[i] = measurementNode.getSchema();
+ // reset measurement to common name instead of alias
+ measurementList[i] = measurementNode.getName();
- if(!insertPlan.isInferType()) {
- checkType(insertPlan, i, measurementNode.getSchema().getType());
+ if (!insertPlan.isInferType()) {
+ checkType(insertPlan, i, measurementNode.getSchema().getType());
+ }
+ } catch (MetadataException e) {
+ logger.warn("meet error when check {}.{}", deviceId, measurement, e);
+ insertPlan.markMeasurementInsertionFailed(i);
}
}
insertPlan.setMeasurements(measurementList);
insertPlan.setSchemasAndTransferType(schemas);
StorageEngine.getInstance().insert(insertPlan);
+ if (insertPlan.getFailedMeasurements() != null) {
+ throw new StorageEngineException("failed to insert points " + insertPlan.getFailedMeasurements());
+ }
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 116ca7a..3f7241d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
@@ -40,9 +42,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class InsertPlan extends PhysicalPlan {
+ private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class);
+
private long time;
private String deviceId;
private String[] measurements;
@@ -54,6 +60,8 @@ public class InsertPlan extends PhysicalPlan {
// if inferType is true, values is String[], and infer types from them
private boolean inferType = false;
+ // record the failed measurements
+ private List<String> failedMeasurements;
public InsertPlan() {
super(false, OperatorType.INSERT);
@@ -168,12 +176,35 @@ public class InsertPlan extends PhysicalPlan {
this.schemas = schemas;
if (inferType) {
for (int i = 0; i < schemas.length; i++) {
+ if (schemas[i] == null) {
+ continue;
+ }
types[i] = schemas[i].getType();
- values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+ try {
+ values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+ } catch (Exception e) {
+ logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId,
+ measurements[i], values[i], types[i]);
+ markMeasurementInsertionFailed(i);
+ }
}
}
}
+ /** Records the measurement at {@code index} as failed and nulls its entry in each per-measurement array.
+ * @param index position of the failed measurement in measurements, schemas, types and values
+ */
+ public void markMeasurementInsertionFailed(int index) {
+ if (failedMeasurements == null) { // the list is only created on the first recorded failure
+ failedMeasurements = new ArrayList<>();
+ }
+ failedMeasurements.add(measurements[index]); // remember the name before its slot is nulled below
+ schemas[index] = null; // NOTE(review): assumes schemas and types are already allocated when this is called - confirm
+ measurements[index] = null;
+ types[index] = null;
+ values[index] = null;
+ }
+
@Override
public List<Path> getPaths() {
List<Path> ret = new ArrayList<>();
@@ -235,10 +266,12 @@ public class InsertPlan extends PhysicalPlan {
putString(stream, deviceId);
- stream.writeInt(measurements.length);
+ stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
for (String m : measurements) {
- putString(stream, m);
+ if (m != null) {
+ putString(stream, m);
+ }
}
for (MeasurementSchema schema : schemas) {
@@ -254,6 +287,9 @@ public class InsertPlan extends PhysicalPlan {
private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
for (int i = 0; i < values.length; i++) {
+ if (types[i] == null) {
+ continue;
+ }
ReadWriteIOUtils.write(types[i], outputStream);
switch (types[i]) {
case BOOLEAN:
@@ -282,6 +318,9 @@ public class InsertPlan extends PhysicalPlan {
private void putValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < values.length; i++) {
+ if (types[i] == null) {
+ continue;
+ }
ReadWriteIOUtils.write(types[i], buffer);
switch (types[i]) {
case BOOLEAN:
@@ -308,6 +347,10 @@ public class InsertPlan extends PhysicalPlan {
}
}
+ public List<String> getFailedMeasurements() { // names added by markMeasurementInsertionFailed; presumably null when none was recorded (field has no initializer) - confirm
+ return failedMeasurements;
+ }
+
public TSDataType[] getTypes() {
return types;
}
@@ -352,10 +395,12 @@ public class InsertPlan extends PhysicalPlan {
putString(buffer, deviceId);
- buffer.putInt(measurements.length);
+ buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
- for (String m : measurements) {
- putString(buffer, m);
+ for (String measurement : measurements) {
+ if (measurement != null) {
+ putString(buffer, measurement);
+ }
}
try {
@@ -391,7 +436,7 @@ public class InsertPlan extends PhysicalPlan {
return "deviceId: " + deviceId + ", time: " + time;
}
- public TimeValuePair composeTimeValuePair(int measurementIndex) throws QueryProcessException {
+ public TimeValuePair composeTimeValuePair(int measurementIndex) {
if (measurementIndex >= values.length) {
return null;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 595c23f..cf89bc4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.integration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.sql.Connection;
@@ -27,6 +29,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -170,6 +173,34 @@ public class IoTDBSimpleQueryIT {
@Test
+ public void testPartialInsertion() throws SQLException, ClassNotFoundException {
+ Class.forName(Config.JDBC_DRIVER_NAME);
+ try(Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
+ "root", "root");
+ Statement statement = connection.createStatement()){
+ statement.execute("SET STORAGE GROUP TO root.sg1");
+ statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN");
+ statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+ // s1 is declared INT32, so the value 2.2 is expected to be rejected while s0 = 1 is still written
+ try {
+ statement.execute("INSERT INTO root.sg1.d0(timestamp, s0, s1) VALUES (1, 1, 2.2)");
+ fail(); // reaching this line means the insert did not report the failed measurement
+ } catch (IoTDBSQLException e) {
+ assertTrue(e.getMessage().contains("s1")); // the error message should name the failed measurement
+ }
+
+ ResultSet resultSet = statement.executeQuery("select s0, s1 from root.sg1.d0");
+
+ while(resultSet.next()) { // NOTE(review): nothing is asserted if the query returns no rows
+ assertEquals(resultSet.getInt("root.sg1.d0.s0"), 1); // NOTE(review): JUnit's order is assertEquals(expected, actual); arguments are swapped here
+ assertEquals(resultSet.getString("root.sg1.d0.s1"), null);
+ }
+ }
+ }
+
+ @Test
public void testOverlappedPagesMerge() throws SQLException, ClassNotFoundException {
Class.forName(Config.JDBC_DRIVER_NAME);
try(Connection connection = DriverManager