You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2015/11/03 23:31:08 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1886: moving the restoring of a file into its own method and adding a test that covers re-writing a file that wasn't closed before a failure

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 8f671f2dd -> d081d0cee


MLHR-1886: moving the restoring of a file into its own method and adding a test that covers re-writing a file that wasn't closed before a failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/8f0c27b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/8f0c27b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/8f0c27b6

Branch: refs/heads/devel-3
Commit: 8f0c27b6df5511c5048774baa78f7a2e25dd200c
Parents: 8f671f2
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Nov 3 13:06:25 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Nov 3 13:14:48 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 113 +++++++++++--------
 .../io/fs/AbstractFileOutputOperatorTest.java   |  52 +++++++++
 2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 09294a2..744f024 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -351,50 +351,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           }
 
           if (fs.exists(activeFilePath)) {
-            LOG.debug("path exists {}", activeFilePath);
-            long offset = endOffsets.get(seenFileName).longValue();
-            FSDataInputStream inputStream = fs.open(activeFilePath);
-            FileStatus status = fs.getFileStatus(activeFilePath);
-
-            if (status.getLen() != offset) {
-              LOG.info("path corrupted {} {} {}", activeFilePath, offset, status.getLen());
-              byte[] buffer = new byte[COPY_BUFFER_SIZE];
-              String recoveryFileName = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-              Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
-              FSDataOutputStream fsOutput = openStream(recoveryFilePath, false);
-
-              while (inputStream.getPos() < offset) {
-                long remainingBytes = offset - inputStream.getPos();
-                int bytesToWrite = remainingBytes < COPY_BUFFER_SIZE ? (int) remainingBytes : COPY_BUFFER_SIZE;
-                inputStream.read(buffer);
-                fsOutput.write(buffer, 0, bytesToWrite);
-              }
-
-              flush(fsOutput);
-              fsOutput.close();
-              inputStream.close();
-
-              FileContext fileContext = FileContext.getFileContext(fs.getUri());
-              LOG.debug("active {} recovery {} ", activeFilePath, recoveryFilePath);
-
-              if (alwaysWriteToTmp) {
-                //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
-                //is restored to an earlier check-pointed window, it will look for an older tmp.
-                fileNameToTmpName.put(seenFileNamePart, recoveryFileName);
-              } else {
-                LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
-                fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
-              }
-            } else {
-              if (alwaysWriteToTmp && filesWithOpenStreams.contains(seenFileName)) {
-                String currentTmp = seenFileNamePart + '.' + System.currentTimeMillis() + TMP_EXTENSION;
-                FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
-                IOUtils.copy(inputStream, outputStream);
-                outputStream.close();
-                fileNameToTmpName.put(seenFileNamePart, currentTmp);
-              }
-              inputStream.close();
-            }
+            recoverFile(seenFileName, seenFileNamePart, activeFilePath);
           }
         }
       }
@@ -448,10 +405,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
           }
         }
       }
-
       LOG.debug("setup completed");
-      LOG.debug("end-offsets {}", endOffsets);
-
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -463,6 +417,96 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Recovers a file which exists on the disk. If the length of the file is not same as the
+   * length which the operator remembers then the file is truncated to that length. <br/>
+   * When always writing to a temporary file, then a file is restored even when the length is same as what the
+   * operator remembers; however this is done only for files which had open streams that weren't closed before
+   * failure.
+   *
+   * @param filename       name of the actual file.
+   * @param partFileName   name of the part file. When not rolling this is same as filename; otherwise this is the
+   *                       latest open part file name.
+   * @param activeFilePath path of the file. When always writing to temp file, this is the path of the temp file;
+   *                       otherwise path of the actual file.
+   * @throws IOException if the file cannot be read or rewritten, or if it is shorter than the remembered length.
+   */
+  private void recoverFile(String filename, String partFileName, Path activeFilePath) throws IOException
+  {
+    LOG.debug("path exists {}", activeFilePath);
+    long offset = endOffsets.get(filename).longValue();
+    FileStatus status = fs.getFileStatus(activeFilePath);
+
+    if (status.getLen() != offset) {
+      LOG.info("path corrupted {} {} {}", activeFilePath, offset, status.getLen());
+      String recoveryFileName = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+      Path recoveryFilePath = new Path(filePath + Path.SEPARATOR + recoveryFileName);
+      copyPrefix(activeFilePath, recoveryFilePath, offset);
+      LOG.debug("active {} recovery {} ", activeFilePath, recoveryFilePath);
+
+      if (alwaysWriteToTmp) {
+        //recovery file is used as the new tmp file and we cannot delete the old tmp file because when the operator
+        //is restored to an earlier check-pointed window, it will look for an older tmp.
+        fileNameToTmpName.put(partFileName, recoveryFileName);
+      } else {
+        FileContext fileContext = FileContext.getFileContext(fs.getUri());
+        LOG.debug("recovery path {} actual path {} ", recoveryFilePath, status.getPath());
+        fileContext.rename(recoveryFilePath, status.getPath(), Options.Rename.OVERWRITE);
+      }
+    } else if (alwaysWriteToTmp && filesWithOpenStreams.contains(filename)) {
+      //the length matches but the stream was still open at the time of failure: copy the contents into a fresh
+      //tmp file. The new output stream is intentionally left open and handed over to streamsCache.
+      String currentTmp = partFileName + '.' + System.currentTimeMillis() + TMP_EXTENSION;
+      FSDataInputStream inputStream = fs.open(activeFilePath);
+      try {
+        FSDataOutputStream outputStream = openStream(new Path(filePath + Path.SEPARATOR + currentTmp), false);
+        IOUtils.copy(inputStream, outputStream);
+        streamsCache.put(filename, new FSFilterStreamContext(outputStream));
+        fileNameToTmpName.put(partFileName, currentTmp);
+      } finally {
+        inputStream.close();
+      }
+    }
+  }
+
+  /**
+   * Copies the first {@code length} bytes of {@code source} into {@code destination} (opened with
+   * {@code openStream(destination, false)}), flushes the destination and closes both streams, also on failure.
+   *
+   * @param source      file to copy from.
+   * @param destination file to copy into.
+   * @param length      number of bytes to copy.
+   * @throws IOException if either file cannot be accessed or {@code source} holds fewer than {@code length} bytes.
+   */
+  private void copyPrefix(Path source, Path destination, long length) throws IOException
+  {
+    byte[] buffer = new byte[COPY_BUFFER_SIZE];
+    FSDataInputStream inputStream = fs.open(source);
+    try {
+      FSDataOutputStream fsOutput = openStream(destination, false);
+      try {
+        long remainingBytes = length;
+        while (remainingBytes > 0) {
+          int bytesToRead = (int)Math.min(remainingBytes, COPY_BUFFER_SIZE);
+          //InputStream.read may return fewer bytes than requested, so only the bytes actually read are written;
+          //a negative result means the file ended before the expected length (looping would never terminate).
+          int bytesRead = inputStream.read(buffer, 0, bytesToRead);
+          if (bytesRead < 0) {
+            throw new IOException("unexpected end of " + source + " after " + (length - remainingBytes)
+                + " bytes, expected " + length + " bytes");
+          }
+          fsOutput.write(buffer, 0, bytesRead);
+          remainingBytes -= bytesRead;
+        }
+        flush(fsOutput);
+      } finally {
+        fsOutput.close();
+      }
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  /**
    * Creates the {@link CacheLoader} for loading an output stream when it is not present in the cache.
    * @return cache loader
    */
@@ -656,7 +672,6 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
       fileName = getPartFileNamePri(fileName);
       part.setValue(currentOpenPart.getValue());
     }
-    LOG.debug("request finalize {}", fileName);
     filesPerWindow.add(fileName);
   }
 
