You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/08/22 12:06:10 UTC

[incubator-iotdb] 02/03: also fix replayInsert in LogReplayer to record the minimum insert time as each device's start time; modify unit tests

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a2d6f42a8911a38cfb83cfb98ab2f22cbb0fb944
Author: RuiLei <ru...@gmail.com>
AuthorDate: Wed Aug 21 18:08:08 2019 +0800

    also fix replayInsert in LogReplayer to record the minimum insert time as each device's start time; modify unit tests
---
 .../iotdb/db/writelog/recover/LogReplayer.java     |  7 +++--
 .../iotdb/db/writelog/recover/LogReplayerTest.java | 21 ++++++++------
 .../db/writelog/recover/SeqTsFileRecoverTest.java  | 32 +++++++++++++++++++++-
 .../writelog/recover/UnseqTsFileRecoverTest.java   | 26 ++++++++++++++++--
 4 files changed, 73 insertions(+), 13 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index b0e63eb..a9e82c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -24,10 +24,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -123,7 +123,10 @@ public class LogReplayer {
           !acceptDuplication) {
         return;
       }
-      tempStartTimeMap.putIfAbsent(insertPlan.getDeviceId(), insertPlan.getTime());
+      Long startTime = tempStartTimeMap.get(insertPlan.getDeviceId());
+      if (startTime == null || startTime > insertPlan.getTime()) {
+        tempStartTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
+      }
       Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId());
       if (endTime == null || endTime < insertPlan.getTime()) {
         tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 6fede66..503a824 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -27,13 +27,13 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.memtable.IMemTable;
 import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -73,7 +73,8 @@ public class LogReplayerTest {
 
     try {
       for (int i = 0; i < 5; i++) {
-        schema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
+        schema.registerMeasurement(
+            new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
       }
 
       LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
@@ -81,10 +82,12 @@ public class LogReplayerTest {
 
       WriteLogNode node =
           MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
-      for (int i = 0; i < 5; i++) {
+      node.write(new InsertPlan("device0", 100, "sensor0", String.valueOf(0)));
+      node.write(new InsertPlan("device0", 2, "sensor1", String.valueOf(0)));
+      for (int i = 1; i < 5; i++) {
         node.write(new InsertPlan("device" + i, i, "sensor" + i, String.valueOf(i)));
       }
-      DeletePlan deletePlan = new DeletePlan(3, new Path("device0", "sensor0"));
+      DeletePlan deletePlan = new DeletePlan(200, new Path("device0", "sensor0"));
       node.write(deletePlan);
       node.close();
 
@@ -107,11 +110,13 @@ public class LogReplayerTest {
 
       Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
       assertEquals(1, mods.length);
-      assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 3), mods[0]);
+      assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 200), mods[0]);
 
-      for (int i = 0; i < 5; i++) {
-        assertEquals(i, (long)tsFileResource.getStartTimeMap().get("device" + i));
-        assertEquals(i, (long)tsFileResource.getEndTimeMap().get("device" + i));
+      assertEquals(2, (long) tsFileResource.getStartTimeMap().get("device0"));
+      assertEquals(100, (long) tsFileResource.getEndTimeMap().get("device0"));
+      for (int i = 1; i < 5; i++) {
+        assertEquals(i, (long) tsFileResource.getStartTimeMap().get("device" + i));
+        assertEquals(i, (long) tsFileResource.getEndTimeMap().get("device" + i));
       }
     } finally {
       modFile.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index d47214a..f39ed4f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -48,10 +48,12 @@ import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.FileSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 public class SeqTsFileRecoverTest {
+
   private File tsF;
   private TsFileWriter writer;
   private WriteLogNode node;
@@ -60,6 +62,7 @@ public class SeqTsFileRecoverTest {
   private TsFileResource resource;
   private VersionController versionController = new VersionController() {
     private int i;
+
     @Override
     public long nextVersion() {
       return ++i;
@@ -83,9 +86,16 @@ public class SeqTsFileRecoverTest {
     }
     writer = new TsFileWriter(tsF, schema);
 
+    TSRecord tsRecord = new TSRecord(100, "device99");
+    tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
+    writer.write(tsRecord);
+    tsRecord = new TSRecord(2, "device99");
+    tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
+    writer.write(tsRecord);
+
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
-        TSRecord tsRecord = new TSRecord(i, "device" + j);
+        tsRecord = new TSRecord(i, "device" + j);
         for (int k = 0; k < 10; k++) {
           tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
               String.valueOf(k)));
@@ -126,6 +136,13 @@ public class SeqTsFileRecoverTest {
         versionController, resource, true);
     performer.recover();
 
+    assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
+    assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+      assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
+    }
+
     ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
     List<Path> pathList = new ArrayList<>();
     for (int j = 0; j < 10; j++) {
@@ -144,6 +161,19 @@ public class SeqTsFileRecoverTest {
         assertEquals(j % 10, fields.get(j).getLongV());
       }
     }
+
+    pathList = new ArrayList<>();
+    pathList.add(new Path("device99", "sensor1"));
+    pathList.add(new Path("device99", "sensor4"));
+    queryExpression = QueryExpression.create(pathList, null);
+    dataSet = readOnlyTsFile.query(queryExpression);
+    Assert.assertTrue(dataSet.hasNext());
+    RowRecord record = dataSet.next();
+    Assert.assertEquals("2\t0\tnull", record.toString());
+    Assert.assertTrue(dataSet.hasNext());
+    record = dataSet.next();
+    Assert.assertEquals("100\tnull\t0", record.toString());
+
     readOnlyTsFile.close();
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 38c373d..3747d54 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -56,6 +56,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class UnseqTsFileRecoverTest {
+
   private File tsF;
   private TsFileWriter writer;
   private WriteLogNode node;
@@ -64,6 +65,7 @@ public class UnseqTsFileRecoverTest {
   private TsFileResource resource;
   private VersionController versionController = new VersionController() {
     private int i;
+
     @Override
     public long nextVersion() {
       return ++i;
@@ -87,9 +89,16 @@ public class UnseqTsFileRecoverTest {
     }
     writer = new TsFileWriter(tsF, schema);
 
+    TSRecord tsRecord = new TSRecord(100, "device99");
+    tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
+    writer.write(tsRecord);
+    tsRecord = new TSRecord(2, "device99");
+    tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
+    writer.write(tsRecord);
+
     for (int i = 0; i < 10; i++) {
       for (int j = 0; j < 10; j++) {
-        TSRecord tsRecord = new TSRecord(i, "device" + j);
+        tsRecord = new TSRecord(i, "device" + j);
         for (int k = 0; k < 10; k++) {
           tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
               String.valueOf(k)));
@@ -97,6 +106,7 @@ public class UnseqTsFileRecoverTest {
         writer.write(tsRecord);
       }
     }
+
     writer.flushForTest();
     writer.getIOWriter().close();
 
@@ -114,6 +124,12 @@ public class UnseqTsFileRecoverTest {
       }
       node.notifyStartFlush();
     }
+    InsertPlan insertPlan = new InsertPlan("device99", 1, "sensor4", "4");
+    node.write(insertPlan);
+    insertPlan = new InsertPlan("device99", 300, "sensor2", "2");
+    node.write(insertPlan);
+    node.close();
+
     resource = new TsFileResource(tsF);
   }
 
@@ -129,6 +145,13 @@ public class UnseqTsFileRecoverTest {
         versionController, resource, true);
     performer.recover();
 
+    assertEquals(1, (long) resource.getStartTimeMap().get("device99"));
+    assertEquals(300, (long) resource.getEndTimeMap().get("device99"));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+      assertEquals(9, (long) resource.getEndTimeMap().get("device" + i));
+    }
+
     TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);
     MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(fileReader);
     ChunkLoader chunkLoader = new ChunkLoaderImpl(fileReader);
@@ -150,7 +173,6 @@ public class UnseqTsFileRecoverTest {
       assertEquals(i, timeValuePair.getTimestamp());
       assertEquals(11, timeValuePair.getValue().getLong());
       unSeqMergeReader.next();
-
     }
     unSeqMergeReader.close();
     fileReader.close();