You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/01 14:04:49 UTC

[incubator-iotdb] branch continue_write created (now ba15f61)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a change to branch continue_write
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at ba15f61  continue write inside InsertPlan

This branch includes the following new commits:

     new ba15f61  continue write inside InsertPlan

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: continue write inside InsertPlan

Posted by qi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch continue_write
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit ba15f610f28624ea425d72b98ef1b1b2d5ae4491
Author: qiaojialin <64...@qq.com>
AuthorDate: Mon Jun 1 22:04:30 2020 +0800

    continue write inside InsertPlan
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  4 ++
 .../engine/storagegroup/StorageGroupProcessor.java |  5 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 36 ++++++++-----
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 59 +++++++++++++++++++---
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   | 31 ++++++++++++
 5 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5661dda..f5bfe5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -100,6 +100,10 @@ public abstract class AbstractMemTable implements IMemTable {
   public void insert(InsertPlan insertPlan) {
     for (int i = 0; i < insertPlan.getValues().length; i++) {
 
+      if (insertPlan.getSchemas()[i] == null) {
+        continue;
+      }
+
       Object value = insertPlan.getValues()[i];
       memSize += MemUtils.getRecordSize(insertPlan.getSchemas()[i].getType(), value);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 80a915a..2f8e977 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -817,13 +817,16 @@ public class StorageGroupProcessor {
       node = MManager.getInstance().getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
       String[] measurementList = plan.getMeasurements();
       for (int i = 0; i < measurementList.length; i++) {
+        if (plan.getSchemas()[i] == null) {
+          continue;
+        }
         // Update cached last value with high priority
         MNode measurementNode = node.getChild(measurementList[i]);
 
         ((LeafMNode) measurementNode)
             .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime);
       }
-    } catch (MetadataException | QueryProcessException e) {
+    } catch (MetadataException e) {
       throw new WriteProcessException(e);
     } finally {
       if (node != null) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index a3fb017..e393040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -873,28 +873,36 @@ public class PlanExecutor implements IPlanExecutor {
 
       for (int i = 0; i < measurementList.length; i++) {
         String measurement = measurementList[i];
-        if (!node.hasChild(measurement)) {
-          if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
-            throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+        try {
+          if (!node.hasChild(measurement)) {
+            if (!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+              throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurement);
+            }
+            TSDataType dataType = TypeInferenceUtils
+                .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
+            Path path = new Path(deviceId, measurement);
+            internalCreateTimeseries(path.toString(), dataType);
           }
-          TSDataType dataType = TypeInferenceUtils
-              .getPredictedDataType(insertPlan.getValues()[i], insertPlan.isInferType());
-          Path path = new Path(deviceId, measurement);
-          internalCreateTimeseries(path.toString(), dataType);
-        }
-        LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
-        schemas[i] = measurementNode.getSchema();
-        // reset measurement to common name instead of alias
-        measurementList[i] = measurementNode.getName();
+          LeafMNode measurementNode = (LeafMNode) node.getChild(measurement);
+          schemas[i] = measurementNode.getSchema();
+          // reset measurement to common name instead of alias
+          measurementList[i] = measurementNode.getName();
 
-        if(!insertPlan.isInferType()) {
-          checkType(insertPlan, i, measurementNode.getSchema().getType());
+          if (!insertPlan.isInferType()) {
+            checkType(insertPlan, i, measurementNode.getSchema().getType());
+          }
+        } catch (MetadataException e) {
+          logger.warn("meet error when check {}.{}", deviceId, measurement, e);
+          insertPlan.markMeasurementInsertionFailed(i);
         }
       }
 
       insertPlan.setMeasurements(measurementList);
       insertPlan.setSchemasAndTransferType(schemas);
       StorageEngine.getInstance().insert(insertPlan);
+      if (insertPlan.getFailedMeasurements() != null) {
+        throw new StorageEngineException("failed to insert points " + insertPlan.getFailedMeasurements());
+      }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index 116ca7a..3f7241d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
@@ -40,9 +42,13 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class InsertPlan extends PhysicalPlan {
 
+  private static final Logger logger = LoggerFactory.getLogger(InsertPlan.class);
+
   private long time;
   private String deviceId;
   private String[] measurements;
@@ -54,6 +60,8 @@ public class InsertPlan extends PhysicalPlan {
   // if inferType is true, values is String[], and infer types from them
   private boolean inferType = false;
 
+  // record the failed measurements
+  private List<String> failedMeasurements;
 
   public InsertPlan() {
     super(false, OperatorType.INSERT);
@@ -168,12 +176,35 @@ public class InsertPlan extends PhysicalPlan {
     this.schemas = schemas;
     if (inferType) {
       for (int i = 0; i < schemas.length; i++) {
+        if (schemas[i] == null) {
+          continue;
+        }
         types[i] = schemas[i].getType();
-        values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+        try {
+          values[i] = CommonUtils.parseValue(types[i], values[i].toString());
+        } catch (Exception e) {
+          logger.warn("{}.{} data type is not consistent, input {}, registered {}", deviceId,
+              measurements[i], values[i], types[i]);
+          markMeasurementInsertionFailed(i);
+        }
       }
     }
   }
 
+  /**
+   * @param index failed measurement index
+   */
+  public void markMeasurementInsertionFailed(int index) {
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+    }
+    failedMeasurements.add(measurements[index]);
+    schemas[index] = null;
+    measurements[index] = null;
+    types[index] = null;
+    values[index] = null;
+  }
+
   @Override
   public List<Path> getPaths() {
     List<Path> ret = new ArrayList<>();
@@ -235,10 +266,12 @@ public class InsertPlan extends PhysicalPlan {
 
     putString(stream, deviceId);
 
-    stream.writeInt(measurements.length);
+    stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
     for (String m : measurements) {
-      putString(stream, m);
+      if (m != null) {
+        putString(stream, m);
+      }
     }
 
     for (MeasurementSchema schema : schemas) {
@@ -254,6 +287,9 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
     for (int i = 0; i < values.length; i++) {
+      if (types[i] == null) {
+        continue;
+      }
       ReadWriteIOUtils.write(types[i], outputStream);
       switch (types[i]) {
         case BOOLEAN:
@@ -282,6 +318,9 @@ public class InsertPlan extends PhysicalPlan {
 
   private void putValues(ByteBuffer buffer) throws QueryProcessException {
     for (int i = 0; i < values.length; i++) {
+      if (types[i] == null) {
+        continue;
+      }
       ReadWriteIOUtils.write(types[i], buffer);
       switch (types[i]) {
         case BOOLEAN:
@@ -308,6 +347,10 @@ public class InsertPlan extends PhysicalPlan {
     }
   }
 
+  public List<String> getFailedMeasurements() {
+    return failedMeasurements;
+  }
+
   public TSDataType[] getTypes() {
     return types;
   }
@@ -352,10 +395,12 @@ public class InsertPlan extends PhysicalPlan {
 
     putString(buffer, deviceId);
 
-    buffer.putInt(measurements.length);
+    buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
 
-    for (String m : measurements) {
-      putString(buffer, m);
+    for (String measurement : measurements) {
+      if (measurement != null) {
+        putString(buffer, measurement);
+      }
     }
 
     try {
@@ -391,7 +436,7 @@ public class InsertPlan extends PhysicalPlan {
     return "deviceId: " + deviceId + ", time: " + time;
   }
 
-  public TimeValuePair composeTimeValuePair(int measurementIndex) throws QueryProcessException {
+  public TimeValuePair composeTimeValuePair(int measurementIndex) {
     if (measurementIndex >= values.length) {
       return null;
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 595c23f..cf89bc4 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.integration;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
@@ -27,6 +29,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBSQLException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -170,6 +173,34 @@ public class IoTDBSimpleQueryIT {
 
 
   @Test
+  public void testPartialInsertion() throws SQLException, ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try(Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/",
+            "root", "root");
+        Statement statement = connection.createStatement()){
+      statement.execute("SET STORAGE GROUP TO root.sg1");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT32,ENCODING=PLAIN");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT32,ENCODING=PLAIN");
+
+      // seq chunk : [1,10]
+      try {
+        statement.execute("INSERT INTO root.sg1.d0(timestamp, s0, s1) VALUES (1, 1, 2.2)");
+        fail();
+      } catch (IoTDBSQLException e) {
+        assertTrue(e.getMessage().contains("s1"));
+      }
+
+      ResultSet resultSet = statement.executeQuery("select s0, s1 from root.sg1.d0");
+
+      while(resultSet.next()) {
+        assertEquals(resultSet.getInt("root.sg1.d0.s0"), 1);
+        assertEquals(resultSet.getString("root.sg1.d0.s1"), null);
+      }
+    }
+  }
+
+  @Test
   public void testOverlappedPagesMerge() throws SQLException, ClassNotFoundException {
     Class.forName(Config.JDBC_DRIVER_NAME);
     try(Connection connection = DriverManager