You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/21 13:16:31 UTC

[incubator-iotdb] 01/03: add test of unseqFile

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit aa6995434b10dbd3fc5bc642eca87523dd2310b1
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jun 21 21:06:23 2019 +0800

    add test of unseqFile
---
 .../org/apache/iotdb/db/writelog/RecoverTest.java  | 123 ----------------
 .../writelog/recover/UnseqTsFileRecoverTest.java   | 162 +++++++++++++++++++++
 2 files changed, 162 insertions(+), 123 deletions(-)

diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
deleted file mode 100644
index 4b18bb2..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/RecoverTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.writelog;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.filenode.TsFileResource;
-import org.apache.iotdb.db.engine.version.SysTimeVersionController;
-import org.apache.iotdb.db.exception.PathErrorException;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.exception.RecoverException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.qp.physical.crud.UpdatePlan;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.writelog.node.ExclusiveWriteLogNode;
-import org.apache.iotdb.db.writelog.recover.SeqTsFileRecoverPerformer;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RecoverTest {
-
-  private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-
-  private boolean enableWal;
-
-  @Before
-  public void setUp() throws Exception {
-    enableWal = config.isEnableWal();
-    config.setEnableWal(true);
-    EnvironmentUtils.envSetUp();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    EnvironmentUtils.cleanEnv();
-    config.setEnableWal(enableWal);
-  }
-
-  @Test
-  public void testFullRecover() throws IOException, RecoverException, ProcessorException {
-    // this test insert a log file and try to recover from these logs as if no previous attempts exist.
-    File insertFile = new File("testTemp");
-    FileSchema schema = new FileSchema();
-    String deviceId = "root.testLogNode";
-    schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.FLOAT, TSEncoding.PLAIN));
-    schema.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT32, TSEncoding.PLAIN));
-    schema.registerMeasurement(new MeasurementSchema("s3", TSDataType.TEXT, TSEncoding.PLAIN));
-    schema.registerMeasurement(new MeasurementSchema("s4", TSDataType.BOOLEAN, TSEncoding.PLAIN));
-
-    TsFileIOWriter writer = new TsFileIOWriter(insertFile);
-    writer.endFile(schema);
-    TsFileResource tsFileResource = new TsFileResource(insertFile, true);
-    try {
-      MManager.getInstance().setStorageLevelToMTree(deviceId);
-    } catch (PathErrorException ignored) {
-    }
-    ExclusiveWriteLogNode logNode = new ExclusiveWriteLogNode(deviceId);
-
-    try {
-      InsertPlan bwInsertPlan = new InsertPlan(1, deviceId, 100,
-          new String[]{"s1", "s2", "s3", "s4"},
-          new String[]{"1.0", "15", "str", "false"});
-      UpdatePlan updatePlan = new UpdatePlan(0, 100, "2.0", new Path("root.logTestDevice.s1"));
-      DeletePlan deletePlan = new DeletePlan(50, new Path("root.logTestDevice.s1"));
-
-      List<PhysicalPlan> plansToCheck = new ArrayList<>();
-      plansToCheck.add(bwInsertPlan);
-      plansToCheck.add(updatePlan);
-      plansToCheck.add(deletePlan);
-
-      logNode.write(bwInsertPlan);
-      logNode.write(updatePlan);
-      logNode.notifyStartFlush();
-      logNode.write(deletePlan);
-      logNode.forceSync();
-
-      SeqTsFileRecoverPerformer performer = new SeqTsFileRecoverPerformer(deviceId, schema,
-          SysTimeVersionController.INSTANCE, tsFileResource);
-      // used to check if logs are replayed in order
-      performer.recover();
-
-      // the log diretory should be empty now
-      File logDir = new File(logNode.getLogDirectory());
-      File[] files = logDir.listFiles();
-      assertTrue(files == null || files.length == 0);
-    } finally {
-      logNode.delete();
-      insertFile.delete();
-    }
-  }
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
new file mode 100644
index 0000000..eafa5e1
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.recover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iotdb.db.engine.memtable.IMemTable;
+import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that, after {@link UnseqTsFileRecoverPerformer#recover()} has run, data that was only
+ * written to the write log node can be read back from the insert file of an
+ * {@link OverflowResource} together with the data that had already been flushed.
+ *
+ * <p>The fixture flushes timestamps 0-9 to the insert file and only logs timestamps 10-19; the
+ * test then expects to read the full range 0-19.
+ */
+public class UnseqTsFileRecoverTest {
+
+  private static final String PROCESSOR_NAME = "test";
+  private static final String PARENT_PATH = "tempOf";
+  private static final String DATA_PATH = "0";
+
+  // devices are named "device0".."device9", sensors "sensor0".."sensor9"
+  private static final int DEVICE_NUM = 10;
+  private static final int SENSOR_NUM = 10;
+  // timestamps in [0, FLUSHED_TIME_BOUND) are flushed into the insert file
+  private static final int FLUSHED_TIME_BOUND = 10;
+  // timestamps in [FLUSHED_TIME_BOUND, TOTAL_TIME_BOUND) are only written to the log node
+  private static final int TOTAL_TIME_BOUND = 20;
+
+  private FileSchema fileSchema;
+  private OverflowResource resource;
+  private WriteLogNode node;
+
+  // simple in-memory version counter handed to the OverflowResource
+  private final VersionController versionController = new VersionController() {
+    private long version;
+
+    @Override
+    public long nextVersion() {
+      return ++version;
+    }
+
+    @Override
+    public long currVersion() {
+      return version;
+    }
+  };
+
+  /**
+   * Builds the fixture: registers the INT64 sensors, flushes the first range of timestamps
+   * through a memtable and writes the second range to the log node only. Every point of
+   * "sensorK" carries the value K.
+   *
+   * @throws IOException if creating the resource, flushing or writing the logs fails
+   */
+  @Before
+  public void setup() throws IOException {
+    fileSchema = new FileSchema();
+    for (int k = 0; k < SENSOR_NUM; k++) {
+      fileSchema.registerMeasurement(
+          new MeasurementSchema("sensor" + k, TSDataType.INT64, TSEncoding.PLAIN));
+    }
+    resource = new OverflowResource(PARENT_PATH, DATA_PATH, versionController, PROCESSOR_NAME);
+
+    // part 1: data that reaches the insert file through a flush
+    IMemTable memTable = new PrimitiveMemTable();
+    for (int time = 0; time < FLUSHED_TIME_BOUND; time++) {
+      for (int j = 0; j < DEVICE_NUM; j++) {
+        TSRecord tsRecord = new TSRecord(time, "device" + j);
+        for (int k = 0; k < SENSOR_NUM; k++) {
+          tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
+              String.valueOf(k)));
+        }
+        memTable.insert(tsRecord);
+      }
+    }
+    resource.flush(fileSchema, memTable, PROCESSOR_NAME, 0, (a, b) -> {});
+
+    // part 2: data that is written to the log node but never flushed by this fixture
+    node = MultiFileLogNodeManager.getInstance()
+        .getNode(resource.logNodePrefix() + resource.getInsertFile().getName());
+    for (int time = FLUSHED_TIME_BOUND; time < TOTAL_TIME_BOUND; time++) {
+      for (int j = 0; j < DEVICE_NUM; j++) {
+        // fresh arrays per plan, so no two plans share mutable state
+        String[] measurements = new String[SENSOR_NUM];
+        String[] values = new String[SENSOR_NUM];
+        for (int k = 0; k < SENSOR_NUM; k++) {
+          measurements[k] = "sensor" + k;
+          values[k] = String.valueOf(k);
+        }
+        node.write(new InsertPlan("device" + j, time, measurements, values));
+      }
+      // called once per logged timestamp, as in the original fixture
+      node.notifyStartFlush();
+    }
+  }
+
+  /**
+   * Releases the overflow resource and the log node. The log node is deleted even when cleaning
+   * up the resource fails, and fields left unassigned by a failed {@link #setup()} are skipped
+   * so that the original failure is not hidden behind a NullPointerException.
+   *
+   * @throws IOException if closing or deleting the resource or the log node fails
+   */
+  @After
+  public void teardown() throws IOException {
+    try {
+      if (resource != null) {
+        resource.close();
+        resource.deleteResource();
+      }
+    } finally {
+      if (node != null) {
+        node.delete();
+      }
+    }
+  }
+
+  /**
+   * Runs the recovery and reads "device1"/"sensor1" back from the insert file, expecting the
+   * timestamps 0..19 in order, each with the value 1, and nothing after them.
+   *
+   * @throws ProcessorException if the recovery fails
+   * @throws IOException if reading the insert file fails
+   */
+  @Test
+  public void test() throws ProcessorException, IOException {
+    UnseqTsFileRecoverPerformer performer = new UnseqTsFileRecoverPerformer(resource, fileSchema);
+    performer.recover();
+
+    TsFileSequenceReader sequenceReader = new TsFileSequenceReader(resource.getInsertFilePath());
+    try {
+      ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(sequenceReader);
+
+      String deviceId = "device1";
+      String measurementId = "sensor1";
+      QueryContext context = new QueryContext();
+      List<ChunkMetaData> chunkMetaDataList = resource.getInsertMetadatas(deviceId,
+          measurementId, TSDataType.INT64, context);
+
+      // each chunk gets the next priority value, in the order the metadata list returns them
+      int priorityValue = 0;
+      PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
+      for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
+        Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+        ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+        unSeqMergeReader.addReaderWithPriority(
+            new EngineChunkReader(chunkReader, sequenceReader), priorityValue++);
+      }
+
+      for (int time = 0; time < TOTAL_TIME_BOUND; time++) {
+        TimeValuePair timeValuePair = unSeqMergeReader.current();
+        assertEquals(time, timeValuePair.getTimestamp());
+        // "sensor1" was always written with the value 1
+        assertEquals(1, timeValuePair.getValue().getLong());
+        unSeqMergeReader.next();
+      }
+      assertFalse(unSeqMergeReader.hasNext());
+    } finally {
+      // release the file handle so that teardown can delete the insert file
+      sequenceReader.close();
+    }
+  }
+}