You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/09/04 07:00:58 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1825 #comment handling case when no open part is updated but stream is not created
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 cdc8b28af -> 30e4b1ff6
MLHR-1825 #comment handling case when no open part is updated but stream is not created
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c6da4a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c6da4a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c6da4a05
Branch: refs/heads/devel-3
Commit: c6da4a05e3df215a39eb06752e31760bb206528b
Parents: 91321ce
Author: Chandni Singh <cs...@apache.org>
Authored: Tue Sep 1 15:57:10 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Sep 2 13:48:12 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 36 +++++++++++++-------
1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c6da4a05/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 3f45afb..a589751 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -484,34 +484,40 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
//of this operator.
for(String seenFileName: endOffsets.keySet()) {
try {
- Integer part = openPart.get(seenFileName).getValue() + 1;
+ Integer fileOpenPart = this.openPart.get(seenFileName).getValue();
+ int nextPart = fileOpenPart + 1;
String seenPartFileName;
while (true) {
- seenPartFileName = getPartFileName(seenFileName, part);
- Path activePath;
+ seenPartFileName = getPartFileName(seenFileName, nextPart);
+ Path activePath = null;
if (alwaysWriteToTmp) {
String tmpFileName = fileNameToTmpName.get(seenPartFileName);
- activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+ if (tmpFileName != null) {
+ activePath = new Path(filePath + Path.SEPARATOR + tmpFileName);
+ }
} else {
activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
}
- if (!fs.exists(activePath)) {
+ if (activePath == null || !fs.exists(activePath)) {
break;
}
fs.delete(activePath, true);
- part = part + 1;
+ nextPart++;
}
- seenPartFileName = getPartFileName(seenFileName, openPart.get(seenFileName).intValue());
- Path activeFilePath;
+ seenPartFileName = getPartFileName(seenFileName, fileOpenPart);
+ Path activePath = null;
if (alwaysWriteToTmp) {
- activeFilePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+ String tmpFileName = fileNameToTmpName.get(seenPartFileName);
+ if (tmpFileName != null) {
+ activePath = new Path(filePath + Path.SEPARATOR + fileNameToTmpName.get(seenPartFileName));
+ }
} else {
- activeFilePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
+ activePath = new Path(filePath + Path.SEPARATOR + seenPartFileName);
}
- if (fs.getFileStatus(activeFilePath).getLen() > maxLength) {
+ if (activePath != null && fs.getFileStatus(activePath).getLen() > maxLength) {
//Handle the case when restoring to a checkpoint where the current rolling file
//already has a length greater than max length.
LOG.debug("rotating file at setup.");
@@ -1143,8 +1149,12 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
for (FileStatus status : statuses) {
String statusName = status.getPath().getName();
if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(fileName)) {
- LOG.debug("deleting vagrant file {}", statusName);
- fs.delete(status.getPath(), true);
+ //a tmp file has tmp extension always preceded by timestamp
+ String actualFileName = statusName.substring(0, statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1));
+ if (fileName.equals(actualFileName)) {
+ LOG.debug("deleting vagrant file {}", statusName);
+ fs.delete(status.getPath(), true);
+ }
}
}
}
[2/2] incubator-apex-malhar git commit: Merge branch 'devel-3' of github.com:chandnisingh/incubator-apex-malhar into devel-3
Posted by pr...@apache.org.
Merge branch 'devel-3' of github.com:chandnisingh/incubator-apex-malhar into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/30e4b1ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/30e4b1ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/30e4b1ff
Branch: refs/heads/devel-3
Commit: 30e4b1ff6758d7cb2b91e9d11f585fd8e5bf78b8
Parents: cdc8b28 c6da4a0
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Thu Sep 3 21:42:07 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Thu Sep 3 21:42:07 2015 -0700
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 36 +++++++++++++-------
1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------