You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by yo...@apache.org on 2016/10/03 15:07:22 UTC
apex-malhar git commit: APEXMALHAR-2226 Fixed the Not supported
exception while re-deploying the AbstractFileOutput Operator
Repository: apex-malhar
Updated Branches:
refs/heads/master 12d6183cf -> 1333910fd
APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the AbstractFileOutput Operator
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1333910f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1333910f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1333910f
Branch: refs/heads/master
Commit: 1333910fd8b8a79e74a503fc0a9fa337e93beba0
Parents: 12d6183
Author: chaitanya <ch...@apache.org>
Authored: Thu Sep 29 12:18:37 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Mon Oct 3 16:52:38 2016 +0530
----------------------------------------------------------------------
.../lib/io/fs/AbstractFileOutputOperator.java | 45 +++++++++++++++++++-
1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1333910f/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 0195f7f..ab8bedc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -116,6 +116,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
private static final String TMP_EXTENSION = ".tmp";
+ private static final String APPEND_TMP_FILE = "_APPENDING";
+
private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25;
/**
@@ -636,7 +638,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
{
FSDataOutputStream fsOutput;
if (append) {
- fsOutput = fs.append(filepath);
+ fsOutput = openStreamInAppendMode(filepath);
} else {
fsOutput = fs.create(filepath, (short)replication);
fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
@@ -645,6 +647,47 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
}
/**
+ * Opens the stream for the given file path in append mode. Catch the exception if the FS doesnt support
+ * append operation and calls the openStreamForNonAppendFS().
+ * @param filepath given file path
+ * @return output stream
+ */
+ protected FSDataOutputStream openStreamInAppendMode(Path filepath)
+ {
+ FSDataOutputStream fsOutput = null;
+ try {
+ fsOutput = fs.append(filepath);
+ } catch (IOException e) {
+ if (e.getMessage().equals("Not supported")) {
+ fsOutput = openStreamForNonAppendFS(filepath);
+ }
+ }
+ return fsOutput;
+ }
+
+ /**
+ * Opens the stream for the given file path for the file systems which are not supported append operation.
+ * @param filepath given file path
+ * @return output stream
+ */
+ protected FSDataOutputStream openStreamForNonAppendFS(Path filepath)
+ {
+ try {
+ Path appendTmpFile = new Path(filepath + APPEND_TMP_FILE);
+ fs.rename(filepath, appendTmpFile);
+ FSDataInputStream fsIn = fs.open(appendTmpFile);
+ FSDataOutputStream fsOut = fs.create(filepath);
+ IOUtils.copy(fsIn, fsOut);
+ flush(fsOut);
+ fsIn.close();
+ fs.delete(appendTmpFile);
+ return fsOut;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
* Closes the stream which has been removed from the cache.
*
* @param streamContext stream context which is removed from the cache.