You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/03 23:31:08 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1886: moved restoring
of a file into its own method and added a test that covers re-writing the file
when it isn't closed before failure
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 8f671f2dd -> d081d0cee
MLHR-1886: moved restoring of a file into its own method and added a test that covers re-writing the file when it isn't closed before failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8f0c27b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8f0c27b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8f0c27b6
Branch: refs/heads/devel-3
Commit: 8f0c27b6df5511c5048774baa78f7a2e25dd200c
Parents: 8f671f2
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Nov 3 13:06:25 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Nov 3 13:14:48 2015 -0800
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 113 +++++++++++--------
.../io/fs/AbstractFileOutputOperatorTest.java | 52 +++++++++
2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 09294a2..744f024 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -351,50 +351,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
if (fs.exists(activeFilePath)) {
- LOG.debug("path exists {}", activeFilePath);
- long offset = endOffsets.get(seenFileName).longValue();
- FSDataInputStream inputStream = fs.open(activeFilePath);
- FileStatus status = fs.getFileStatus(activeFilePath);
-
- if (status.getLen() != offset) {
- LOG.info("path corrupted {} {} {}", activeFilePath, offset, status.getLen());
- byte[] buffer = new byte[COPY_BUFFER_SIZE];
- String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
- Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
- FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
-
- while (inputStream.getPos() < offset) {
- long remainingBytes = offset - inputStream.getPos();
- int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int) remainingBytes : COPY_BUFFER_SIZE;
- inputStream.read(buffer);
- fsOutput.write(buffer, 0, bytesToWrite);
- }
-
- flush(fsOutput);
- fsOutput.close();
- inputStream.close();
-
- FileContext fileContext = FileContext.getFileContext(fs.getUri());
- LOG.debug("active {} recovery {} ", activeFilePath, recoveryFilePath);
-
- if (alwaysWriteToTmp) {
- //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
- //is restored to an earlier check-pointed window, it will look for an older tmp.
- fileNameToTmpName.put(seenFileNamePart, recoveryFileName);
- } else {
- LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
- fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
- }
- } else {
- if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) {
- String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
- FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
- IOUtils.copy(inputStream, outputStream);
- outputStream.close();
- fileNameToTmpName.put(seenFileNamePart, currentTmp);
- }
- inputStream.close();
- }
+ recoverFile(seenFileName, seenFileNamePart, activeFilePath);
}
}
}
@@ -448,10 +405,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
}
}
-
LOG.debug("setup completed");
- LOG.debug("end-offsets {}", endOffsets);
-
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -463,6 +417,68 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
/**
+ * Recovers a file which exists on the disk. If the length of the file is not same as the
+ * length which the operator remembers then the file is truncated. <br/>
+ * When always writing to a temporary file, then a file is restored even when the length is same as what the
+ * operator remembers however this is done only for files which had open streams that weren't closed before
+ * failure.
+ *
+ * @param filename name of the actual file.
+ * @param partFileName name of the part file. When not rolling this is same as filename; otherwise this is the
+ * latest open part file name.
+ * @param filepath path of the file. When always writing to temp file, this is the path of the temp file; otherwise
+ * path of the actual file.
+ * @throws IOException
+ */
+ private void recoverFile(String filename, String partFileName, Path filepath) throws IOException
+ {
+ LOG.debug("path exists {}", filepath);
+ long offset = endOffsets.get(filename).longValue();
+ FSDataInputStream inputStream = fs.open(filepath);
+ FileStatus status = fs.getFileStatus(filepath);
+
+ if (status.getLen() != offset) {
+ LOG.info("path corrupted {} {} {}", filepath, offset, status.getLen());
+ byte[] buffer = new byte[COPY_BUFFER_SIZE];
+ String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+ Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
+ FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
+
+ while (inputStream.getPos() < offset) {
+ long remainingBytes = offset - inputStream.getPos();
+ int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int)remainingBytes : COPY_BUFFER_SIZE;
+ inputStream.read(buffer);
+ fsOutput.write(buffer, 0, bytesToWrite);
+ }
+
+ flush(fsOutput);
+ fsOutput.close();
+ inputStream.close();
+
+ FileContext fileContext = FileContext.getFileContext(fs.getUri());
+ LOG.debug("active {} recovery {} ", filepath, recoveryFilePath);
+
+ if (alwaysWriteToTmp) {
+ //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
+ //is restored to an earlier check-pointed window, it will look for an older tmp.
+ fileNameToTmpName.put(partFileName, recoveryFileName);
+ } else {
+ LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
+ fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+ }
+ } else {
+ if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
+ String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+ FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
+ IOUtils.copy(inputStream, outputStream);
+ streamsCache.put(filename, new FSFilterStreamContext(outputStream));
+ fileNameToTmpName.put(partFileName, currentTmp);
+ }
+ inputStream.close();
+ }
+ }
+
+ /**
* Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
* @return cache loader
*/
@@ -656,7 +672,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
fileName = getPartFileNamePri(fileName);
part.setValue(currentOpenPart.getValue());
}
- LOG.debug("request finalize {}", fileName);
filesPerWindow.add(fileName);
}
@@ -1215,7 +1230,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//a tmp file has tmp extension always preceded by timestamp
String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
if (fileName.equals(actualFileName)) {
- LOG.debug("deleting vagrant file {}", statusName);
+ LOG.debug("deleting stray file {}", statusName);
fs.delete(status.getPath(), true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index f7d1731..cbcc8b4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1883,6 +1883,58 @@ public class AbstractFileOutputOperatorTest
checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
}
+ @Test
+ public void testRecoveryOfOpenFiles()
+ {
+ EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+ writer.setMaxLength(4);
+ File meta = new File(testMeta.getDir());
+ writer.setFilePath(meta.getAbsolutePath());
+ writer.setAlwaysWriteToTmp(true);
+ writer.setup(testMeta.testOperatorContext);
+
+ writer.beginWindow(0);
+ writer.input.put(0);
+ writer.input.put(1);
+ writer.input.put(2);
+ writer.input.put(3);
+ writer.endWindow();
+
+ //failure and restored
+ writer.setup(testMeta.testOperatorContext);
+ writer.input.put(4);
+ writer.input.put(5);
+ writer.endWindow();
+
+ writer.beginWindow(1);
+ writer.input.put(6);
+ writer.input.put(7);
+ writer.input.put(8);
+ writer.input.put(9);
+ writer.input.put(6);
+ writer.input.put(7);
+ writer.endWindow();
+
+ writer.committed(1);
+
+ //Part 0 checks
+ String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
+ String correctContents = "0\n" + "2\n" + "4\n";
+ checkOutput(0, evenFileName, correctContents);
+
+ String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
+ correctContents = "1\n" + "3\n" + "5\n";
+ checkOutput(0, oddFileName, correctContents);
+
+
+ //Part 1 checks
+ correctContents = "6\n" + "8\n" + "6\n";
+ checkOutput(1, evenFileName, correctContents);
+
+ correctContents = "7\n" + "9\n" + "7\n";
+ checkOutput(1, oddFileName, correctContents);
+ }
+
private void checkCompressedFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) throws IOException
{
FileInputStream fis;
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1886' into
devel-3
Posted by ti...@apache.org.
Merge branch 'MLHR-1886' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d081d0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d081d0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d081d0ce
Branch: refs/heads/devel-3
Commit: d081d0cee2ff5f0194df11f6ab030c0ff545c601
Parents: 8f671f2 8f0c27b
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 3 14:30:17 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 3 14:30:17 2015 -0800
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 113 +++++++++++--------
.../io/fs/AbstractFileOutputOperatorTest.java | 52 +++++++++
2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------