You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/22 08:34:27 UTC

[incubator-iotdb] branch feature_async_close_tsfile updated: add unseq recover performer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
     new 4884a76  add unseq recover performer
     new 62f1cd7  Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
4884a76 is described below

commit 4884a7611df575d31b84f8c6676a9404f160d2ac
Author: 江天 <jt...@163.com>
AuthorDate: Sat Jun 22 16:31:55 2019 +0800

    add unseq recover performer
---
 .../db/engine/overflow/io/OverflowProcessor.java   |   7 +-
 .../iotdb/db/writelog/recover/LogReplayer.java     |  11 +-
 .../recover/SeqTsFileRecoverPerformer.java         |   2 +-
 ...ormer.java => UnSeqTsFileRecoverPerformer.java} |  84 ++---------
 .../recover/UnseqTsFileRecoverPerformer.java       |  75 ----------
 .../iotdb/db/writelog/recover/LogReplayerTest.java |   2 +-
 .../writelog/recover/UnseqTsFileRecoverTest.java   | 153 ---------------------
 7 files changed, 25 insertions(+), 309 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 9bc42d0..ebbcfc4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
 import org.apache.iotdb.db.utils.ImmediateFuture;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.recover.UnseqTsFileRecoverPerformer;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -192,9 +191,9 @@ public class OverflowProcessor extends Processor {
       throw new ProcessorException(e);
     }
 
