You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:14 UTC
[43/50] incubator-apex-core git commit: APEX-54 #resolve Added code
to copy from local file to HDFS with overwrite option for AsyncFSStorageAgent
APEX-54 #resolve Added code to copy from local file to HDFS with overwrite option for AsyncFSStorageAgent
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/be4af0af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/be4af0af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/be4af0af
Branch: refs/heads/master
Commit: be4af0af1fe9800c690e98f770c1059ac2168143
Parents: 9d08532
Author: ishark <is...@datatorrent.com>
Authored: Mon Aug 17 18:28:47 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Fri Aug 21 14:28:40 2015 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 50 +++++++++++++++-----
1 file changed, 39 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/be4af0af/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index d5de61c..f6077a7 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -15,19 +15,16 @@
*/
package com.datatorrent.common.util;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectStreamException;
+import java.io.*;
+import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.netlet.util.DTThrowable;
public class AsyncFSStorageAgent extends FSStorageAgent
{
private final transient FileSystem fs;
@@ -85,9 +82,40 @@ public class AsyncFSStorageAgent extends FSStorageAgent
String operatorIdStr = String.valueOf(operatorId);
File directory = new File(localBasePath, operatorIdStr);
String window = Long.toHexString(windowId);
- Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE);
- FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf);
- fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+ Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + TMP_FILE);
+ File srcFile = new File(directory, String.valueOf(windowId));
+ FSDataOutputStream stream = null;
+ boolean stateSaved = false;
+ try {
+ // Create the temporary file with OverWrite option to avoid dangling lease issue and avoid exception if file already exists
+ stream = fileContext.create(lPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
+ InputStream in = null;
+ try {
+ in = new FileInputStream(srcFile);
+ IOUtils.copyBytes(in, stream, conf, false);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ stateSaved = true;
+ } catch (Throwable t) {
+ logger.debug("while saving {} {}", operatorId, window, t);
+ stateSaved = false;
+ DTThrowable.rethrow(t);
+ } finally {
+ try {
+ if (stream != null) {
+ stream.close();
+ }
+ } catch (IOException ie) {
+ stateSaved = false;
+ throw new RuntimeException(ie);
+ } finally {
+ if (stateSaved) {
+ fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE);
+ }
+ FileUtil.fullyDelete(srcFile);
+ }
+ }
}
@Override