You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:14 UTC

[43/50] incubator-apex-core git commit: APEX-54 #resolve Added code to copy from local file to HDFS with overwrite option for AsyncFSStorageAgent

APEX-54 #resolve Added code to copy from local file to HDFS with overwrite option for AsyncFSStorageAgent


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/be4af0af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/be4af0af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/be4af0af

Branch: refs/heads/master
Commit: be4af0af1fe9800c690e98f770c1059ac2168143
Parents: 9d08532
Author: ishark <is...@datatorrent.com>
Authored: Mon Aug 17 18:28:47 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 21 14:28:40 2015 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        | 50 +++++++++++++++-----
 1 file changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be4af0af/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index d5de61c..f6077a7 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -15,19 +15,16 @@
  */
 package com.datatorrent.common.util;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectStreamException;
+import java.io.*;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.netlet.util.DTThrowable;
 public class AsyncFSStorageAgent extends FSStorageAgent
 {
   private final transient FileSystem fs;
@@ -85,9 +82,40 @@ public class AsyncFSStorageAgent extends FSStorageAgent
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     String window = Long.toHexString(windowId);
-    Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE);
-    FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf);
-    fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+    Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + TMP_FILE);
+    File srcFile = new File(directory, String.valueOf(windowId));
+    FSDataOutputStream stream = null;
+    boolean stateSaved = false;
+    try {
+      // Create the temporary file with the OVERWRITE flag: this avoids a dangling-lease issue and prevents an exception if the file already exists
+      stream = fileContext.create(lPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
+      InputStream in = null;
+      try {
+        in = new FileInputStream(srcFile);
+        IOUtils.copyBytes(in, stream, conf, false);
+      } finally {
+        IOUtils.closeStream(in);
+      }
+      stateSaved = true;
+    } catch (Throwable t) {
+      logger.debug("while saving {} {}", operatorId, window, t);
+      stateSaved = false;
+      DTThrowable.rethrow(t);
+    } finally {
+      try {
+        if (stream != null) {
+          stream.close();
+        }
+      } catch (IOException ie) {
+        stateSaved = false;
+        throw new RuntimeException(ie);
+      } finally {
+        if (stateSaved) {
+          fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+        }
+        FileUtil.fullyDelete(srcFile);
+      }
+    }
   }
 
   @Override