@@ -1215,7 +1230,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         //a tmp file has tmp extension always preceded by timestamp
         String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
         if (fileName.equals(actualFileName)) {
-          LOG.debug("deleting vagrant file {}", statusName);
+          LOG.debug("deleting stray file {}", statusName);
           fs.delete(status.getPath(), true);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/8f0c27b6/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
index f7d1731..cbcc8b4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java
@@ -1883,6 +1883,58 @@ public class AbstractFileOutputOperatorTest
     checkCompressedFile(oddFile, oddOffsets, 1, 5, 1000, null, null);
   }
 
+  @Test
+  public void testRecoveryOfOpenFiles()
+  {
+    EvenOddHDFSExactlyOnceWriter writer = new EvenOddHDFSExactlyOnceWriter();
+    writer.setMaxLength(4);
+    File meta = new File(testMeta.getDir());
+    writer.setFilePath(meta.getAbsolutePath());
+    writer.setAlwaysWriteToTmp(true);
+    writer.setup(testMeta.testOperatorContext);
+    //covers re-writing a file whose stream was not closed before a failure, with alwaysWriteToTmp enabled
+    writer.beginWindow(0);
+    writer.input.put(0);
+    writer.input.put(1);
+    writer.input.put(2);
+    writer.input.put(3);
+    writer.endWindow();
+    //failure and restore: setup is called again on the same instance while the streams opened in window 0
+    //were never closed. NOTE(review): no beginWindow precedes tuples 4 and 5 - confirm this is intended.
+    writer.setup(testMeta.testOperatorContext);
+    writer.input.put(4);
+    writer.input.put(5);
+    writer.endWindow();
+
+    writer.beginWindow(1);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.input.put(8);
+    writer.input.put(9);
+    writer.input.put(6);
+    writer.input.put(7);
+    writer.endWindow();
+    //NOTE(review): the part-file checks below presumably rely on committed(1) finalizing the files - confirm
+    writer.committed(1);
+
+    //Part 0 checks: tuples written before (0-3) and after (4, 5) the simulated failure are all present once
+    String evenFileName = testMeta.getDir() + File.separator + EVEN_FILE;
+    String correctContents = "0\n" + "2\n" + "4\n";
+    checkOutput(0, evenFileName, correctContents);
+
+    String oddFileName = testMeta.getDir() + File.separator + ODD_FILE;
+    correctContents = "1\n" + "3\n" + "5\n";
+    checkOutput(0, oddFileName, correctContents);
+
+    //Part 1 checks: all six tuples of window 1 are expected here, including the repeated 6 and 7,
+    //split into even and odd files.
+    correctContents = "6\n" + "8\n" + "6\n";
+    checkOutput(1, evenFileName, correctContents);
+
+    correctContents = "7\n" + "9\n" + "7\n";
+    checkOutput(1, oddFileName, correctContents);
+  }
+
   private void checkCompressedFile(File file, List<Long> offsets, int startVal, int totalWindows, int totalRecords, SecretKey secretKey, byte[] iv) throws IOException
   {
     FileInputStream fis;


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1886' into devel-3

Posted by ti...@apache.org.
Merge branch 'MLHR-1886' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d081d0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d081d0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d081d0ce

Branch: refs/heads/devel-3
Commit: d081d0cee2ff5f0194df11f6ab030c0ff545c601
Parents: 8f671f2 8f0c27b
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Tue Nov 3 14:30:17 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Tue Nov 3 14:30:17 2015 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 113 +++++++++++--------
 .../io/fs/AbstractFileOutputOperatorTest.java   |  52 +++++++++
 2 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------