You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/15 09:21:18 UTC
[12/18] ignite git commit: IGNITE-4426: Hadoop: tasks can share the same classloader. This closes #1344.
IGNITE-4426: Hadoop: tasks can share the same classloader. This closes #1344.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb1fd826
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb1fd826
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb1fd826
Branch: refs/heads/ignite-4371
Commit: cb1fd82698fc814d3a9b5b3b1792556de3a24501
Parents: 96160ad
Author: devozerov <vo...@gridgain.com>
Authored: Wed Dec 14 14:35:29 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 14 14:35:29 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopClassLoader.java | 10 +++++++++
.../processors/hadoop/HadoopJobProperty.java | 8 +++++++
.../processors/hadoop/impl/v2/HadoopV2Job.java | 22 ++++++++++++++++++--
3 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index cd94c89..f6c2fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -101,6 +101,16 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache {
}
/**
+ * Classloader name for job.
+ *
+ * @param jobId Job ID.
+ * @return Name.
+ */
+ public static String nameForJob(HadoopJobId jobId) {
+ return "hadoop-job-" + jobId;
+ }
+
+ /**
* Gets name for the task class loader. Task class loader
* @param info The task info.
* @param prefix Get only prefix (without task type and number)
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index 1f0ef1b..9e1dede 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -57,6 +57,13 @@ public enum HadoopJobProperty {
JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"),
/**
+ * Whether job classloader can be shared between all tasks.
+ * <p>
+ * Defaults to {@code true}.
+ */
+ JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"),
+
+ /**
* Size in bytes of single memory page which will be allocated for data structures in shuffle.
* <p>
* By default is {@code 32 * 1024}.
@@ -105,6 +112,7 @@ public enum HadoopJobProperty {
*/
SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
+
/** Property name. */
private final String propName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb1fd826/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index 36da410..a24e581 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
@@ -73,6 +74,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.JOB_SHARED_CLASSLOADER;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir;
import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException;
@@ -121,6 +123,9 @@ public class HadoopV2Job implements HadoopJob {
/** File system cache map. */
private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap();
+ /** Shared class loader. */
+ private volatile HadoopClassLoader sharedClsLdr;
+
/** Local node ID */
private volatile UUID locNodeId;
@@ -261,8 +266,8 @@ public class HadoopV2Job implements HadoopJob {
// If there is no pooled class, then load new one.
// Note that the classloader identified by the task it was initially created for,
// but later it may be reused for other tasks.
- HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
- HadoopClassLoader.nameForTask(info, false), libNames, helper);
+ HadoopClassLoader ldr = sharedClsLdr != null ?
+ sharedClsLdr : createClassLoader(HadoopClassLoader.nameForTask(info, false));
cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
@@ -312,6 +317,9 @@ public class HadoopV2Job implements HadoopJob {
try {
rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(igniteWorkDirectory(), locNodeId, jobId));
+
+ if (HadoopJobProperty.get(jobInfo, JOB_SHARED_CLASSLOADER, true))
+ sharedClsLdr = createClassLoader(HadoopClassLoader.nameForJob(jobId));
}
finally {
HadoopCommonUtils.restoreContextClassLoader(oldLdr);
@@ -454,4 +462,14 @@ public class HadoopV2Job implements HadoopJob {
public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException {
return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
}
+
+ /**
+ * Create class loader with the given name.
+ *
+ * @param name Name.
+ * @return Class loader.
+ */
+ private HadoopClassLoader createClassLoader(String name) {
+ return new HadoopClassLoader(rsrcMgr.classPath(), name, libNames, helper);
+ }
}
\ No newline at end of file