You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/22 11:04:26 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add UnseqRecoverTest

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 7fc0d26  add UnseqRecoverTest
     new f08eaf5  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
7fc0d26 is described below

commit 7fc0d268e50b63be49cdc47e75684a3cc9fbeaee
Author: 江天 <jt...@163.com>
AuthorDate: Sat Jun 22 16:58:58 2019 +0800

    add UnseqRecoverTest
---
 .../writelog/recover/UnseqTsFileRecoverTest.java   | 177 +++++++++++++++++++++
 1 file changed, 177 insertions(+)

diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
new file mode 100644
index 0000000..2623e18
--- /dev/null
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.writelog.recover;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.filenode.TsFileResource;
+import org.apache.iotdb.db.engine.querycontext.OverflowInsertFile;
+import org.apache.iotdb.db.engine.version.VersionController;
+import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
+import org.apache.iotdb.db.query.reader.sequence.SequenceDataReader;
+import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
+import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
+import org.apache.iotdb.db.writelog.node.WriteLogNode;
+import org.apache.iotdb.tsfile.common.constant.StatisticConstant;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoader;
+import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerier;
+import org.apache.iotdb.tsfile.read.controller.MetadataQuerierByFileImpl;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.filter.DigestForFilter;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithFilter;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
+import org.apache.iotdb.tsfile.write.TsFileWriter;
+import org.apache.iotdb.tsfile.write.record.TSRecord;
+import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
+import org.apache.iotdb.tsfile.write.schema.FileSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UnseqTsFileRecoverTest {
+  private File tsF;
+  private TsFileWriter writer;
+  private WriteLogNode node;
+  private String logNodePrefix = "testNode";
+  private FileSchema schema;
+  private TsFileResource resource;
+  private VersionController versionController = new VersionController() {
+    private int i;
+    @Override
+    public long nextVersion() {
+      return ++i;
+    }
+
+    @Override
+    public long currVersion() {
+      return i;
+    }
+  };
+
+  @Before
+  public void setup() throws IOException, WriteProcessException {
+    tsF = new File("temp", "test.ts");
+    tsF.getParentFile().mkdirs();
+
+    schema = new FileSchema();
+    for (int i = 0; i < 10; i++) {
+      schema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64,
+          TSEncoding.PLAIN));
+    }
+    writer = new TsFileWriter(tsF, schema);
+
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        TSRecord tsRecord = new TSRecord(i, "device" + j);
+        for (int k = 0; k < 10; k++) {
+          tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
+              String.valueOf(k)));
+        }
+        writer.write(tsRecord);
+      }
+    }
+    writer.flushForTest();
+    writer.getIOWriter().close();
+
+    node = MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsF.getName());
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        String[] measurements = new String[10];
+        String[] values = new String[10];
+        for (int k = 0; k < 10; k++) {
+          measurements[k] = "sensor" + k;
+          values[k] = String.valueOf(k + 10);
+        }
+        InsertPlan insertPlan = new InsertPlan("device" + j, i, measurements, values);
+        node.write(insertPlan);
+      }
+      node.notifyStartFlush();
+    }
+    resource = new TsFileResource(tsF, true);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.deleteDirectory(tsF.getParentFile());
+    node.delete();
+  }
+
+  @Test
+  public void test() throws ProcessorException, IOException {
+    UnSeqTsFileRecoverPerformer performer = new UnSeqTsFileRecoverPerformer(logNodePrefix, schema,
+        versionController, resource);
+    performer.recover();
+
+    TsFileSequenceReader fileReader = new TsFileSequenceReader(tsF.getPath(), true);
+    MetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(fileReader);
+    ChunkLoader chunkLoader = new ChunkLoaderImpl(fileReader);
+
+    Path path = new Path("device1", "sensor1");
+
+    OverflowInsertFile overflowInsertFile = new OverflowInsertFile(tsF.getPath(),
+        metadataQuerier.getChunkMetaDataList(path));
+
+    PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
+    int priorityValue = 1;
+
+    for (ChunkMetaData chunkMetaData : overflowInsertFile.getChunkMetaDataList()) {
+      Chunk chunk = chunkLoader.getChunk(chunkMetaData);
+      ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+
+      unSeqMergeReader
+          .addReaderWithPriority(new EngineChunkReader(chunkReader, fileReader),
+              priorityValue);
+      priorityValue++;
+    }
+    for (int i = 0; i < 10; i++) {
+      TimeValuePair timeValuePair = unSeqMergeReader.current();
+      assertEquals(i, timeValuePair.getTimestamp());
+      assertEquals(11, timeValuePair.getValue().getLong());
+      unSeqMergeReader.next();
+
+    }
+    unSeqMergeReader.close();
+    fileReader.close();
+  }
+}