You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/01 14:04:50 UTC

[incubator-iotdb] 01/01: continue write inside InsertPlan

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch continue_write
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ba15f610f28624ea425d72b98ef1b1b2d5ae4491
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Jun 1 22:04:30 2020 +0800

    continue write inside InsertPlan
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  4 ++
 .../engine/storagegroup/StorageGroupProcessor.java |  5 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 36 ++++++++-----
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 59 +++++++++++++++++++---
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   | 31 ++++++++++++
 5 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5661dda..f5bfe5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -100,6 +100,10 @@ public abstract class AbstractMemTable implements IMemTable {
   public void insert(InsertPlan insertPlan) {
     for (int i = 0; i < insertPlan.getValues().length; i++) {
 
+      if (insertPlan.getSchemas()[i] == null) {
+        continue;
+      }
+
       Object value = insertPlan.getValues()[i];
       memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 80a915a..2f8e977 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -817,13 +817,16 @@ public class StorageGroupProcessor {
       node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
       String[] measurementList = plan.getMeasurements();
       for (int i = 0; i < measurementList.length; i++) {
+        if (plan.getSchemas()[i] == null) {
+          continue;
+        }
         // Update cached last value with high priority
         MNode measurementNode = node.getChild(measurementList[i]);
 
         ((LeafMNode) measurementNode)
             .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
-    } catch (MetadataException | QueryProcessException e) {
+    } catch (MetadataException e) {
       throw new WriteProcessException(e);
     } finally {
       if (node != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a3fb017..e393040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -873,28 +873,36 @@ public class PlanExecutor implements IPlanExecutor {
 
       for (int i = 0; i < measurementList.length; i++) {
         String measurement = measurementList[i];
-        if (!node.hasChild(measurement)) {
-          if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
-            throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+        try {
+          if (!node.hasChild(measurement)) {
+            if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+              throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+            }
+            TSDataType dataType = TypeInferenceUtils
+                .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
+            Path path = new Path(deviceId, measurement);
+            internalCreateTimeseries(path.toString(), dataType);
           }
-          TSDataType dataType = TypeInferenceUtils
-              .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
-          Path path = new Path(deviceId, measurement);
-          internalCreateTimeseries(path.toString(), dataType);
-        }
-        LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
-        schemas[i] = measurementNode.getSchema();
-        // reset measurement to common name instead of alias
-        measurementList[i] = measurementNode.getName();
+          LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
+          schemas[i] = measurementNode.getSchema();
+          // reset measurement to common name instead of alias
+          measurementList[i] = measurementNode.getName();
 
-        if(!insertPlan.isInferType()) {
-          checkType(insertPlan, i, measurementNode.getSchema().getType());
+          if (!insertPlan.isInferType()) {
+            checkType(insertPlan, i, measurementNode.getSchema().getType());
+          }
+        } catch (MetadataException e) {
+          logger.warn("meet error when check {}.{}", deviceId, measurement, e);
+          insertPlan.markMeasurementInsertionFailed(i);
         }
       }
 
       insertPlan.setMeasurements(measurementList);
       insertPlan.setSchemasAndTransferType(schemas);
       StorageEngine.getInstance().insert(insertPlan);
+      if (insertPlan.getFailedMeasurements() != null) {
+        throw new StorageEngineException("failed to insert points " + insertPlan.getFailedMeasurements());
+      }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 116ca7a..3f7241d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
@@ -40,9 +42,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InsertPlan extends PhysicalPlan {
 
+  private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class);
+
   private long time;
   private String deviceId;
   private String[] measurements;
@@ -54,6 +60,8 @@ public class InsertPlan extends PhysicalPlan {
   // if inferType is true, values is String[], and infer types from them
   private boolean inferType = false;
 
+  // record the failed measurements
+  private List<String> failedMeasurements;
 
   public InsertPlan() {
     super(false, OperatorType.INSERT);
@@ -168,12 +176,35 @@ public class InsertPlan extends PhysicalPlan {
     this.schemas = schemas;
     if (inferType) {
       for (int i = 0; i < schemas.length; i++) {
+        if (schemas[i] == null) {
+          continue;
+        }
         types[i] = schemas[i].getType();
-        values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+        try {
+          values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+        } catch (Exception e) {
+          logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId,
+              measurements[i], values[i], types[i]);
+          markMeasurementInsertionFailed(i);
+        }
       }
     }
   }
 
+  /**
+   * @param index failed measurement index
+   */
+  public void markMeasurementInsertionFailed(int index) {
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+    }
+    failedMeasurements.add(measurements[index]);
+    schemas[index] = null;
+    measurements[index] = null;
+    types[index] = null;
+    values[index] = null;
+  }
+
   @Override
   public List<Path> getPaths() {
     List<Path> ret = new ArrayList<>();
@@ -235,10 +266,12 @@ public class InsertPlan extends PhysicalPlan {
 
     putString(stream, deviceId);
 
-    stream.writeInt(measurements.length);
+    stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
     for (String m : measurements) {
-      putString(stream, m);
+      if (m != null) {
+        putString(stream, m);
+      }
     }
 
     for (MeasurementSchema schema : schemas) {
@@ -254,6 +287,9 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
+      if (types[i] == null) {
+        continue;
+      }
       ReadWriteIOUtils.write(types[i], outputStream);
       switch (types[i]) {
         case BOOLEAN:
@@ -282,6 +318,9 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
+      if (types[i] == null) {
+        continue;
+      }
       ReadWriteIOUtils.write(types[i], buffer);
       switch (types[i]) {
         case BOOLEAN:
@@ -308,6 +347,10 @@ public class InsertPlan extends PhysicalPlan {
     }
   }
 
+  public List<String> getFailedMeasurements() {
+    return failedMeasurements;
+  }
+
   public TSDataType[] getTypes() {
     return types;
   }
@@ -352,10 +395,12 @@ public class InsertPlan extends PhysicalPlan {
 
     putString(buffer, deviceId);
 
-    buffer.putInt(measurements.length);
+    buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
-    for (String m : measurements) {
-      putString(buffer, m);
+    for (String measurement : measurements) {
+      if (measurement != null) {
+        putString(buffer, measurement);
+      }
     }
 
     try {
@@ -391,7 +436,7 @@ public class InsertPlan extends PhysicalPlan {
     return "deviceId: " + deviceId + ", time: " + time;
   }
 
-  public TimeValuePair composeTimeValuePair(int measurementIndex) throws QueryProcessException {
+  public TimeValuePair composeTimeValuePair(int measurementIndex) {
     if (measurementIndex >= values.length) {
       return null;
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 595c23f..cf89bc4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
@@ -27,6 +29,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -170,6 +173,34 @@ public class IoTDBSimpleQueryIT {
 
 
   @Test
+  public void testPartialInsertion() throws SQLException, ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try(Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
+            "root", "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("SET STORAGE GROUP TO root.sg1");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+      // seq chunk : [1,10]
+      try {
+        statement.execute("INSERT INTO root.sg1.d0(timestamp, s0, s1) VALUES (1, 1, 2.2)");
+        fail();
+      } catch (IoTDBSQLException e) {
+        assertTrue(e.getMessage().contains("s1"));
+      }
+
+      ResultSet resultSet = statement.executeQuery("select s0, s1 from root.sg1.d0");
+
+      while(resultSet.next()) {
+        assertEquals(resultSet.getInt("root.sg1.d0.s0"), 1);
+        assertEquals(resultSet.getString("root.sg1.d0.s1"), null);
+      }
+    }
+  }
+
+  @Test
   public void testOverlappedPagesMerge() throws SQLException, ClassNotFoundException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     try(Connection connection = DriverManager