You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2019/08/22 12:06:10 UTC
[incubator-iotdb] 02/03: also fix replayInsert in LogReplayer; modify unit tests
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit a2d6f42a8911a38cfb83cfb98ab2f22cbb0fb944
Author: RuiLei <ru...@gmail.com>
AuthorDate: Wed Aug 21 18:08:08 2019 +0800
also fix replayInsert in LogReplayer; modify unit tests
---
.../iotdb/db/writelog/recover/LogReplayer.java | 7 +++--
.../iotdb/db/writelog/recover/LogReplayerTest.java | 21 ++++++++------
.../db/writelog/recover/SeqTsFileRecoverTest.java | 32 +++++++++++++++++++++-
.../writelog/recover/UnseqTsFileRecoverTest.java | 26 ++++++++++++++++--
4 files changed, 73 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index b0e63eb..a9e82c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -24,10 +24,10 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -123,7 +123,10 @@ public class LogReplayer {
!acceptDuplication) {
return;
}
- tempStartTimeMap.putIfAbsent(insertPlan.getDeviceId(), insertPlan.getTime());
+ Long startTime = tempStartTimeMap.get(insertPlan.getDeviceId());
+ if (startTime == null || startTime > insertPlan.getTime()) {
+ tempStartTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
+ }
Long endTime = tempEndTimeMap.get(insertPlan.getDeviceId());
if (endTime == null || endTime < insertPlan.getTime()) {
tempEndTimeMap.put(insertPlan.getDeviceId(), insertPlan.getTime());
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index 6fede66..503a824 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -27,13 +27,13 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
-import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -73,7 +73,8 @@ public class LogReplayerTest {
try {
for (int i = 0; i < 5; i++) {
- schema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
+ schema.registerMeasurement(
+ new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
}
LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
@@ -81,10 +82,12 @@ public class LogReplayerTest {
WriteLogNode node =
MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
- for (int i = 0; i < 5; i++) {
+ node.write(new InsertPlan("device0", 100, "sensor0", String.valueOf(0)));
+ node.write(new InsertPlan("device0", 2, "sensor1", String.valueOf(0)));
+ for (int i = 1; i < 5; i++) {
node.write(new InsertPlan("device" + i, i, "sensor" + i, String.valueOf(i)));
}
- DeletePlan deletePlan = new DeletePlan(3, new Path("device0", "sensor0"));
+ DeletePlan deletePlan = new DeletePlan(200, new Path("device0", "sensor0"));
node.write(deletePlan);
node.close();
@@ -107,11 +110,13 @@ public class LogReplayerTest {
Modification[] mods = modFile.getModifications().toArray(new Modification[0]);
assertEquals(1, mods.length);
- assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 3), mods[0]);
+ assertEquals(new Deletion(new Path("device0", "sensor0"), 5, 200), mods[0]);
- for (int i = 0; i < 5; i++) {
- assertEquals(i, (long)tsFileResource.getStartTimeMap().get("device" + i));
- assertEquals(i, (long)tsFileResource.getEndTimeMap().get("device" + i));
+ assertEquals(2, (long) tsFileResource.getStartTimeMap().get("device0"));
+ assertEquals(100, (long) tsFileResource.getEndTimeMap().get("device0"));
+ for (int i = 1; i < 5; i++) {
+ assertEquals(i, (long) tsFileResource.getStartTimeMap().get("device" + i));
+ assertEquals(i, (long) tsFileResource.getEndTimeMap().get("device" + i));
}
} finally {
modFile.close();
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
index d47214a..f39ed4f 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverTest.java
@@ -48,10 +48,12 @@ import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.FileSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SeqTsFileRecoverTest {
+
private File tsF;
private TsFileWriter writer;
private WriteLogNode node;
@@ -60,6 +62,7 @@ public class SeqTsFileRecoverTest {
private TsFileResource resource;
private VersionController versionController = new VersionController() {
private int i;
+
@Override
public long nextVersion() {
return ++i;
@@ -83,9 +86,16 @@ public class SeqTsFileRecoverTest {
}
writer = new TsFileWriter(tsF, schema);
+ TSRecord tsRecord = new TSRecord(100, "device99");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
+ writer.write(tsRecord);
+ tsRecord = new TSRecord(2, "device99");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
+ writer.write(tsRecord);
+
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
- TSRecord tsRecord = new TSRecord(i, "device" + j);
+ tsRecord = new TSRecord(i, "device" + j);
for (int k = 0; k < 10; k++) {
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
String.valueOf(k)));
@@ -126,6 +136,13 @@ public class SeqTsFileRecoverTest {
versionController, resource, true);
performer.recover();
+ assertEquals(2, (long) resource.getStartTimeMap().get("device99"));
+ assertEquals(100, (long) resource.getEndTimeMap().get("device99"));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+ assertEquals(19, (long) resource.getEndTimeMap().get("device" + i));
+ }
+
ReadOnlyTsFile readOnlyTsFile = new ReadOnlyTsFile(new TsFileSequenceReader(tsF.getPath()));
List<Path> pathList = new ArrayList<>();
for (int j = 0; j < 10; j++) {
@@ -144,6 +161,19 @@ public class SeqTsFileRecoverTest {
assertEquals(j % 10, fields.get(j).getLongV());
}
}
+
+ pathList = new ArrayList<>();
+ pathList.add(new Path("device99", "sensor1"));
+ pathList.add(new Path("device99", "sensor4"));
+ queryExpression = QueryExpression.create(pathList, null);
+ dataSet = readOnlyTsFile.query(queryExpression);
+ Assert.assertTrue(dataSet.hasNext());
+ RowRecord record = dataSet.next();
+ Assert.assertEquals("2\t0\tnull", record.toString());
+ Assert.assertTrue(dataSet.hasNext());
+ record = dataSet.next();
+ Assert.assertEquals("100\tnull\t0", record.toString());
+
readOnlyTsFile.close();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
index 38c373d..3747d54 100644
--- a/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -56,6 +56,7 @@ import org.junit.Before;
import org.junit.Test;
public class UnseqTsFileRecoverTest {
+
private File tsF;
private TsFileWriter writer;
private WriteLogNode node;
@@ -64,6 +65,7 @@ public class UnseqTsFileRecoverTest {
private TsFileResource resource;
private VersionController versionController = new VersionController() {
private int i;
+
@Override
public long nextVersion() {
return ++i;
@@ -87,9 +89,16 @@ public class UnseqTsFileRecoverTest {
}
writer = new TsFileWriter(tsF, schema);
+ TSRecord tsRecord = new TSRecord(100, "device99");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor4", String.valueOf(0)));
+ writer.write(tsRecord);
+ tsRecord = new TSRecord(2, "device99");
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor1", String.valueOf(0)));
+ writer.write(tsRecord);
+
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
- TSRecord tsRecord = new TSRecord(i, "device" + j);
+ tsRecord = new TSRecord(i, "device" + j);
for (int k = 0; k < 10; k++) {
tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
String.valueOf(k)));
@@ -97,6 +106,7 @@ public class UnseqTsFileRecoverTest {
writer.write(tsRecord);
}
}
+
writer.flushForTest();
writer.getIOWriter().close();
@@ -114,6 +124,12 @@ public class UnseqTsFileRecoverTest {
}
node.notifyStartFlush();
}
+ InsertPlan insertPlan = new InsertPlan("device99", 1, "sensor4", "4");
+ node.write(insertPlan);
+ insertPlan = new InsertPlan("device99", 300, "sensor2", "2");
+ node.write(insertPlan);
+ node.close();
+
resource = new TsFileResource(tsF);
}
@@ -129,6 +145,13 @@ public class UnseqTsFileRecoverTest {
versionController, resource, true);
performer.recover();
+ assertEquals(1, (long) resource.getStartTimeMap().get("device99"));
+ assertEquals(300, (long) resource.getEndTimeMap().get("device99"));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, (long) resource.getStartTimeMap().get("device" + i));
+ assertEquals(9, (long) resource.getEndTimeMap().get("device" + i));
+ }
+
TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);
MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(fileReader);
ChunkLoader chunkLoader = new ChunkLoaderImpl(fileReader);
@@ -150,7 +173,6 @@ public class UnseqTsFileRecoverTest {
assertEquals(i, timeValuePair.getTimestamp());
assertEquals(11, timeValuePair.getValue().getLong());
unSeqMergeReader.next();
-
}
unSeqMergeReader.close();
fileReader.close();