You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/21 03:30:18 UTC
[incubator-iotdb] 01/01: fix that CachedPriorityMergeReader fails
to deduplicate the last element in the cache
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch fix_duplicated_overlap_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0df35ae8ac4966215ac2451111003b0628784451
Author: 江天 <jt...@163.com>
AuthorDate: Mon Oct 21 11:29:57 2019 +0800
fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache
---
.../universal/CachedPriorityMergeReader.java | 32 +++--
.../reader/universal/PriorityMergeReader.java | 2 +-
.../{MergeTest.java => MergeOverLapTest.java} | 146 +++++++++------------
.../apache/iotdb/db/engine/merge/MergeTest.java | 4 +-
4 files changed, 81 insertions(+), 103 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
index e28d6f6..61aeea5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
@@ -34,10 +34,10 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
private TimeValuePair[] timeValuePairCache = new TimeValuePair[CACHE_SIZE];
private int cacheLimit = 0;
private int cacheIdx = 0;
- private TSDataType dataType;
+
+ private Long lastTimestamp = null;
public CachedPriorityMergeReader(TSDataType dataType) {
- this.dataType = dataType;
for (int i = 0; i < CACHE_SIZE; i++) {
timeValuePairCache[i] = TimeValuePairUtils.getEmptyTimeValuePair(dataType);
}
@@ -53,22 +53,19 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
cacheIdx = 0;
while (!heap.isEmpty() && cacheLimit < CACHE_SIZE) {
Element top = heap.poll();
- if (cacheLimit == 0 || top.currTime() != timeValuePairCache[cacheLimit - 1].getTimestamp()) {
+ if (lastTimestamp == null || top.currTime() != lastTimestamp) {
TimeValuePairUtils.setTimeValuePair(top.timeValuePair, timeValuePairCache[cacheLimit++]);
- if (top.hasNext()) {
- top.next();
- heap.add(top);
- } else {
- top.close();
- }
- } else if (top.currTime() == timeValuePairCache[cacheLimit - 1].getTimestamp()) {
- if (top.hasNext()) {
- top.next();
- heap.add(top);
- } else {
- top.close();
+ lastTimestamp = top.currTime();
+ while (heap.peek() != null && heap.peek().currTime() == lastTimestamp) {
+ heap.poll();
}
}
+ if (top.hasNext()) {
+ top.next();
+ heap.add(top);
+ } else {
+ top.close();
+ }
}
}
@@ -85,11 +82,12 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
}
@Override
- public TimeValuePair current() {
+ public TimeValuePair current() throws IOException {
if (0 <= cacheIdx && cacheIdx < cacheLimit) {
return timeValuePairCache[cacheIdx];
} else {
- return heap.peek().timeValuePair;
+ fetch();
+ return timeValuePairCache[cacheIdx];
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index c3d8096..1e1e5d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -76,7 +76,7 @@ public class PriorityMergeReader implements IPointReader {
}
@Override
- public TimeValuePair current() {
+ public TimeValuePair current() throws IOException {
return heap.peek().timeValuePair;
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
similarity index 51%
copy from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 4d7978b..fd07863 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -15,105 +15,56 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
package org.apache.iotdb.db.engine.merge;
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
-import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
-abstract class MergeTest {
-
- static final String MERGE_TEST_SG = "root.mergeTest";
-
- int seqFileNum = 5;
- int unseqFileNum = 5;
- int measurementNum = 10;
- int deviceNum = 10;
- long ptNum = 100;
- long flushInterval = 20;
- TSEncoding encoding = TSEncoding.PLAIN;
-
- String[] deviceIds;
- MeasurementSchema[] measurementSchemas;
-
- List<TsFileResource> seqResources = new ArrayList<>();
- List<TsFileResource> unseqResources = new ArrayList<>();
+public class MergeOverLapTest extends MergeTest {
- private int prevMergeChunkThreshold;
+ private File tempSGDir;
@Before
public void setUp() throws IOException, WriteProcessException, MetadataErrorException {
- MManager.getInstance().init();
- prevMergeChunkThreshold =
- IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
- prepareSeries();
- prepareFiles(seqFileNum, unseqFileNum);
- MergeManager.getINSTANCE().start();
+ ptNum = 1000;
+ super.setUp();
+ tempSGDir = new File("tempSG");
+ tempSGDir.mkdirs();
}
@After
public void tearDown() throws IOException, StorageEngineException {
- removeFiles();
- seqResources.clear();
- unseqResources.clear();
- IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
- TsFileMetaDataCache.getInstance().clear();
- DeviceMetaDataCache.getInstance().clear();
- MManager.getInstance().clear();
- EnvironmentUtils.cleanAllDir();
- MergeManager.getINSTANCE().stop();
- }
-
- private void prepareSeries() throws MetadataErrorException {
- measurementSchemas = new MeasurementSchema[measurementNum];
- for (int i = 0; i < measurementNum; i++) {
- measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE,
- encoding, CompressionType.UNCOMPRESSED);
- }
- deviceIds = new String[deviceNum];
- for (int i = 0; i < deviceNum; i++) {
- deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i;
- }
- MManager.getInstance().setStorageGroupToMTree(MERGE_TEST_SG);
- for (String device : deviceIds) {
- for (MeasurementSchema measurementSchema : measurementSchemas) {
- MManager.getInstance().addPathToMTree(
- device + PATH_SEPARATOR + measurementSchema.getMeasurementId(), measurementSchema
- .getType(), measurementSchema.getEncodingType(), measurementSchema.getCompressor(),
- Collections.emptyMap());
- }
- }
+ super.tearDown();
+ FileUtils.deleteDirectory(tempSGDir);
}
- private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+ @Override
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
@@ -132,18 +83,7 @@ abstract class MergeTest {
prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
}
- private void removeFiles() throws IOException {
- for (TsFileResource tsFileResource : seqResources) {
- tsFileResource.remove();
- }
- for (TsFileResource tsFileResource : unseqResources) {
- tsFileResource.remove();
- }
- FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
- FileReaderManager.getInstance().stop();
- }
-
- private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+ void prepareUnseqFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
long valueOffset)
throws IOException, WriteProcessException {
TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
@@ -161,10 +101,50 @@ abstract class MergeTest {
tsFileResource.updateStartTime(deviceIds[j], i);
tsFileResource.updateEndTime(deviceIds[j], i);
}
+ // insert overlapping tuples
+ if ((i + 1) % 100 == 0) {
+ for (int j = 0; j < deviceNum; j++) {
+ TSRecord record = new TSRecord(i, deviceIds[j]);
+ for (int k = 0; k < measurementNum; k++) {
+ record.addTuple(DataPoint.getDataPoint(measurementSchemas[k].getType(),
+ measurementSchemas[k].getMeasurementId(), String.valueOf(i + valueOffset)));
+ }
+ fileWriter.write(record);
+ tsFileResource.updateStartTime(deviceIds[j], i);
+ tsFileResource.updateEndTime(deviceIds[j], i);
+ }
+ }
if ((i + 1) % flushInterval == 0) {
fileWriter.flushForTest();
}
}
fileWriter.close();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testFullMerge() throws Exception {
+ MergeTask mergeTask =
+ new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+ true, 1, MERGE_TEST_SG);
+ mergeTask.call();
+
+ QueryContext context = new QueryContext();
+ Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+ SeqResourceIterateReader tsFilesReader = new SeqResourceIterateReader(path,
+ Collections.singletonList(seqResources.get(0)),
+ null, context);
+ int cnt = 0;
+ try {
+ while (tsFilesReader.hasNext()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ cnt ++;
+ assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
+ }
+ }
+ assertEquals(1000, cnt);
+ } finally {
+ tsFilesReader.close();
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 4d7978b..bc3a7b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -113,7 +113,7 @@ abstract class MergeTest {
}
}
- private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
@@ -143,7 +143,7 @@ abstract class MergeTest {
FileReaderManager.getInstance().stop();
}
- private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+ void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
long valueOffset)
throws IOException, WriteProcessException {
TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());