You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/16 11:42:05 UTC

[36/49] ignite git commit: IGNITE-4426: Hadoop: tasks can share the same classloader. This closes #1344.

IGNITE-4426: Hadoop: tasks can share the same classloader. This closes #1344.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb1fd826
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb1fd826
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb1fd826

Branch: refs/heads/ignite-2.0
Commit: cb1fd82698fc814d3a9b5b3b1792556de3a24501
Parents: 96160ad
Author: devozerov <vo...@gridgain.com>
Authored: Wed Dec 14 14:35:29 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 14 14:35:29 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    | 10 +++++++++
 .../processors/hadoop/HadoopJobProperty.java    |  8 +++++++
 .../processors/hadoop/impl/v2/HadoopV2Job.java  | 22 ++++++++++++++++++--
 3 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index cd94c89..f6c2fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -101,6 +101,16 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache {
     }
 
     /**
+     * Classloader name for job.
+     *
+     * @param jobId Job ID.
+     * @return Name.
+     */
+    public static String nameForJob(HadoopJobId jobId) {
+        return "hadoop-job-" + jobId;
+    }
+
+    /**
      * Gets name for the task class loader. Task class loader
      * @param info The task info.
      * @param prefix Get only prefix (without task type and number)

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index 1f0ef1b..9e1dede 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -57,6 +57,13 @@ public enum HadoopJobProperty {
     JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"),
 
     /**
+     * Whether job classloader can be shared between all tasks.
+     * <p>
+     * Defaults to {@code true}.
+     */
+    JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"),
+
+    /**
      * Size in bytes of single memory page which will be allocated for data structures in shuffle.
      * <p>
      * By default is {@code 32 * 1024}.
@@ -105,6 +112,7 @@ public enum HadoopJobProperty {
      */
     SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
 
+
     /** Property name. */
     private final String propName;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 36da410..a24e581 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
@@ -73,6 +74,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.JOB_SHARED_CLASSLOADER;
 import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
@@ -121,6 +123,9 @@ public class HadoopV2Job implements HadoopJob {
     /** File system cache map. */
     private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
 
+    /** Shared class loader. */
+    private volatile HadoopClassLoader sharedClsLdr;
+
     /** Local node ID */
     private volatile UUID locNodeId;
 
@@ -261,8 +266,8 @@ public class HadoopV2Job implements HadoopJob {
                 // If there is no pooled class, then load new one.
                 // Note that the classloader identified by the task it was initially created for,
                 // but later it may be reused for other tasks.
-                HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
-                    HadoopClassLoader.nameForTask(info, false), libNames, helper);
+                HadoopClassLoader ldr = sharedClsLdr != null ?
+                    sharedClsLdr : createClassLoader(HadoopClassLoader.nameForTask(info, false));
 
                 cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
@@ -312,6 +317,9 @@ public class HadoopV2Job implements HadoopJob {
 
         try {
             rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+
+            if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true))
+                sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
         }
         finally {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
@@ -454,4 +462,14 @@ public class HadoopV2Job implements HadoopJob {
     public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
         return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
     }
+
+    /**
+     * Create class loader with the given name.
+     *
+     * @param name Name.
+     * @return Class loader.
+     */
+    private HadoopClassLoader createClassLoader(String name) {
+        return new HadoopClassLoader(rsrcMgr.classPath(), name, libNames, helper);
+    }
 }
\ No newline at end of file