You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/12 23:03:53 UTC
[2/9] git commit: Distributed Cache general purpose copy method,
thread safety
Distributed Cache general purpose copy method, thread safety
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd0acd4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd0acd4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd0acd4
Branch: refs/heads/release-0.5.1
Commit: 1dd0acd4d572b286f6afbb9562acffb3fdc20217
Parents: e9f4225
Author: zentol <s....@web.de>
Authored: Fri Jun 6 23:44:04 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Jun 12 20:47:45 2014 +0200
----------------------------------------------------------------------
.../pact/runtime/cache/FileCache.java | 58 ++++++++++++--------
1 file changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd0acd4/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
index 6a38cd8..882c3a9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/cache/FileCache.java
@@ -49,6 +49,8 @@ public class FileCache {
private LocalFileSystem lfs = new LocalFileSystem();
+ private static final Object lock = new Object();
+
private Map<Pair<JobID, String>, Integer> count = new HashMap<Pair<JobID,String>, Integer>();
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
@@ -102,6 +104,35 @@ public class FileCache {
}
}
+ public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
+ FileSystem sFS = sourcePath.getFileSystem();
+ FileSystem tFS = targetPath.getFileSystem();
+ if (!tFS.exists(targetPath)) {
+ if (sFS.getFileStatus(sourcePath).isDir()) {
+ tFS.mkdirs(targetPath);
+ FileStatus[] contents = sFS.listStatus(sourcePath);
+ for (FileStatus content : contents) {
+ String distPath = content.getPath().toString();
+ if (content.isDir()) {
+ if (distPath.endsWith("/")) {
+ distPath = distPath.substring(0, distPath.length() - 1);
+ }
+ }
+ String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
+ copy(content.getPath(), new Path(localPath), executable);
+ }
+ } else {
+ try {
+ FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
+ FSDataInputStream fsInput = sFS.open(sourcePath);
+ IOUtils.copyBytes(fsInput, lfsOutput);
+ new File(targetPath.toString()).setExecutable(executable);
+ } catch (IOException ioe) {
+ }
+ }
+ }
+ }
+
/**
* Asynchronous file copy process
*/
@@ -121,35 +152,14 @@ public class FileCache {
public Path call() {
Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
try {
- create(new Path(filePath), tmp);
+ synchronized (lock) {
+ copy(new Path(filePath), tmp, this.executable);
+ }
} catch (IOException e1) {
throw new RuntimeException("Error copying a file from hdfs to the local fs", e1);
}
return tmp;
}
-
- private void create(Path distributedFilePath, Path localFilePath) throws IOException {
- if (!lfs.exists(localFilePath)) {
- FileSystem dfs = distributedFilePath.getFileSystem();
- if (dfs.getFileStatus(distributedFilePath).isDir()) {
- lfs.mkdirs(localFilePath);
- FileStatus[] contents = dfs.listStatus(distributedFilePath);
- for (FileStatus content : contents) {
- String distPath = content.getPath().toString();
- if (content.isDir()){
- distPath = distPath.substring(0,distPath.length() - 1);
- }
- String localPath = localFilePath.toString() + distPath.substring(distPath.lastIndexOf("/"));
- create(content.getPath(), new Path(localPath));
- }
- } else {
- FSDataOutputStream lfsOutput = lfs.create(localFilePath, false);
- FSDataInputStream fsInput = dfs.open(distributedFilePath);
- IOUtils.copyBytes(fsInput, lfsOutput);
- new File(localFilePath.toString()).setExecutable(executable);
- }
- }
- }
}
/**