You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/06/22 08:34:27 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: add unseq recover performer
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 4884a76 add unseq recover performer
new 62f1cd7 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
4884a76 is described below
commit 4884a7611df575d31b84f8c6676a9404f160d2ac
Author: 江天 <jt...@163.com>
AuthorDate: Sat Jun 22 16:31:55 2019 +0800
add unseq recover performer
---
.../db/engine/overflow/io/OverflowProcessor.java | 7 +-
.../iotdb/db/writelog/recover/LogReplayer.java | 11 +-
.../recover/SeqTsFileRecoverPerformer.java | 2 +-
...ormer.java => UnSeqTsFileRecoverPerformer.java} | 84 ++---------
.../recover/UnseqTsFileRecoverPerformer.java | 75 ----------
.../iotdb/db/writelog/recover/LogReplayerTest.java | 2 +-
.../writelog/recover/UnseqTsFileRecoverTest.java | 153 ---------------------
7 files changed, 25 insertions(+), 309 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
index 9bc42d0..ebbcfc4 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/overflow/io/OverflowProcessor.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.utils.ImmediateFuture;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.db.writelog.recover.UnseqTsFileRecoverPerformer;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -192,9 +191,9 @@ public class OverflowProcessor extends Processor {
throw new ProcessorException(e);
}
- UnseqTsFileRecoverPerformer recoverPerformer =
- new UnseqTsFileRecoverPerformer(workResource, fileSchema);
- recoverPerformer.recover();
+// UnseqTsFileRecoverPerformer recoverPerformer =
+// new UnseqTsFileRecoverPerformer(workResource, fileSchema);
+// recoverPerformer.recover();
}
private String[] clearFile(String[] subFilePaths) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
index 75e65c2..960b78d 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java
@@ -56,11 +56,14 @@ public class LogReplayer {
private FileSchema fileSchema;
private IMemTable recoverMemTable;
+ // overflow file tolerates duplicated data
+ private boolean acceptDuplication;
+
public LogReplayer(String logNodePrefix, String insertFilePath,
ModificationFile modFile,
VersionController versionController,
TsFileResource currentTsFileResource,
- FileSchema fileSchema, IMemTable memTable) {
+ FileSchema fileSchema, IMemTable memTable, boolean acceptDuplication) {
this.logNodePrefix = logNodePrefix;
this.insertFilePath = insertFilePath;
this.modFile = modFile;
@@ -68,6 +71,7 @@ public class LogReplayer {
this.currentTsFileResource = currentTsFileResource;
this.fileSchema = fileSchema;
this.recoverMemTable = memTable;
+ this.acceptDuplication = acceptDuplication;
}
/**
@@ -112,8 +116,9 @@ public class LogReplayer {
private void replayInsert(InsertPlan insertPlan) {
TSRecord tsRecord = new TSRecord(insertPlan.getTime(), insertPlan.getDeviceId());
if (currentTsFileResource != null) {
- // the last chunk group may contain the same data with the logs, ignore such logs
- if (currentTsFileResource.getEndTime(insertPlan.getDeviceId()) >= insertPlan.getTime()) {
+ // the last chunk group may contain the same data as the logs; ignore such logs in a seq file
+ if (currentTsFileResource.getEndTime(insertPlan.getDeviceId()) >= insertPlan.getTime() &&
+ !acceptDuplication) {
return;
}
currentTsFileResource.updateTime(insertPlan.getDeviceId(), insertPlan.getTime());
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
index 339a39a..8c6d324 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
@@ -69,7 +69,7 @@ public class SeqTsFileRecoverPerformer {
IMemTable recoverMemTable = new PrimitiveMemTable();
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
- tsFileResource, fileSchema, recoverMemTable);
+ tsFileResource, fileSchema, recoverMemTable, false);
File insertFile = new File(insertFilePath);
if (!insertFile.exists()) {
return;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
similarity index 59%
copy from iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
copy to iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
index 339a39a..756f0a7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/SeqTsFileRecoverPerformer.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnSeqTsFileRecoverPerformer.java
@@ -39,7 +39,7 @@ import org.apache.iotdb.tsfile.write.writer.NativeRestorableIOWriter;
* SeqTsFileRecoverPerformer recovers a SeqTsFile to correct status, redoes the WALs since last
* crash and removes the redone logs.
*/
-public class SeqTsFileRecoverPerformer {
+public class UnSeqTsFileRecoverPerformer {
private String insertFilePath;
private String logNodePrefix;
@@ -48,7 +48,7 @@ public class SeqTsFileRecoverPerformer {
private LogReplayer logReplayer;
private TsFileResource tsFileResource;
- public SeqTsFileRecoverPerformer(String logNodePrefix,
+ public UnSeqTsFileRecoverPerformer(String logNodePrefix,
FileSchema fileSchema, VersionController versionController,
TsFileResource currentTsFileResource) {
this.insertFilePath = currentTsFileResource.getFilePath();
@@ -59,26 +59,28 @@ public class SeqTsFileRecoverPerformer {
}
/**
- * 1. recover the TsFile by NativeRestorableIOWriter and truncate position of last recovery
- * 2. redo the WALs to recover unpersisted data
- * 3. flush and close the file
- * 4. clean WALs
+ * 1. redo the WALs to recover unpersisted data
+ * 2. flush and close the file
+ * 3. clean WALs
* @throws ProcessorException
*/
public void recover() throws ProcessorException {
IMemTable recoverMemTable = new PrimitiveMemTable();
this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, tsFileResource.getModFile(),
versionController,
- tsFileResource, fileSchema, recoverMemTable);
+ tsFileResource, fileSchema, recoverMemTable, true);
File insertFile = new File(insertFilePath);
if (!insertFile.exists()) {
return;
}
// remove corrupted part of the TsFile
- NativeRestorableIOWriter restorableTsFileIOWriter = recoverFile(insertFile);
+ NativeRestorableIOWriter restorableTsFileIOWriter = null;
+ try {
+ restorableTsFileIOWriter = new NativeRestorableIOWriter(insertFile);
+ } catch (IOException e) {
+ throw new ProcessorException(e);
+ }
- // due to failure, the last ChunkGroup may contain the same data as the WALs, so the time
- // map must be updated first to avoid duplicated insertion
for (ChunkGroupMetaData chunkGroupMetaData : restorableTsFileIOWriter.getChunkGroupMetaDatas()) {
for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
tsFileResource.updateTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getStartTime());
@@ -89,7 +91,6 @@ public class SeqTsFileRecoverPerformer {
// redo logs
logReplayer.replayLogs();
if (recoverMemTable.isEmpty()) {
- removeTruncatePosition(insertFile);
return;
}
@@ -105,8 +106,6 @@ public class SeqTsFileRecoverPerformer {
throw new ProcessorException("Cannot setCloseMark file when recovering", e);
}
- removeTruncatePosition(insertFile);
-
// clean logs
try {
MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
@@ -114,63 +113,4 @@ public class SeqTsFileRecoverPerformer {
throw new ProcessorException(e);
}
}
-
-
- private NativeRestorableIOWriter recoverFile(File insertFile) throws ProcessorException {
- long truncatePos = getTruncatePosition(insertFile);
- NativeRestorableIOWriter restorableIOWriter;
- if (truncatePos != -1 && insertFile.length() != truncatePos) {
- try (FileChannel channel = new FileOutputStream(insertFile).getChannel()) {
- channel.truncate(truncatePos);
- restorableIOWriter = new NativeRestorableIOWriter(insertFile, true);
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
- } else {
- try {
- restorableIOWriter = new NativeRestorableIOWriter(insertFile, true);
- saveTruncatePosition(insertFile);
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
- }
- return restorableIOWriter;
- }
-
- private long getTruncatePosition(File insertFile) {
- File parentDir = insertFile.getParentFile();
- File[] truncatePoses = parentDir.listFiles((dir, name) -> name.contains(insertFile.getName() +
- "@"));
- long maxPos = -1;
- if (truncatePoses != null) {
- for (File truncatePos : truncatePoses) {
- long pos = Long.parseLong(truncatePos.getName().split("@")[1]);
- if (pos > maxPos) {
- maxPos = pos;
- }
- }
- }
- return maxPos;
- }
-
- private void saveTruncatePosition(File insertFile)
- throws IOException {
- File truncatePosFile = new File(insertFile.getParent(),
- insertFile.getName() + "@" + insertFile.length());
- try (FileOutputStream outputStream = new FileOutputStream(truncatePosFile)) {
- outputStream.write(0);
- outputStream.flush();
- }
- }
-
- private void removeTruncatePosition(File insertFile) {
- File parentDir = insertFile.getParentFile();
- File[] truncatePoses = parentDir.listFiles((dir, name) -> name.contains(insertFile.getName() +
- "@"));
- if (truncatePoses != null) {
- for (File truncatePos : truncatePoses) {
- truncatePos.delete();
- }
- }
- }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java b/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
deleted file mode 100644
index fe6aa50..0000000
--- a/iotdb/src/main/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverPerformer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import java.io.File;
-import java.io.IOException;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-
-
-/**
- * UnseqTsFileRecoverPerformer redoes the WALs since last crash and removes the redone logs.
- */
-public class UnseqTsFileRecoverPerformer {
-
- private FileSchema fileSchema;
- private OverflowResource resource;
-
-
- public UnseqTsFileRecoverPerformer(OverflowResource resource, FileSchema fileSchema) {
- this.resource = resource;
- this.fileSchema = fileSchema;
- }
-
- /**
- * The UnseqTsFile itself has already been recovered by the OverResource, and duplicated
- * operation will not break the file, so merely redoing and flush are enough.
- * 1. redo the WALs to recover unpersisted data
- * 2. flush
- * 3. clean WALs
- * @throws ProcessorException
- */
- public void recover() throws ProcessorException {
- IMemTable memTable = new PrimitiveMemTable();
- String logNodePrefix = resource.logNodePrefix();
- String insertFilePath = resource.getInsertFilePath();
- LogReplayer replayer = new LogReplayer(logNodePrefix, insertFilePath,
- resource.getModificationFile(), resource.getVersionController(), null,
- fileSchema, memTable);
-
- replayer.replayLogs();
- if (memTable.isEmpty()) {
- return;
- }
- try {
- resource.flush(fileSchema, memTable, logNodePrefix, 0, (a,b) -> {});
- resource.appendMetadatas();
- MultiFileLogNodeManager.getInstance().deleteNode(logNodePrefix + new File(insertFilePath).getName());
- } catch (IOException e) {
- throw new ProcessorException(e);
- }
- }
-
-}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
index ee65c53..9a4823c 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/LogReplayerTest.java
@@ -77,7 +77,7 @@ public class LogReplayerTest {
}
LogReplayer replayer = new LogReplayer(logNodePrefix, tsFile.getPath(), modFile,
- versionController, tsFileResource, schema, memTable);
+ versionController, tsFileResource, schema, memTable, true);
WriteLogNode node =
MultiFileLogNodeManager.getInstance().getNode(logNodePrefix + tsFile.getName());
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java b/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
deleted file mode 100644
index b2f36ec..0000000
--- a/iotdb/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.writelog.recover;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.iotdb.db.engine.memtable.IMemTable;
-import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
-import org.apache.iotdb.db.engine.overflow.io.OverflowResource;
-import org.apache.iotdb.db.engine.version.VersionController;
-import org.apache.iotdb.db.exception.ProcessorException;
-import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
-import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.reader.merge.PriorityMergeReader;
-import org.apache.iotdb.db.query.reader.unsequence.EngineChunkReader;
-import org.apache.iotdb.db.utils.TimeValuePair;
-import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
-import org.apache.iotdb.db.writelog.node.WriteLogNode;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
-import org.apache.iotdb.tsfile.read.common.Chunk;
-import org.apache.iotdb.tsfile.read.controller.ChunkLoaderImpl;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
-import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderWithoutFilter;
-import org.apache.iotdb.tsfile.write.record.TSRecord;
-import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
-import org.apache.iotdb.tsfile.write.schema.FileSchema;
-import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class UnseqTsFileRecoverTest {
-
- private FileSchema fileSchema;
- private OverflowResource resource;
- private VersionController versionController = new VersionController() {
- int i;
- @Override
- public long nextVersion() {
- return ++i;
- }
-
- @Override
- public long currVersion() {
- return i;
- }
- };
- private String processorName = "test";
- private String parentPath = "tempOf";
- private String dataPath = "0";
- private WriteLogNode node;
-
- @Before
- public void setup() throws IOException {
- fileSchema = new FileSchema();
- for (int i = 0; i < 10; i++) {
- fileSchema.registerMeasurement(new MeasurementSchema("sensor" + i, TSDataType.INT64, TSEncoding.PLAIN));
- }
- resource = new OverflowResource(parentPath, dataPath, versionController, processorName);
- IMemTable memTable = new PrimitiveMemTable();
- for (int i = 0; i < 10; i++) {
- for (int j = 0; j < 10; j++) {
- TSRecord tsRecord = new TSRecord(i, "device" + j);
- for (int k = 0; k < 10; k++) {
- tsRecord.addTuple(DataPoint.getDataPoint(TSDataType.INT64, "sensor" + k,
- String.valueOf(k)));
- }
- memTable.insert(tsRecord);
- }
- }
- resource.flush(fileSchema, memTable, processorName, 0, (a,b) -> {});
- node =
- MultiFileLogNodeManager.getInstance().getNode(resource.logNodePrefix() + resource.getInsertFile().getName());
- for (int i = 10; i < 20; i++) {
- for (int j = 0; j < 10; j++) {
- String[] measurements = new String[10];
- String[] values = new String[10];
- for (int k = 0; k < 10; k++) {
- measurements[k] = "sensor" + k;
- values[k] = String.valueOf(k);
- }
- InsertPlan insertPlan = new InsertPlan("device" + j, i, measurements, values);
- node.write(insertPlan);
- }
- node.notifyStartFlush();
- }
- }
-
- @After
- public void teardown() throws IOException {
- resource.close();
- resource.deleteResource();
- node.delete();
- }
-
- @Test
- public void test() throws ProcessorException, IOException {
- UnseqTsFileRecoverPerformer performer = new UnseqTsFileRecoverPerformer(resource, fileSchema);
- performer.recover();
-
- TsFileSequenceReader sequenceReader = new TsFileSequenceReader(resource.getInsertFilePath());
- ChunkLoaderImpl chunkLoader = new ChunkLoaderImpl(sequenceReader);
-
- String deviceId = "device1";
- String measurementId = "sensor1";
- QueryContext context = new QueryContext();
- List<ChunkMetaData> chunkMetaDataList = resource.getInsertMetadatas(deviceId, measurementId,
- TSDataType.INT64,
- context);
-
- int priorityValue = 0;
- PriorityMergeReader unSeqMergeReader = new PriorityMergeReader();
- for (ChunkMetaData chunkMetaData : chunkMetaDataList) {
-
- Chunk chunk = chunkLoader.getChunk(chunkMetaData);
- ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
-
- unSeqMergeReader
- .addReaderWithPriority(new EngineChunkReader(chunkReader, sequenceReader),
- priorityValue ++);
- }
- for (int i = 0; i < 20; i++) {
- TimeValuePair timeValuePair = unSeqMergeReader.current();
- assertEquals(i, timeValuePair.getTimestamp());
- assertEquals(1, timeValuePair.getValue().getLong());
- unSeqMergeReader.next();
- }
- assertFalse(unSeqMergeReader.hasNext());
- }
-}