You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/06/21 14:07:43 UTC
[incubator-iotdb] branch master updated: add partialInsert for insertTablets (#1395)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b39a23d add partialInsert for insertTablets (#1395)
b39a23d is described below
commit b39a23d5575a418232b779524ffe21125d6274e2
Author: chaow <xu...@gmail.com>
AuthorDate: Sun Jun 21 22:07:32 2020 +0800
add partialInsert for insertTablets (#1395)
* add partialInsert for insertTablets
---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 6 +-
.../engine/storagegroup/StorageGroupProcessor.java | 3 +
.../org/apache/iotdb/db/metadata/MManager.java | 38 +++-------
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 11 ++-
.../db/qp/physical/crud/InsertTabletPlan.java | 49 +++++++++++--
.../java/org/apache/iotdb/db/utils/MemUtils.java | 3 +
.../apache/iotdb/db/writelog/WriteLogNodeTest.java | 34 +++++++++
.../java/org/apache/iotdb/session/SessionUT.java | 83 ++++++++++++++++++++--
8 files changed, 190 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 429124a..48c4a38 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -120,7 +120,8 @@ public abstract class AbstractMemTable implements IMemTable {
try {
write(insertTabletPlan, start, end);
memSize += MemUtils.getRecordSize(insertTabletPlan, start, end);
- totalPointsNum += insertTabletPlan.getMeasurements().length * (end - start);
+ totalPointsNum += (insertTabletPlan.getMeasurements().length - insertTabletPlan.getFailedMeasurementNumber())
+ * (end - start);
} catch (RuntimeException e) {
throw new WriteProcessException(e.getMessage());
}
@@ -137,6 +138,9 @@ public abstract class AbstractMemTable implements IMemTable {
@Override
public void write(InsertTabletPlan insertTabletPlan, int start, int end) {
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+ if (insertTabletPlan.getColumns()[i] == null) {
+ continue;
+ }
IWritableMemChunk memSeries = createIfNotExistAndGet(insertTabletPlan.getDeviceId(),
insertTabletPlan.getMeasurements()[i], insertTabletPlan.getSchemas()[i]);
memSeries.write(insertTabletPlan.getTimes(), insertTabletPlan.getColumns()[i],
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 6f051cc..009acac 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -781,6 +781,9 @@ public class StorageGroupProcessor {
node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
String[] measurementList = plan.getMeasurements();
for (int i = 0; i < measurementList.length; i++) {
+ if (plan.getColumns()[i] == null) {
+ continue;
+ }
// Update cached last value with high priority
((MeasurementMNode) manager.getChild(node, measurementList[i]))
.updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index c80ef9c..066b76e 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -18,27 +18,6 @@
*/
package org.apache.iotdb.db.metadata;
-import static java.util.stream.Collectors.toList;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -47,12 +26,7 @@ import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
-import org.apache.iotdb.db.exception.metadata.IllegalPathException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.PathNotExistException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.*;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
@@ -74,6 +48,16 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static java.util.stream.Collectors.toList;
+
/**
* This class takes the responsibility of serialization of all the metadata info and persistent it
* into files. This class contains all the interfaces to modify the metadata for delta system. All
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 45ea1fd..3cf570e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -1122,10 +1122,15 @@ public class PlanExecutor implements IPlanExecutor {
// check data type
if (measurementNode.getSchema().getType() != insertTabletPlan.getDataTypes()[i]) {
- throw new QueryProcessException(String.format(
+ if (!enablePartialInsert) {
+ throw new QueryProcessException(String.format(
"Datatype mismatch, Insert measurement %s type %s, metadata tree type %s",
measurement, insertTabletPlan.getDataTypes()[i],
measurementNode.getSchema().getType()));
+ } else {
+ insertTabletPlan.markMeasurementInsertionFailed(i);
+ continue;
+ }
}
schemas[i] = measurementNode.getSchema();
// reset measurement to common name instead of alias
@@ -1133,6 +1138,10 @@ public class PlanExecutor implements IPlanExecutor {
}
insertTabletPlan.setSchemas(schemas);
StorageEngine.getInstance().insertTablet(insertTabletPlan);
+ if (insertTabletPlan.getFailedMeasurements() != null) {
+ throw new StorageEngineException(
+ "failed to insert measurements " + insertTabletPlan.getFailedMeasurements());
+ }
} catch (StorageEngineException | MetadataException e) {
throw new QueryProcessException(e);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 8b6a64c..8ee8ce8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
@@ -67,6 +68,9 @@ public class InsertTabletPlan extends PhysicalPlan {
private int start;
private int end;
+ // record the failed measurements
+ private List<String> failedMeasurements;
+
public InsertTabletPlan() {
super(false, OperatorType.BATCHINSERT);
}
@@ -133,12 +137,18 @@ public class InsertTabletPlan extends PhysicalPlan {
putString(stream, deviceId);
- stream.writeInt(measurements.length);
+ stream.writeInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
for (String m : measurements) {
+ if (m == null) {
+ continue;
+ }
putString(stream, m);
}
for (TSDataType dataType : dataTypes) {
+ if (dataType == null) {
+ continue;
+ }
stream.writeShort(dataType.serialize());
}
@@ -163,6 +173,9 @@ public class InsertTabletPlan extends PhysicalPlan {
private void serializeValues(DataOutputStream stream) throws IOException {
for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] == null) {
+ continue;
+ }
serializeColumn(dataTypes[i], columns[i], stream, index);
}
}
@@ -221,13 +234,17 @@ public class InsertTabletPlan extends PhysicalPlan {
putString(buffer, deviceId);
- buffer.putInt(measurements.length);
+ buffer.putInt(measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
for (String m : measurements) {
- putString(buffer, m);
+ if (m != null) {
+ putString(buffer, m);
+ }
}
for (TSDataType dataType : dataTypes) {
- dataType.serializeTo(buffer);
+ if (dataType != null) {
+ dataType.serializeTo(buffer);
+ }
}
buffer.putInt(end - start);
@@ -251,6 +268,9 @@ public class InsertTabletPlan extends PhysicalPlan {
private void serializeValues(ByteBuffer buffer) {
for (int i = 0; i < measurements.length; i++) {
+ if (measurements[i] == null) {
+ continue;
+ }
serializeColumn(dataTypes[i], columns[i], buffer, start, end);
}
}
@@ -470,4 +490,25 @@ public class InsertTabletPlan extends PhysicalPlan {
this.rowCount = size;
}
+ /**
+ * @param index failed measurement index
+ */
+ public void markMeasurementInsertionFailed(int index) {
+ if (failedMeasurements == null) {
+ failedMeasurements = new ArrayList<>();
+ }
+ failedMeasurements.add(measurements[index]);
+ measurements[index] = null;
+ dataTypes[index] = null;
+ columns[index] = null;
+ }
+
+ public List<String> getFailedMeasurements() {
+ return failedMeasurements;
+ }
+
+ public int getFailedMeasurementNumber() {
+ return failedMeasurements == null ? 0 : failedMeasurements.size();
+ }
+
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
index daef654..affe70d 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MemUtils.java
@@ -76,6 +76,9 @@ public class MemUtils {
}
long memSize = 0;
for (int i = 0; i < insertTabletPlan.getMeasurements().length; i++) {
+ if (insertTabletPlan.getDataTypes()[i] == null) {
+ continue;
+ }
switch (insertTabletPlan.getDataTypes()[i]) {
case INT32:
memSize += (end - start) * (8L + 4L); break;
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
index 953a76e..66f8f84 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/WriteLogNodeTest.java
@@ -24,10 +24,14 @@ import static junit.framework.TestCase.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.writelog.io.ILogReader;
import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
@@ -71,8 +75,36 @@ public class WriteLogNodeTest {
new String[]{"1.0", "15", "str", "false"});
DeletePlan deletePlan = new DeletePlan(50, new Path(identifier + ".s1"));
+ long[] times = new long[]{110L, 111L, 112L, 113L};
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ Object[] columns = new Object[4];
+ columns[0] = new double[4];
+ columns[1] = new long[4];
+ columns[2] = new String[4];
+ columns[3] = new boolean[4];
+
+ for (int r = 0; r < 4; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((long[]) columns[1])[r] = 1;
+ ((String[]) columns[2])[r] = "hh" + r;
+ ((boolean[]) columns[3])[r] = false;
+ }
+
+ InsertTabletPlan tabletPlan = new InsertTabletPlan(identifier,
+ new String[]{"s1", "s2", "s3", "s4"}, dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+
+ tabletPlan.markMeasurementInsertionFailed(2);
+
logNode.write(bwInsertPlan);
logNode.write(deletePlan);
+ logNode.write(tabletPlan);
logNode.close();
@@ -83,6 +115,8 @@ public class WriteLogNodeTest {
ILogReader reader = logNode.getLogReader();
assertEquals(bwInsertPlan, reader.next());
assertEquals(deletePlan, reader.next());
+ InsertTabletPlan newPlan = (InsertTabletPlan) reader.next();
+ assertEquals(newPlan.getMeasurements().length, 3);
reader.close();
logNode.delete();
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionUT.java b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
index 2397734..2f60380 100644
--- a/session/src/test/java/org/apache/iotdb/session/SessionUT.java
+++ b/session/src/test/java/org/apache/iotdb/session/SessionUT.java
@@ -18,19 +18,44 @@
*/
package org.apache.iotdb.session;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
-
import java.util.ArrayList;
import java.util.List;
+
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.*;
+
public class SessionUT {
+ private Session session;
+
+ @Before
+ public void setUp() {
+ System.setProperty(IoTDBConstant.IOTDB_CONF, "src/test/resources/");
+ EnvironmentUtils.closeStatMonitor();
+ EnvironmentUtils.envSetUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ session.close();
+ EnvironmentUtils.cleanEnv();
+ }
+
@Test
public void testSortTablet() {
/*
@@ -39,7 +64,7 @@ public class SessionUT {
Before testing, change the sortTablet from private method to public method
!!!
*/
- Session session = new Session("127.0.0.1", 6667, "root", "root");
+ session = new Session("127.0.0.1", 6667, "root", "root");
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1",TSDataType.INT64, TSEncoding.RLE));
// insert three rows data
@@ -89,4 +114,54 @@ public class SessionUT {
fail();
}
}
+
+ @Test
+ public void testInsertByStrAndSelectFailedData() throws IoTDBConnectionException, StatementExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ session.open();
+
+ String deviceId = "root.sg1.d1";
+
+ session.createTimeseries(deviceId + "s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + "s2", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+ session.createTimeseries(deviceId + "s3", TSDataType.INT64, TSEncoding.RLE, CompressionType.UNCOMPRESSED);
+
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.DOUBLE, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
+
+ Tablet tablet = new Tablet("root.sg1.d1", schemaList, 10);
+
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+
+ for (long time = 0; time < 10; time++) {
+ int row = tablet.rowSize++;
+ timestamps[row] = time;
+ long[] sensor = (long[]) values[0];
+ sensor[row] = time;
+ double[] sensor2 = (double[]) values[1];
+ sensor2[row] = 0.1 + time;
+ Binary[] sensor3 = (Binary[]) values[2];
+ sensor3[row] = Binary.valueOf("ha" + time);
+ }
+
+ if (tablet.rowSize != 0) {
+ session.insertTablet(tablet);
+ tablet.reset();
+ }
+
+ SessionDataSet dataSet = session.executeQueryStatement("select * from root.sg1.d1");
+ int i = 0;
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ System.out.println(record.toString());
+ assertEquals(i, record.getFields().get(1).getLongV());
+ assertTrue(record.getFields().get(2).isNull());
+ assertTrue(record.getFields().get(3).isNull());
+ i++;
+ }
+ }
+
}