You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/21 14:07:43 UTC

[incubator-iotdb] branch master updated: add partialInsert for insertTablets (#1395)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b39a23d  add partialInsert for insertTablets (#1395)
b39a23d is described below

commit b39a23d5575a418232b779524ffe21125d6274e2
Author: chaow <xu...@gmail.com>
AuthorDate: Sun Jun 21 22:07:32 2020 +0800

    add partialInsert for insertTablets (#1395)
    
    * add partialInsert for insertTablets
---
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  6 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  3 +
 .../org/apache/iotdb/db/metadata/MManager.java     | 38 +++-------
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 11 ++-
 .../db/qp/physical/crud/InsertTabletPlan.java      | 49 +++++++++++--
 .../java/org/apache/iotdb/db/utils/MemUtils.java   |  3 +
 .../apache/iotdb/db/writelog/WriteLogNodeTest.java | 34 +++++++++
 .../java/org/apache/iotdb/session/SessionUT.java   | 83 ++++++++++++++++++++--
 8 files changed, 190 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 429124a..48c4a38 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -120,7 +120,8 @@ public abstract class AbstractMemTable implements IMemTable {
     try {
       write(insertTabletPlan, start, end);
       memSize += MemUtils.getRecordSize(insertTabletPlan, start, end);
-      totalPointsNum += insertTabletPlan.getMeasurements().length * (end - start);
+      totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber())
+        * (end - start);
     } catch (RuntimeException e) {
       throw new WriteProcessException(e.getMessage());
     }
@@ -137,6 +138,9 @@ public abstract class AbstractMemTable implements IMemTable {
   @Override
   public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+      if (insertTabletPlan.getColumns()[i] == null) {
+        continue;
+      }
       IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId(),
           insertTabletPlan.getMeasurements()[i], insertTabletPlan.getSchemas()[i]);
       memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i],
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6f051cc..009acac 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -781,6 +781,9 @@ public class StorageGroupProcessor {
       node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
       String[] measurementList = plan.getMeasurements();
       for (int i = 0; i < measurementList.length; i++) {
+        if (plan.getColumns()[i] == null) {
+          continue;
+        }
         // Update cached last value with high priority
         ((MeasurementMNode) manager.getChild(node, measurementList[i]))
             .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index c80ef9c..066b76e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,27 +18,6 @@
  */
 package org.apache.iotdb.db.metadata;
 
-import static java.util.stream.Collectors.toList;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -47,12 +26,7 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.*;
 import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -74,6 +48,16 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.stream.Collectors.toList;
+
 /**
  * This class takes the responsibility of serialization of all the metadata info and persistent it
  * into files. This class contains all the interfaces to modify the metadata for delta system. All
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 45ea1fd..3cf570e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1122,10 +1122,15 @@ public class PlanExecutor implements IPlanExecutor {
 
         // check data type
         if (measurementNode.getSchema().getType() != insertTabletPlan.getDataTypes()[i]) {
-          throw new QueryProcessException(String.format(
+          if (!enablePartialInsert) {
+            throw new QueryProcessException(String.format(
               "Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
               measurement, insertTabletPlan.getDataTypes()[i],
               measurementNode.getSchema().getType()));
+          } else {
+            insertTabletPlan.markMeasurementInsertionFailed(i);
+            continue;
+          }
         }
         schemas[i] = measurementNode.getSchema();
         // reset measurement to common name instead of alias
@@ -1133,6 +1138,10 @@ public class PlanExecutor implements IPlanExecutor {
       }
       insertTabletPlan.setSchemas(schemas);
       StorageEngine.getInstance().insertTablet(insertTabletPlan);
+      if (insertTabletPlan.getFailedMeasurements() != null) {
+        throw new StorageEngineException(
+          "failed to insert measurements " + insertTabletPlan.getFailedMeasurements());
+      }
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 8b6a64c..8ee8ce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -67,6 +68,9 @@ public class InsertTabletPlan extends PhysicalPlan {
   private int start;
   private int end;
 
+  // record the failed measurements
+  private List<String> failedMeasurements;
+
   public InsertTabletPlan() {
     super(false, OperatorType.BATCHINSERT);
   }
@@ -133,12 +137,18 @@ public class InsertTabletPlan extends PhysicalPlan {
 
     putString(stream, deviceId);
 
-    stream.writeInt(measurements.length);
+    stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
     for (String m : measurements) {
+      if (m == null) {
+        continue;
+      }
       putString(stream, m);
     }
 
     for (TSDataType dataType : dataTypes) {
+      if (dataType == null) {
+        continue;
+      }
       stream.writeShort(dataType.serialize());
     }
 
@@ -163,6 +173,9 @@ public class InsertTabletPlan extends PhysicalPlan {
 
   private void serializeValues(DataOutputStream stream) throws IOException {
     for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
       serializeColumn(dataTypes[i], columns[i], stream, index);
     }
   }
@@ -221,13 +234,17 @@ public class InsertTabletPlan extends PhysicalPlan {
 
     putString(buffer, deviceId);
 
-    buffer.putInt(measurements.length);
+    buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
     for (String m : measurements) {
-      putString(buffer, m);
+      if (m != null) {
+        putString(buffer, m);
+      }
     }
 
     for (TSDataType dataType : dataTypes) {
-      dataType.serializeTo(buffer);
+      if (dataType != null) {
+        dataType.serializeTo(buffer);
+      }
     }
 
     buffer.putInt(end - start);
@@ -251,6 +268,9 @@ public class InsertTabletPlan extends PhysicalPlan {
 
   private void serializeValues(ByteBuffer buffer) {
     for (int i = 0; i < measurements.length; i++) {
+      if (measurements[i] == null) {
+        continue;
+      }
       serializeColumn(dataTypes[i], columns[i], buffer, start, end);
     }
   }
@@ -470,4 +490,25 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.rowCount = size;
   }
 
+  /**
+   * @param index failed measurement index
+   */
+  public void markMeasurementInsertionFailed(int index) {
+    if (failedMeasurements == null) {
+      failedMeasurements = new ArrayList<>();
+    }
+    failedMeasurements.add(measurements[index]);
+    measurements[index] = null;
+    dataTypes[index] = null;
+    columns[index] = null;
+  }
+
+  public List<String> getFailedMeasurements() {
+    return failedMeasurements;
+  }
+
+  public int getFailedMeasurementNumber() {
+    return failedMeasurements == null ? 0 : failedMeasurements.size();
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index daef654..affe70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -76,6 +76,9 @@ public class MemUtils {
     }
     long memSize = 0;
     for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+      if (insertTabletPlan.getDataTypes()[i] == null) {
+        continue;
+      }
       switch (insertTabletPlan.getDataTypes()[i]) {
         case INT32:
           memSize += (end - start) * (8L + 4L); break;
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 953a76e..66f8f84 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -24,10 +24,14 @@ import static junit.framework.TestCase.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.writelog.io.ILogReader;
 import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
@@ -71,8 +75,36 @@ public class WriteLogNodeTest {
         new String[]{"1.0", "15", "str", "false"});
     DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
 
+    long[] times = new long[]{110L, 111L, 112L, 113L};
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    Object[] columns = new Object[4];
+    columns[0] = new double[4];
+    columns[1] = new long[4];
+    columns[2] = new String[4];
+    columns[3] = new boolean[4];
+
+    for (int r = 0; r < 4; r++) {
+      ((double[]) columns[0])[r] = 1.0;
+      ((long[]) columns[1])[r] = 1;
+      ((String[]) columns[2])[r] = "hh" + r;
+      ((boolean[]) columns[3])[r] = false;
+    }
+
+    InsertTabletPlan tabletPlan = new InsertTabletPlan(identifier,
+      new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+
+    tabletPlan.markMeasurementInsertionFailed(2);
+
     logNode.write(bwInsertPlan);
     logNode.write(deletePlan);
+    logNode.write(tabletPlan);
 
     logNode.close();
 
@@ -83,6 +115,8 @@ public class WriteLogNodeTest {
     ILogReader reader = logNode.getLogReader();
     assertEquals(bwInsertPlan, reader.next());
     assertEquals(deletePlan, reader.next());
+    InsertTabletPlan newPlan = (InsertTabletPlan) reader.next();
+    assertEquals(newPlan.getMeasurements().length, 3);
     reader.close();
 
     logNode.delete();
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 2397734..2f60380 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -18,19 +18,44 @@
  */
 package org.apache.iotdb.session;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.List;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.tsfile.write.record.Tablet;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class SessionUT {
 
+    private Session session;
+
+    @Before
+    public void setUp() {
+        System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+        EnvironmentUtils.closeStatMonitor();
+        EnvironmentUtils.envSetUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        session.close();
+        EnvironmentUtils.cleanEnv();
+    }
+
     @Test
     public void testSortTablet() {
         /*
@@ -39,7 +64,7 @@ public class SessionUT {
         Before testing, change the sortTablet from private method to public method
         !!!
          */
-        Session session = new Session("127.0.0.1", 6667, "root", "root");
+        session = new Session("127.0.0.1", 6667, "root", "root");
         List<MeasurementSchema> schemaList = new ArrayList<>();
         schemaList.add(new MeasurementSchema("s1",TSDataType.INT64, TSEncoding.RLE));
         // insert three rows data
@@ -89,4 +114,54 @@ public class SessionUT {
             fail();
         }
     }
+
+    @Test
+    public void testInsertByStrAndSelectFailedData() throws IoTDBConnectionException, StatementExecutionException {
+        session = new Session("127.0.0.1", 6667, "root", "root");
+        session.open();
+
+        String deviceId = "root.sg1.d1";
+
+        session.createTimeseries(deviceId + "s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+        session.createTimeseries(deviceId + "s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+        session.createTimeseries(deviceId + "s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+
+        List<MeasurementSchema> schemaList = new ArrayList<>();
+        schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+        schemaList.add(new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
+        schemaList.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+
+        Tablet tablet = new Tablet("root.sg1.d1", schemaList, 10);
+
+        long[] timestamps = tablet.timestamps;
+        Object[] values = tablet.values;
+
+        for (long time = 0; time < 10; time++) {
+            int row = tablet.rowSize++;
+            timestamps[row] = time;
+            long[] sensor = (long[]) values[0];
+            sensor[row] = time;
+            double[] sensor2 = (double[]) values[1];
+            sensor2[row] = 0.1 + time;
+            Binary[] sensor3 = (Binary[]) values[2];
+            sensor3[row] = Binary.valueOf("ha" + time);
+        }
+
+        if (tablet.rowSize != 0) {
+            session.insertTablet(tablet);
+            tablet.reset();
+        }
+
+        SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+        int i = 0;
+        while (dataSet.hasNext()) {
+            RowRecord record = dataSet.next();
+            System.out.println(record.toString());
+            assertEquals(i, record.getFields().get(1).getLongV());
+            assertTrue(record.getFields().get(2).isNull());
+            assertTrue(record.getFields().get(3).isNull());
+            i++;
+        }
+    }
+
 }