You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/04 07:00:58 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1825 #comment handling case when no open part is updated but stream is not created

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 cdc8b28af -> 30e4b1ff6


MLHR-1825 #comment handling case when no open part is updated but stream is not created


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c6da4a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c6da4a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c6da4a05

Branch: refs/heads/devel-3
Commit: c6da4a05e3df215a39eb06752e31760bb206528b
Parents: 91321ce
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 1 15:57:10 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 2 13:48:12 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c6da4a05/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 3f45afb..a589751 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -484,34 +484,40 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
         //of this operator.
         for(String seenFileName: endOffsets.keySet()) {
           try {
-            Integer part = openPart.get(seenFileName).getValue() + 1;
+            Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
+            int nextPart = fileOpenPart + 1;
             String seenPartFileName;
             while (true) {
-              seenPartFileName = getPartFileName(seenFileName, part);
-              Path activePath;
+              seenPartFileName = getPartFileName(seenFileName, nextPart);
+              Path activePath = null;
               if (alwaysWriteToTmp) {
                 String tmpFileName = fileNameToTmpName.get(seenPartFileName);
-                activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+                if (tmpFileName != null) {
+                  activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+                }
               } else {
                 activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
               }
-              if (!fs.exists(activePath)) {
+              if (activePath == null || !fs.exists(activePath)) {
                 break;
               }
 
               fs.delete(activePath, true);
-              part = part + 1;
+              nextPart++;
             }
 
-            seenPartFileName = getPartFileName(seenFileName, openPart.get(seenFileName).intValue());
-            Path activeFilePath;
+            seenPartFileName = getPartFileName(seenFileName, fileOpenPart);
+            Path activePath = null;
             if (alwaysWriteToTmp) {
-              activeFilePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+              String tmpFileName = fileNameToTmpName.get(seenPartFileName);
+              if (tmpFileName != null) {
+                activePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+              }
             } else {
-              activeFilePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
+              activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
             }
 
-            if (fs.getFileStatus(activeFilePath).getLen() > maxLength) {
+            if (activePath != null && fs.getFileStatus(activePath).getLen() > maxLength) {
               //Handle the case when restoring to a checkpoint where the current rolling file
               //already has a length greater than max length.
               LOG.debug("rotating file at setup.");
@@ -1143,8 +1149,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
     for (FileStatus status : statuses) {
       String statusName = status.getPath().getName();
       if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(fileName)) {
-        LOG.debug("deleting vagrant file {}", statusName);
-        fs.delete(status.getPath(), true);
+        //a tmp file name is the actual file name followed by a timestamp and then the tmp extension; strip the last two dot-separated suffixes
+        String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
+        if (fileName.equals(actualFileName)) {
+          LOG.debug("deleting vagrant file {}", statusName);
+          fs.delete(status.getPath(), true);
+        }
       }
     }
   }


[2/2] incubator-apex-malhar git commit: Merge branch 'devel-3' of github.com:chandnisingh/incubator-apex-malhar into devel-3

Posted by pr...@apache.org.
Merge branch 'devel-3' of github.com:chandnisingh/incubator-apex-malhar into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/30e4b1ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/30e4b1ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/30e4b1ff

Branch: refs/heads/devel-3
Commit: 30e4b1ff6758d7cb2b91e9d11f585fd8e5bf78b8
Parents: cdc8b28 c6da4a0
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Thu Sep 3 21:42:07 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu Sep 3 21:42:07 2015 -0700

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------