-    UnseqTsFileRecoverPerformer recoverPerformer =
-        new UnseqTsFileRecoverPerformer(workResource, fileSchema);
-    recoverPerformer.recover();
+//    UnseqTsFileRecoverPerformer recoverPerformer =
+//        new UnseqTsFileRecoverPerformer(workResource, fileSchema);
+//    recoverPerformer.recover();
   }
 
   private String[] clearFile(String[] subFilePaths) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 75e65c2..960b78d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -56,11 +56,14 @@ public class LogReplayer {
   private FileSchema fileSchema;
   private IMemTable recoverMemTable;
 
+  // overflow file tolerates duplicated data
+  private boolean acceptDuplication;
+
   public LogReplayer(String logNodePrefix, String insertFilePath,
       ModificationFile modFile,
       VersionController versionController,
       TsFileResource currentTsFileResource,
-      FileSchema fileSchema, IMemTable memTable) {
+      FileSchema fileSchema, IMemTable memTable, boolean acceptDuplication) {
     this.logNodePrefix = logNodePrefix;
     this.insertFilePath = insertFilePath;
     this.modFile = modFile;
@@ -68,6 +71,7 @@ public class LogReplayer {
     this.currentTsFileResource = currentTsFileResource;
     this.fileSchema = fileSchema;
     this.recoverMemTable = memTable;
+    this.acceptDuplication = acceptDuplication;
   }
 
   /**
@@ -112,8 +116,9 @@ public class LogReplayer {
   private void replayInsert(InsertPlan insertPlan) {
     TSRecord tsRecord = new TSRecord(insertPlan.getTime(), insertPlan.getDeviceId());
     if (currentTsFileResource != null) {
-      // the last chunk group may contain the same data with the logs, ignore such logs
-      if (currentTsFileResource.getEndTime(insertPlan.getDeviceId()) >= insertPlan.getTime()) {
+      // the last chunk group may contain the same data with the logs, ignore such logs in seq file
+      if (currentTsFileResource.getEndTime(insertPlan.getDeviceId()) >= insertPlan.getTime() &&
+      !acceptDuplication) {
         return;
       }
       currentTsFileResource.updateTime(insertPlan.getDeviceId(), insertPlan.getTime());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index 339a39a..8c6d324 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -69,7 +69,7 @@ public class SeqTsFileRecoverPerformer {
     IMemTable recoverMemTable = new PrimitiveMemTable();
     this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
         versionController,
-        tsFileResource, fileSchema, recoverMemTable);
+        tsFileResource, fileSchema, recoverMemTable, false);
     File insertFile = new File(insertFilePath);
     if (!insertFile.exists()) {
       return;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
similarity index 59%
copy from iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
copy to iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
index 339a39a..756f0a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
  * SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
  * crash and removes the redone logs.
  */
-public class SeqTsFileRecoverPerformer {
+public class UnSeqTsFileRecoverPerformer {
 
   private String insertFilePath;
   private String logNodePrefix;
@@ -48,7 +48,7 @@ public class SeqTsFileRecoverPerformer {
   private LogReplayer logReplayer;
   private TsFileResource tsFileResource;
 
-  public SeqTsFileRecoverPerformer(String logNodePrefix,
+  public UnSeqTsFileRecoverPerformer(String logNodePrefix,
       FileSchema fileSchema, VersionController versionController,
       TsFileResource currentTsFileResource) {
     this.insertFilePath = currentTsFileResource.getFilePath();
@@ -59,26 +59,28 @@ public class SeqTsFileRecoverPerformer {
   }
 
   /**
-   * 1. recover the TsFile by NativeRestorableIOWriter and truncate position of last recovery
-   * 2. redo the WALs to recover unpersisted data
-   * 3. flush and close the file
-   * 4. clean WALs
+   * 1. redo the WALs to recover unpersisted data
+   * 2. flush and close the file
+   * 3. clean WALs
    * @throws ProcessorException
    */
   public void recover() throws ProcessorException {
     IMemTable recoverMemTable = new PrimitiveMemTable();
     this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
         versionController,
-        tsFileResource, fileSchema, recoverMemTable);
+        tsFileResource, fileSchema, recoverMemTable, true);
     File insertFile = new File(insertFilePath);
     if (!insertFile.exists()) {
       return;
     }
     // remove corrupted part of the TsFile
-    NativeRestorableIOWriter restorableTsFileIOWriter = recoverFile(insertFile);
+    NativeRestorableIOWriter restorableTsFileIOWriter = null;
+    try {
+      restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
+    } catch (IOException e) {
+      throw new ProcessorException(e);
+    }
 
-    // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
-    // map must be updated first to avoid duplicated insertion
     for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
       for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
         tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
@@ -89,7 +91,6 @@ public class SeqTsFileRecoverPerformer {
     // redo logs
     logReplayer.replayLogs();
     if (recoverMemTable.isEmpty()) {
-      removeTruncatePosition(insertFile);
       return;
     }
 
@@ -105,8 +106,6 @@ public class SeqTsFileRecoverPerformer {
       throw new ProcessorException("Cannot setCloseMark file when recovering", e);
     }
 
-    removeTruncatePosition(insertFile);
-
     // clean logs
     try {
       MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
@@ -114,63 +113,4 @@ public class SeqTsFileRecoverPerformer {
       throw new ProcessorException(e);
     }
   }
-
-
-  private NativeRestorableIOWriter recoverFile(File insertFile) throws ProcessorException {
-    long truncatePos = getTruncatePosition(insertFile);
-    NativeRestorableIOWriter restorableIOWriter;
-    if (truncatePos != -1 && insertFile.length() != truncatePos) {
-      try (FileChannel channel = new FileOutputStream(insertFile).getChannel()) {
-        channel.truncate(truncatePos);
-        restorableIOWriter = new NativeRestorableIOWriter(insertFile, true);
-      } catch (IOException e) {
-        throw new ProcessorException(e);
-      }
-    } else {
-      try {
-        restorableIOWriter = new NativeRestorableIOWriter(insertFile, true);
-        saveTruncatePosition(insertFile);
-      } catch (IOException e) {
-        throw new ProcessorException(e);
-      }
-    }
-    return restorableIOWriter;
-  }
-
-  private long getTruncatePosition(File insertFile) {
-    File parentDir = insertFile.getParentFile();
-    File[] truncatePoses = parentDir.listFiles((dir, name) -> name.contains(insertFile.getName() +
-        "@"));
-    long maxPos = -1;
-    if (truncatePoses != null) {
-      for (File truncatePos : truncatePoses) {
-        long pos = Long.parseLong(truncatePos.getName().split("@")[1]);
-        if (pos > maxPos) {
-          maxPos = pos;
-        }
-      }
-    }
-    return maxPos;
-  }
-
-  private void saveTruncatePosition(File insertFile)
-      throws IOException {
-    File truncatePosFile = new File(insertFile.getParent(),
-        insertFile.getName() + "@" + insertFile.length());
-    try (FileOutputStream outputStream = new FileOutputStream(truncatePosFile)) {
-      outputStream.write(0);
-      outputStream.flush();
-    }
-  }
-
-  private void removeTruncatePosition(File insertFile) {
-    File parentDir = insertFile.getParentFile();
-    File[] truncatePoses = parentDir.listFiles((dir, name) -> name.contains(insertFile.getName() +
-        "@"));
-    if (truncatePoses != null) {
-      for (File truncatePos : truncatePoses) {
-        truncatePos.delete();
-      }
-    }
-  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
deleted file mode 100644
index fe6aa50..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-
-
-/**
- * UnseqTsFileRecoverPerformer redoes the WALs since last crash and removes the redone logs.
- */
-public class UnseqTsFileRecoverPerformer {
-
-  private FileSchema fileSchema;
-  private OverflowResource resource;
-
-
-  public UnseqTsFileRecoverPerformer(OverflowResource resource, FileSchema fileSchema) {
-    this.resource = resource;
-    this.fileSchema = fileSchema;
-  }
-
-  /**
-   * The UnseqTsFile itself has already been recovered by the OverResource, and duplicated
-   * operation will not break the file, so merely redoing and flush are enough.
-   * 1. redo the WALs to recover unpersisted data
-   * 2. flush
-   * 3. clean WALs
-   * @throws ProcessorException
-   */
-  public void recover() throws ProcessorException {
-    IMemTable memTable = new PrimitiveMemTable();
-    String logNodePrefix = resource.logNodePrefix();
-    String insertFilePath = resource.getInsertFilePath();
-    LogReplayer replayer = new LogReplayer(logNodePrefix, insertFilePath,
-        resource.getModificationFile(), resource.getVersionController(), null,
-        fileSchema, memTable);
-
-    replayer.replayLogs();
-    if (memTable.isEmpty()) {
-      return;
-    }
-    try {
-      resource.flush(fileSchema, memTable, logNodePrefix, 0, (a,b) -> {});
-      resource.appendMetadatas();
-      MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
-    } catch (IOException e) {
-      throw new ProcessorException(e);
-    }
-  }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index ee65c53..9a4823c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -77,7 +77,7 @@ public class LogReplayerTest {
       }
 
       LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
-          versionController, tsFileResource, schema, memTable);
+          versionController, tsFileResource, schema, memTable, true);
 
       WriteLogNode node =
           MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
deleted file mode 100644
index b2f36ec..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class UnseqTsFileRecoverTest {
-
-  private FileSchema fileSchema;
-  private OverflowResource resource;
-  private VersionController versionController = new VersionController() {
-    int i;
-    @Override
-    public long nextVersion() {
-      return ++i;
-    }
-
-    @Override
-    public long currVersion() {
-      return i;
-    }
-  };
-  private String processorName = "test";
-  private String parentPath = "tempOf";
-  private String dataPath = "0";
-  private WriteLogNode node;
-
-  @Before
-  public void setup() throws IOException {
-    fileSchema = new FileSchema();
-    for (int i = 0; i < 10; i++) {
-      fileSchema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
-    }
-    resource = new OverflowResource(parentPath, dataPath, versionController, processorName);
-    IMemTable memTable = new PrimitiveMemTable();
-    for (int i = 0; i < 10; i++) {
-      for (int j = 0; j < 10; j++) {
-        TSRecord tsRecord = new TSRecord(i, "device" + j);
-        for (int k = 0; k < 10; k++) {
-          tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
-              String.valueOf(k)));
-        }
-        memTable.insert(tsRecord);
-      }
-    }
-    resource.flush(fileSchema, memTable, processorName, 0, (a,b) -> {});
-    node =
-        MultiFileLogNodeManager.getInstance().getNode(resource.logNodePrefix() + resource.getInsertFile().getName());
-    for (int i = 10; i < 20; i++) {
-      for (int j = 0; j < 10; j++) {
-        String[] measurements = new String[10];
-        String[] values = new String[10];
-        for (int k = 0; k < 10; k++) {
-          measurements[k] = "sensor" + k;
-          values[k] = String.valueOf(k);
-        }
-        InsertPlan insertPlan = new InsertPlan("device" + j, i, measurements, values);
-        node.write(insertPlan);
-      }
-      node.notifyStartFlush();
-    }
-  }
-
-  @After
-  public void teardown() throws IOException {
-    resource.close();
-    resource.deleteResource();
-    node.delete();
-  }
-
-  @Test
-  public void test() throws ProcessorException, IOException {
-    UnseqTsFileRecoverPerformer performer = new UnseqTsFileRecoverPerformer(resource, fileSchema);
-    performer.recover();
-
-    TsFileSequenceReader sequenceReader = new TsFileSequenceReader(resource.getInsertFilePath());
-    ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(sequenceReader);
-
-    String deviceId = "device1";
-    String measurementId = "sensor1";
-    QueryContext context = new QueryContext();
-    List<ChunkMetaData> chunkMetaDataList = resource.getInsertMetadatas(deviceId, measurementId,
-        TSDataType.INT64,
-        context);
-
-    int priorityValue = 0;
-    PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
-    for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-
-      Chunk chunk = chunkLoader.getChunk(chunkMetaData);
-      ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
-
-      unSeqMergeReader
-          .addReaderWithPriority(new EngineChunkReader(chunkReader, sequenceReader),
-              priorityValue ++);
-    }
-    for (int i = 0; i < 20; i++) {
-      TimeValuePair timeValuePair = unSeqMergeReader.current();
-      assertEquals(i, timeValuePair.getTimestamp());
-      assertEquals(1, timeValuePair.getValue().getLong());
-      unSeqMergeReader.next();
-    }
-    assertFalse(unSeqMergeReader.hasNext());
-  }
-}