You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2016/12/20 01:05:58 UTC
crunch git commit: Quick and Dirty Workaround for Crunch DistCache
Repository: crunch
Updated Branches:
refs/heads/master c14acfab0 -> 901d0644d
Quick and Dirty Workaround for Crunch DistCache
Signed-off-by: Micah Whitacre <mk...@gmail.com>
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/901d0644
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/901d0644
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/901d0644
Branch: refs/heads/master
Commit: 901d0644dcaec5309670b7a2eeff228cea2c7767
Parents: c14acfa
Author: Dimitry Goldin <dm...@spotify.com>
Authored: Fri Oct 14 18:39:41 2016 +0200
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 19 16:44:08 2016 -0800
----------------------------------------------------------------------
.../crunch/impl/mr/run/CrunchTaskContext.java | 3 +-
.../java/org/apache/crunch/util/DistCache.java | 42 +++++++++++---------
2 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index b81df05..0eb246a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -40,8 +40,7 @@ class CrunchTaskContext {
this.taskContext = taskContext;
this.nodeContext = nodeContext;
Configuration conf = taskContext.getConfiguration();
- Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)),
- nodeContext.toString());
+ Path path = new Path(nodeContext.toString());
try {
this.nodes = (List<RTNode>) DistCache.read(conf, path);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
index 046f038..ad26a67 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
@@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.net.URI;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
@@ -67,27 +66,32 @@ public class DistCache {
DistributedCache.addCacheFile(path.toUri(), conf);
}
- public static Object read(Configuration conf, Path path) throws IOException {
- URI target = null;
- for (URI uri : DistributedCache.getCacheFiles(conf)) {
- if (uri.toString().equals(path.toString())) {
- target = uri;
- break;
- }
+ public static Object read(Configuration conf, Path requestedFile) throws IOException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+
+ Path cachedPath = null;
+
+ try {
+ cachedPath = getPathToCacheFile(requestedFile, conf);
+ } catch (CrunchRuntimeException cre) {
+ throw new IOException("Can not determine cached location for " + requestedFile.toString(), cre);
}
- Object value = null;
- if (target != null) {
- Path targetPath = new Path(target.toString());
- ObjectInputStream ois = new ClassloaderFallbackObjectInputStream(
- targetPath.getFileSystem(conf).open(targetPath));
- try {
- value = ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new CrunchRuntimeException(e);
+
+ if(cachedPath == null || !localFs.exists(cachedPath)) {
+ throw new IOException("Expected file with path: " + requestedFile.toString() + " to be cached");
+ }
+
+ ObjectInputStream ois = null;
+ try {
+ ois = new ObjectInputStream(localFs.open(cachedPath));
+ return ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new CrunchRuntimeException(e);
+ } finally {
+ if (ois != null) {
+ ois.close();
}
- ois.close();
}
- return value;
}
public static void addCacheFile(Path path, Configuration conf) {