You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/21 13:16:31 UTC
[incubator-iotdb] 01/03: add test of unseqFile
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit aa6995434b10dbd3fc5bc642eca87523dd2310b1
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jun 21 21:06:23 2019 +0800
add test of unseqFile
---
.../org/apache/iotdb/db/writelog/RecoverTest.java | 123 ----------------
.../writelog/recover/UnseqTsFileRecoverTest.java | 162 +++++++++++++++++++++
2 files changed, 162 insertions(+), 123 deletions(-)
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
deleted file mode 100644
index 4b18bb2..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.writelog;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.RecoverException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
-import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RecoverTest {
-
- private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
- private boolean enableWal;
-
- @Before
- public void setUp() throws Exception {
- enableWal = config.isEnableWal();
- config.setEnableWal(true);
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanEnv();
- config.setEnableWal(enableWal);
- }
-
- @Test
- public void testFullRecover() throws IOException, RecoverException, ProcessorException {
- // this test insert a log file and try to recover from these logs as if no previous attempts exist.
- File insertFile = new File("testTemp");
- FileSchema schema = new FileSchema();
- String deviceId = "root.testLogNode";
- schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN));
- schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.PLAIN));
- schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
- schema.registerMeasurement(new MeasurementSchema("s4", TSDataType.BOOLEAN, TSEncoding.PLAIN));
-
- TsFileIOWriter writer = new TsFileIOWriter(insertFile);
- writer.endFile(schema);
- TsFileResource tsFileResource = new TsFileResource(insertFile, true);
- try {
- MManager.getInstance().setStorageLevelToMTree(deviceId);
- } catch (PathErrorException ignored) {
- }
- ExclusiveWriteLogNode logNode = new ExclusiveWriteLogNode(deviceId);
-
- try {
- InsertPlan bwInsertPlan = new InsertPlan(1, deviceId, 100,
- new String[]{"s1", "s2", "s3", "s4"},
- new String[]{"1.0", "15", "str", "false"});
- UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
- DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
-
- List<PhysicalPlan> plansToCheck = new ArrayList<>();
- plansToCheck.add(bwInsertPlan);
- plansToCheck.add(updatePlan);
- plansToCheck.add(deletePlan);
-
- logNode.write(bwInsertPlan);
- logNode.write(updatePlan);
- logNode.notifyStartFlush();
- logNode.write(deletePlan);
- logNode.forceSync();
-
- SeqTsFileRecoverPerformer performer = new SeqTsFileRecoverPerformer(deviceId, schema,
- SysTimeVersionController.INSTANCE, tsFileResource);
- // used to check if logs are replayed in order
- performer.recover();
-
- // the log diretory should be empty now
- File logDir = new File(logNode.getLogDirectory());
- File[] files = logDir.listFiles();
- assertTrue(files == null || files.length == 0);
- } finally {
- logNode.delete();
- insertFile.delete();
- }
- }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
new file mode 100644
index 0000000..eafa5e1
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.recover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnseqTsFileRecoverTest {
+
+ private FileSchema fileSchema;
+ private OverflowResource resource;
+ private VersionController versionController = new VersionController() {
+ int i;
+ @Override
+ public long nextVersion() {
+ return ++i;
+ }
+
+ @Override
+ public long currVersion() {
+ return i;
+ }
+ };
+ private String processorName = "test";
+ private String parentPath = "tempOf";
+ private String dataPath = "0";
+ private WriteLogNode node;
+
+ @Before
+ public void setup() throws IOException {
+ fileSchema = new FileSchema();
+ for (int i = 0; i < 10; i++) {
+ fileSchema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
+ }
+ resource = new OverflowResource(parentPath, dataPath, versionController, processorName);
+ IMemTable memTable = new PrimitiveMemTable();
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ TSRecord tsRecord = new TSRecord(i, "device" + j);
+ for (int k = 0; k < 10; k++) {
+ tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
+ String.valueOf(k)));
+ }
+ memTable.insert(tsRecord);
+ }
+ }
+ resource.flush(fileSchema, memTable, processorName, 0, (a,b) -> {});
+ node =
+ MultiFileLogNodeManager.getInstance().getNode(resource.logNodePrefix() + resource.getInsertFile().getName());
+ for (int i = 10; i < 20; i++) {
+ for (int j = 0; j < 10; j++) {
+ String[] measurements = new String[10];
+ String[] values = new String[10];
+ for (int k = 0; k < 10; k++) {
+ measurements[k] = "sensor" + k;
+ values[k] = String.valueOf(k);
+ }
+ InsertPlan insertPlan = new InsertPlan("device" + j, i, measurements, values);
+ node.write(insertPlan);
+ }
+ node.notifyStartFlush();
+ }
+ }
+
+ @After
+ public void teardown() throws IOException {
+ resource.close();
+ resource.deleteResource();
+ node.delete();
+ }
+
+ @Test
+ public void test() throws ProcessorException, IOException {
+ UnseqTsFileRecoverPerformer performer = new UnseqTsFileRecoverPerformer(resource, fileSchema);
+ performer.recover();
+
+ TsFileSequenceReader sequenceReader = new TsFileSequenceReader(resource.getInsertFilePath());
+ ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(sequenceReader);
+
+ String deviceId = "device1";
+ String measurementId = "sensor1";
+ QueryContext context = new QueryContext();
+ List<ChunkMetaData> chunkMetaDataList = resource.getInsertMetadatas(deviceId, measurementId,
+ TSDataType.INT64,
+ context);
+
+ int priorityValue = 0;
+ PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
+ for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
+
+ DigestForFilter digest = new DigestForFilter(chunkMetaData.getStartTime(),
+ chunkMetaData.getEndTime(),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MIN_VALUE),
+ chunkMetaData.getDigest().getStatistics().get(StatisticConstant.MAX_VALUE),
+ chunkMetaData.getTsDataType());
+
+
+ Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+ ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+
+ unSeqMergeReader
+ .addReaderWithPriority(new EngineChunkReader(chunkReader, sequenceReader),
+ priorityValue ++);
+ }
+ for (int i = 0; i < 20; i++) {
+ TimeValuePair timeValuePair = unSeqMergeReader.current();
+ assertEquals(i, timeValuePair.getTimestamp());
+ assertEquals(1, timeValuePair.getValue().getLong());
+ unSeqMergeReader.next();
+ }
+ assertFalse(unSeqMergeReader.hasNext());
+ }
+}