You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/28 02:27:21 UTC

[incubator-iotdb] branch master updated: [IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f9c63e5  [IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)
f9c63e5 is described below

commit f9c63e594e89a50a036b069106f4fb908caf337b
Author: wshao08 <59...@users.noreply.github.com>
AuthorDate: Tue Apr 28 10:27:13 2020 +0800

    [IOTDB-624]Fix a Last cache bug when WAL is disabled (#1109)
    
    * Fix a Last cache bug when WAL is disabled
---
 .../engine/storagegroup/StorageGroupProcessor.java |  9 ++--
 .../db/engine/storagegroup/TsFileProcessor.java    |  9 ++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 14 +++---
 .../org/apache/iotdb/session/IoTDBSessionIT.java   | 54 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 10 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 803e147..f5b078f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -545,6 +545,9 @@ public class StorageGroupProcessor {
         insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
             beforeTimePartition);
       }
+      long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
+          insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
+      tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
 
       return results;
     } finally {
@@ -561,9 +564,12 @@ public class StorageGroupProcessor {
 
   /**
    * insert batch to tsfile processor; thread-safety must be guaranteed by the caller.
+   * The rows to be inserted are in the range [start, end)
    *
    * @param insertTabletPlan insert a tablet of a device
    * @param sequence whether is sequence
+   * @param start start index (inclusive) of rows to be inserted in insertTabletPlan
+   * @param end end index (exclusive) of rows to be inserted in insertTabletPlan
    * @param results result array
    * @param timePartitionId time partition id
    */
@@ -599,9 +605,6 @@ public class StorageGroupProcessor {
       latestTimeForEachDevice.get(timePartitionId)
           .put(insertTabletPlan.getDeviceId(), insertTabletPlan.getTimes()[end - 1]);
     }
-    long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
-        insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
-    tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);
 
     // check memtable size and may async try to flush the work memtable
     if (tsFileProcessor.shouldFlush()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 5b2234e..79618d9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -177,6 +177,15 @@ public class TsFileProcessor {
     }
   }
 
+  /**
+   * Insert batch data of insertTabletPlan into the workingMemtable.
+   * The rows to be inserted are in the range [start, end).
+   *
+   * @param insertTabletPlan insert a tablet of a device
+   * @param start start index (inclusive) of rows to be inserted in insertTabletPlan
+   * @param end end index (exclusive) of rows to be inserted in insertTabletPlan
+   * @param results result array
+   */
   public void insertTablet(InsertTabletPlan insertTabletPlan, int start, int end,
       TSStatus[] results) throws WriteProcessException {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index a26ac3b..4fcd813 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -425,33 +425,33 @@ public class InsertTabletPlan extends PhysicalPlan {
     switch (dataTypes[measurementIndex]) {
       case INT32:
         int[] intValues = (int[]) columns[measurementIndex];
-        value = new TsInt(intValues[end - 1]);
+        value = new TsInt(intValues[rowCount - 1]);
         break;
       case INT64:
         long[] longValues = (long[]) columns[measurementIndex];
-        value = new TsLong(longValues[end - 1]);
+        value = new TsLong(longValues[rowCount - 1]);
         break;
       case FLOAT:
         float[] floatValues = (float[]) columns[measurementIndex];
-        value = new TsFloat(floatValues[end - 1]);
+        value = new TsFloat(floatValues[rowCount - 1]);
         break;
       case DOUBLE:
         double[] doubleValues = (double[]) columns[measurementIndex];
-        value = new TsDouble(doubleValues[end - 1]);
+        value = new TsDouble(doubleValues[rowCount - 1]);
         break;
       case BOOLEAN:
         boolean[] boolValues = (boolean[]) columns[measurementIndex];
-        value = new TsBoolean(boolValues[end - 1]);
+        value = new TsBoolean(boolValues[rowCount - 1]);
         break;
       case TEXT:
         Binary[] binaryValues = (Binary[]) columns[measurementIndex];
-        value = new TsBinary(binaryValues[end - 1]);
+        value = new TsBinary(binaryValues[rowCount - 1]);
         break;
       default:
         throw new UnSupportedDataTypeException(
             String.format(DATATYPE_UNSUPPORTED, dataTypes[measurementIndex]));
     }
-    return new TimeValuePair(times[end - 1], value);
+    return new TimeValuePair(times[rowCount - 1], value);
   }
 
   public long[] getTimes() {
diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
index fcc03b9..820dc3c 100644
--- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
+++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionIT.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.rpc.BatchExecutionException;
@@ -351,6 +352,59 @@ public class IoTDBSessionIT {
     session.close();
   }
 
+  @Test
+  public void TestSessionInterfacesWithDisabledWAL()
+      throws StatementExecutionException, IoTDBConnectionException,
+          BatchExecutionException {
+    session = new Session("127.0.0.1", 6667, "root", "root");
+    try {
+      session.open();
+    } catch (IoTDBConnectionException e) {
+      e.printStackTrace();
+    }
+
+    session.setStorageGroup("root.sg1");
+    String deviceId = "root.sg1.d1";
+
+    boolean isEnableWAL = IoTDBDescriptor.getInstance().getConfig().isEnableWal();
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(false);
+    createTimeseries();
+
+    // test insert record
+    List<String> measurements = new ArrayList<>();
+    measurements.add("s1");
+    measurements.add("s2");
+    measurements.add("s3");
+    for (long time = 0; time < 100; time++) {
+      List<String> values = new ArrayList<>();
+      values.add("1");
+      values.add("2");
+      values.add("3");
+      session.insertRecord(deviceId, time, measurements, values);
+    }
+
+    // test insert tablet
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));
+
+    Tablet tablet = new Tablet(deviceId, schemaList, 100);
+    long[] timestamps = tablet.timestamps;
+    Object[] values = tablet.values;
+    for (int time = 1; time <= 100; time++) {
+      timestamps[time - 1] = time;
+      for (int i = 0; i < 3; i++) {
+        long[] sensor = (long[]) values[i];
+        sensor[time - 1] = i;
+      }
+      tablet.rowSize++;
+    }
+
+    session.insertTablet(tablet);
+    IoTDBDescriptor.getInstance().getConfig().setEnableWal(isEnableWAL);
+    session.close();
+  }
 
   private void createTimeseriesForTime()
       throws StatementExecutionException, IoTDBConnectionException {