You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by yo...@apache.org on 2016/10/03 15:07:22 UTC

apex-malhar git commit: APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the AbstractFileOutput Operator

Repository: apex-malhar
Updated Branches:
  refs/heads/master 12d6183cf -> 1333910fd


APEXMALHAR-2226 Fixed the Not supported exception while re-deploying the AbstractFileOutput Operator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1333910f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1333910f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1333910f

Branch: refs/heads/master
Commit: 1333910fd8b8a79e74a503fc0a9fa337e93beba0
Parents: 12d6183
Author: chaitanya <ch...@apache.org>
Authored: Thu Sep 29 12:18:37 2016 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Mon Oct 3 16:52:38 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileOutputOperator.java   | 45 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1333910f/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
index 0195f7f..ab8bedc 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
@@ -116,6 +116,8 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
 
   private static final String TMP_EXTENSION = ".tmp";
 
+  private static final String APPEND_TMP_FILE = "_APPENDING"; // temp-file suffix used when the FS cannot append
+
   private static final int MAX_NUMBER_FILES_IN_TEARDOWN_EXCEPTION = 25;
 
   /**
@@ -636,7 +638,7 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   {
     FSDataOutputStream fsOutput;
     if (append) {
-      fsOutput = fs.append(filepath);
+      fsOutput = openStreamInAppendMode(filepath);
     } else {
       fsOutput = fs.create(filepath, (short)replication);
       fs.setPermission(filepath, FsPermission.createImmutable(filePermission));
@@ -645,6 +647,47 @@ public abstract class AbstractFileOutputOperator<INPUT> extends BaseOperator imp
   }
 
   /**
+   * Opens the stream for the given file path in append mode. If the FS reports that append is not supported,
+   * falls back to openStreamForNonAppendFS(); any other I/O failure is rethrown as a RuntimeException.
+   * @param filepath given file path
+   * @return output stream
+   */
+  protected FSDataOutputStream openStreamInAppendMode(Path filepath)
+  {
+    try {
+      return fs.append(filepath);
+    } catch (IOException e) {
+      // Constant-first equals tolerates a null message; other failures must not produce a null stream.
+      if ("Not supported".equals(e.getMessage())) {
+        return openStreamForNonAppendFS(filepath);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Emulates append on file systems that lack it by copying the existing content into a re-created file.
+   * @param filepath given file path
+   * @return output stream
+   */
+  protected FSDataOutputStream openStreamForNonAppendFS(Path filepath)
+  {
+    try {
+      Path appendTmpFile = new Path(filepath + APPEND_TMP_FILE);
+      fs.rename(filepath, appendTmpFile);
+      FSDataOutputStream fsOut = fs.create(filepath);
+      try (FSDataInputStream fsIn = fs.open(appendTmpFile)) { // closed even if the copy or flush fails
+        IOUtils.copy(fsIn, fsOut);
+        flush(fsOut);
+      }
+      fs.delete(appendTmpFile, false); // non-recursive; the one-argument overload is deprecated
+      return fsOut;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
    * Closes the stream which has been removed from the cache.
    *
    * @param streamContext stream context which is removed from the cache.