You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/14 19:44:30 UTC
ignite git commit: IGNITE-3902: Attempt to fix.
Repository: ignite
Updated Branches:
refs/heads/ignite-3902 [created] 587cc46af
IGNITE-3902: Attempt to fix.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/587cc46a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/587cc46a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/587cc46a
Branch: refs/heads/ignite-3902
Commit: 587cc46aff19e5b9c2cb07abc0799087acc16d00
Parents: 98914fe
Author: thatcoach <pp...@list.ru>
Authored: Wed Sep 14 22:44:07 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Wed Sep 14 22:44:07 2016 +0300
----------------------------------------------------------------------
.../hadoop/fs/BasicHadoopFileSystemFactory.java | 17 ++----
.../internal/processors/hadoop/HadoopUtils.java | 64 ++++++++++++++------
.../processors/hadoop/v2/HadoopV2Job.java | 47 +++++++-------
.../hadoop/v2/HadoopV2JobResourceManager.java | 26 ++++----
.../hadoop/v2/HadoopV2TaskContext.java | 30 ++++-----
5 files changed, 102 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index 06f76c3..a01bfaf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -93,20 +93,13 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
// FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
// And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
// classloader to classloader of current class to avoid strange class-cast-exceptions.
- ClassLoader ctxClsLdr = Thread.currentThread().getContextClassLoader();
- ClassLoader clsLdr = getClass().getClassLoader();
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
- if (ctxClsLdr == clsLdr)
+ try {
return create(usrName);
- else {
- Thread.currentThread().setContextClassLoader(clsLdr);
-
- try {
- return create(usrName);
- }
- finally {
- Thread.currentThread().setContextClassLoader(ctxClsLdr);
- }
+ }
+ finally {
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 44d871a..976119e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -17,6 +17,19 @@
package org.apache.ignite.internal.processors.hadoop;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -32,17 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
-import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Hadoop utility methods.
@@ -326,15 +328,13 @@ public class HadoopUtils {
* @return New instance of {@link Configuration}.
*/
public static Configuration safeCreateConfiguration() {
- final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
-
- Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+ final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader());
try {
return new Configuration();
}
finally {
- Thread.currentThread().setContextClassLoader(cl0);
+ restoreContextClassLoader(oldLdr);
}
}
@@ -344,15 +344,13 @@ public class HadoopUtils {
* @return New instance of {@link JobConf}.
*/
public static JobConf safeCreateJobConf() {
- final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
-
- Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+ final ClassLoader oldLdr = setContextClassLoader(JobConf.class.getClassLoader());
try {
return new JobConf();
}
finally {
- Thread.currentThread().setContextClassLoader(cl0);
+ restoreContextClassLoader(oldLdr);
}
}
@@ -382,6 +380,33 @@ public class HadoopUtils {
}
/**
+ * Set context class loader.
+ *
+ * @param newLdr New class loader.
+ * @return Old class loader.
+ */
+ @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
+ ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(newLdr);
+
+ return oldLdr;
+ }
+
+ /**
+ * Restore context class loader.
+ *
+ * @param oldLdr Original class loader.
+ */
+ public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
+ ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
+
+ if (newLdr != oldLdr)
+ Thread.currentThread().setContextClassLoader(oldLdr);
+ }
+
+ /**
* Split wrapper for sorting.
*/
private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
@@ -432,5 +457,4 @@ public class HadoopUtils {
return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 8804e29..2357ef5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -17,24 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -70,6 +52,25 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
@@ -144,8 +145,6 @@ public class HadoopV2Job implements HadoopJob {
HadoopFileSystemsUtils.setupFileSystems(jobConf);
- Thread.currentThread().setContextClassLoader(null);
-
for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
jobConf.set(e.getKey(), e.getValue());
@@ -166,7 +165,7 @@ public class HadoopV2Job implements HadoopJob {
/** {@inheritDoc} */
@Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
- Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
try {
String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
@@ -223,7 +222,7 @@ public class HadoopV2Job implements HadoopJob {
}
}
finally {
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
@@ -299,13 +298,13 @@ public class HadoopV2Job implements HadoopJob {
assert ((HadoopClassLoader)getClass().getClassLoader()).name()
.equals(HadoopClassLoader.nameForJob(this.locNodeId));
- Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
try {
rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
}
finally {
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 67ab600..8df2557 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,16 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.FileSystemException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -37,11 +27,23 @@ import org.apache.hadoop.util.RunJar;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.FileSystemException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+
/**
 * Provides all resources needed for job execution. Downloads the main jar, the configuration, and additional
 * files that need to be placed on the local file system.
@@ -94,7 +96,7 @@ class HadoopV2JobResourceManager {
private void setLocalFSWorkingDirectory(File dir) throws IOException {
JobConf cfg = ctx.getJobConf();
- Thread.currentThread().setContextClassLoader(cfg.getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(cfg.getClassLoader());
try {
cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
@@ -103,7 +105,7 @@ class HadoopV2JobResourceManager {
FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
}
finally {
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 68c9ff8..af25942 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -17,13 +17,6 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import java.io.DataInput;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Comparator;
-import java.util.UUID;
-import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +50,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
@@ -71,6 +65,14 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.jetbrains.annotations.Nullable;
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
@@ -158,7 +160,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
this.locNodeId = locNodeId;
// Before create JobConf instance we should set new context class loader.
- Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
try {
JobConf jobConf = new JobConf();
@@ -180,7 +182,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
useNewCombiner = jobConf.getCombinerClass() == null;
}
finally {
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
@@ -229,9 +231,9 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
/** {@inheritDoc} */
@Override public void run() throws IgniteCheckedException {
- try {
- Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
+ try {
try {
task = createTask();
}
@@ -258,7 +260,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
finally {
task = null;
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}
@@ -289,7 +291,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
locDir = jobLocalDir(locNodeId, taskInfo().jobId());
}
- Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+ ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
try {
FileSystem.get(jobConf());
@@ -305,7 +307,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
throw transformException(e);
}
finally {
- Thread.currentThread().setContextClassLoader(null);
+ HadoopUtils.restoreContextClassLoader(oldLdr);
}
}