You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2016/12/20 01:05:58 UTC

crunch git commit: Quick and Dirty Workaround for Crunch DistCache

Repository: crunch
Updated Branches:
  refs/heads/master c14acfab0 -> 901d0644d


Quick and Dirty Workaround for Crunch DistCache

Signed-off-by: Micah Whitacre <mk...@gmail.com>
Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/901d0644
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/901d0644
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/901d0644

Branch: refs/heads/master
Commit: 901d0644dcaec5309670b7a2eeff228cea2c7767
Parents: c14acfa
Author: Dimitry Goldin <dm...@spotify.com>
Authored: Fri Oct 14 18:39:41 2016 +0200
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Dec 19 16:44:08 2016 -0800

----------------------------------------------------------------------
 .../crunch/impl/mr/run/CrunchTaskContext.java   |  3 +-
 .../java/org/apache/crunch/util/DistCache.java  | 42 +++++++++++---------
 2 files changed, 24 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index b81df05..0eb246a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -40,8 +40,7 @@ class CrunchTaskContext {
     this.taskContext = taskContext;
     this.nodeContext = nodeContext;
     Configuration conf = taskContext.getConfiguration();
-    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)),
-        nodeContext.toString());
+    Path path = new Path(nodeContext.toString());
     try {
       this.nodes = (List<RTNode>) DistCache.read(conf, path);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
index 046f038..ad26a67 100644
--- a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
+++ b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.net.URI;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.Enumeration;
@@ -67,27 +66,32 @@ public class DistCache {
     DistributedCache.addCacheFile(path.toUri(), conf);
   }
 
-  public static Object read(Configuration conf, Path path) throws IOException {
-    URI target = null;
-    for (URI uri : DistributedCache.getCacheFiles(conf)) {
-      if (uri.toString().equals(path.toString())) {
-        target = uri;
-        break;
-      }
+  public static Object read(Configuration conf, Path requestedFile) throws IOException {
+    FileSystem localFs = FileSystem.getLocal(conf);
+
+    Path cachedPath = null;
+
+    try {
+      cachedPath = getPathToCacheFile(requestedFile, conf);
+    } catch (CrunchRuntimeException cre) {
+      throw new IOException("Can not determine cached location for " + requestedFile.toString(), cre);
     }
-    Object value = null;
-    if (target != null) {
-      Path targetPath = new Path(target.toString());
-      ObjectInputStream ois = new ClassloaderFallbackObjectInputStream(
-          targetPath.getFileSystem(conf).open(targetPath));
-      try {
-        value = ois.readObject();
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
+
+    if(cachedPath == null || !localFs.exists(cachedPath)) {
+      throw new IOException("Expected file with path: " + requestedFile.toString() + " to be cached");
+    }
+
+    ObjectInputStream ois = null;
+    try {
+      ois = new ObjectInputStream(localFs.open(cachedPath));
+      return ois.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new CrunchRuntimeException(e);
+    } finally {
+      if (ois != null) {
+        ois.close();
       }
-      ois.close();
     }
-    return value;
   }
 
   public static void addCacheFile(Path path, Configuration conf) {