You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/28 02:27:21 UTC
[incubator-iotdb] branch master updated: [IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9c63e5 [IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)
f9c63e5 is described below
commit f9c63e594e89a50a036b069106f4fb908caf337b
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Apr 28 10:27:13 2020 +0800
[IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)
* Fix a Last cache bug when WAL is disabled
---
.../engine/storagegroup/StorageGroupProcessor.java | 9 ++--
.../db/engine/storagegroup/TsFileProcessor.java | 9 ++++
.../db/qp/physical/crud/InsertTabletPlan.java | 14 +++---
.../org/apache/iotdb/session/IoTDBSessionIT.java | 54 ++++++++++++++++++++++
4 files changed, 76 insertions(+), 10 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 803e147..f5b078f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -545,6 +545,9 @@ public class StorageGroupProcessor {
insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition);
}
+ long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
+ insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
+ tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
return results;
} finally {
@@ -561,9 +564,12 @@ public class StorageGroupProcessor {
/**
* insert batch to tsfile processor thread-safety that the caller need to guarantee
+ * The rows to be inserted are in the range [start, end)
*
* @param insertTabletPlan insert a tablet of a device
* @param sequence whether is sequence
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
*/
@@ -599,9 +605,6 @@ public class StorageGroupProcessor {
latestTimeForEachDevice.get(timePartitionId)
.put(insertTabletPlan.getDeviceId(), insertTabletPlan.getTimes()[end - 1]);
}
- long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
- insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
- tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
// check memtable size and may async try to flush the work memtable
if (tsFileProcessor.shouldFlush()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5b2234e..79618d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,6 +177,15 @@ public class TsFileProcessor {
}
}
+ /**
+ * insert batch data of insertTabletPlan into the workingMemtable
+ * The rows to be inserted are in the range [start, end)
+ *
+ * @param insertTabletPlan insert a tablet of a device
+ * @param start start index of rows to be inserted in insertTabletPlan
+ * @param end end index of rows to be inserted in insertTabletPlan
+ * @param results result array
+ */
public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
TSStatus[] results) throws WriteProcessException {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index a26ac3b..4fcd813 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan {
switch (dataTypes[measurementIndex]) {
case INT32:
int[] intValues = (int[]) columns[measurementIndex];
- value = new TsInt(intValues[end - 1]);
+ value = new TsInt(intValues[rowCount - 1]);
break;
case INT64:
long[] longValues = (long[]) columns[measurementIndex];
- value = new TsLong(longValues[end - 1]);
+ value = new TsLong(longValues[rowCount - 1]);
break;
case FLOAT:
float[] floatValues = (float[]) columns[measurementIndex];
- value = new TsFloat(floatValues[end - 1]);
+ value = new TsFloat(floatValues[rowCount - 1]);
break;
case DOUBLE:
double[] doubleValues = (double[]) columns[measurementIndex];
- value = new TsDouble(doubleValues[end - 1]);
+ value = new TsDouble(doubleValues[rowCount - 1]);
break;
case BOOLEAN:
boolean[] boolValues = (boolean[]) columns[measurementIndex];
- value = new TsBoolean(boolValues[end - 1]);
+ value = new TsBoolean(boolValues[rowCount - 1]);
break;
case TEXT:
Binary[] binaryValues = (Binary[]) columns[measurementIndex];
- value = new TsBinary(binaryValues[end - 1]);
+ value = new TsBinary(binaryValues[rowCount - 1]);
break;
default:
throw new UnSupportedDataTypeException(
String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
}
- return new TimeValuePair(times[end - 1], value);
+ return new TimeValuePair(times[rowCount - 1], value);
}
public long[] getTimes() {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index fcc03b9..820dc3c 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.rpc.BatchExecutionException;
@@ -351,6 +352,59 @@ public class IoTDBSessionIT {
session.close();
}
+ @Test
+ public void TestSessionInterfacesWithDisabledWAL()
+ throws StatementExecutionException, IoTDBConnectionException,
+ BatchExecutionException {
+ session = new Session("127.0.0.1", 6667, "root", "root");
+ try {
+ session.open();
+ } catch (IoTDBConnectionException e) {
+ e.printStackTrace();
+ }
+
+ session.setStorageGroup("root.sg1");
+ String deviceId = "root.sg1.d1";
+
+ boolean isEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal();
+ IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
+ createTimeseries();
+
+ // test insert record
+ List<String> measurements = new ArrayList<>();
+ measurements.add("s1");
+ measurements.add("s2");
+ measurements.add("s3");
+ for (long time = 0; time < 100; time++) {
+ List<String> values = new ArrayList<>();
+ values.add("1");
+ values.add("2");
+ values.add("3");
+ session.insertRecord(deviceId, time, measurements, values);
+ }
+
+ // test insert tablet
+ List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+ schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
+
+ Tablet tablet = new Tablet(deviceId, schemaList, 100);
+ long[] timestamps = tablet.timestamps;
+ Object[] values = tablet.values;
+ for (int time = 1; time <= 100; time++) {
+ timestamps[time - 1] = time;
+ for (int i = 0; i < 3; i++) {
+ long[] sensor = (long[]) values[i];
+ sensor[time - 1] = i;
+ }
+ tablet.rowSize++;
+ }
+
+ session.insertTablet(tablet);
+ IoTDBDescriptor.getInstance().getConfig().setEnableWal(isEnableWAL);
+ session.close();
+ }
private void createTimeseriesForTime()
throws StatementExecutionException, IoTDBConnectionException {