You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:53 UTC

[2/9] git commit: Distributed Cache general purpose copy method, thread safety

Distributed Cache general purpose copy method, thread safety


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd0acd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd0acd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd0acd4

Branch: refs/heads/release-0.5.1
Commit: 1dd0acd4d572b286f6afbb9562acffb3fdc20217
Parents: e9f4225
Author: zentol <s....@web.de>
Authored: Fri Jun 6 23:44:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:45 2014 +0200

----------------------------------------------------------------------
 .../pact/runtime/cache/FileCache.java           | 58 ++++++++++++--------
 1 file changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd0acd4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 6a38cd8..882c3a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -49,6 +49,8 @@ public class FileCache {
 
 	private LocalFileSystem lfs = new LocalFileSystem();
 
+	private static final Object lock = new Object();
+
 	private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>, Integer>();
 
 	private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
@@ -102,6 +104,35 @@ public class FileCache {
 		}
 	}
 
+	public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+		FileSystem sFS = sourcePath.getFileSystem();
+		FileSystem tFS = targetPath.getFileSystem();
+		if (!tFS.exists(targetPath)) {
+			if (sFS.getFileStatus(sourcePath).isDir()) {
+				tFS.mkdirs(targetPath);
+				FileStatus[] contents = sFS.listStatus(sourcePath);
+				for (FileStatus content : contents) {
+					String distPath = content.getPath().toString();
+					if (content.isDir()) {
+						if (distPath.endsWith("/")) {
+							distPath = distPath.substring(0, distPath.length() - 1);
+						}
+					}
+					String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+					copy(content.getPath(), new Path(localPath), executable);
+				}
+			} else {
+				try {
+					FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
+					FSDataInputStream fsInput = sFS.open(sourcePath);
+					IOUtils.copyBytes(fsInput, lfsOutput);
+					new File(targetPath.toString()).setExecutable(executable);
+				} catch (IOException ioe) {
+				}
+			}
+		}
+	}
+
 	/**
 	 * Asynchronous file copy process
 	 */
@@ -121,35 +152,14 @@ public class FileCache {
 		public Path call()  {
 			Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
 			try {
-				create(new Path(filePath), tmp);
+				synchronized (lock) {
+					copy(new Path(filePath), tmp, this.executable);
+				}
 			} catch (IOException e1) {
 				throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
 			}
 			return tmp;
 		}
-		
-		private void create(Path distributedFilePath, Path localFilePath) throws IOException {
-			if (!lfs.exists(localFilePath)) {
-				FileSystem dfs = distributedFilePath.getFileSystem();
-				if (dfs.getFileStatus(distributedFilePath).isDir()) {
-					lfs.mkdirs(localFilePath);
-					FileStatus[] contents = dfs.listStatus(distributedFilePath);
-					for (FileStatus content : contents) {
-						String distPath = content.getPath().toString();
-						if (content.isDir()){
-							distPath = distPath.substring(0,distPath.length() - 1);
-						}
-						String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
-						create(content.getPath(), new Path(localPath));
-					}
-				} else {
-					FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
-					FSDataInputStream fsInput = dfs.open(distributedFilePath);
-					IOUtils.copyBytes(fsInput, lfsOutput);
-					new File(localFilePath.toString()).setExecutable(executable);
-				}
-			}
-		}
 	}
 
 	/**