You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/14 19:44:30 UTC

ignite git commit: IGNITE-3902: Attempt to fix.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3902 [created] 587cc46af


IGNITE-3902: Attempt to fix.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/587cc46a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/587cc46a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/587cc46a

Branch: refs/heads/ignite-3902
Commit: 587cc46aff19e5b9c2cb07abc0799087acc16d00
Parents: 98914fe
Author: thatcoach <pp...@list.ru>
Authored: Wed Sep 14 22:44:07 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Wed Sep 14 22:44:07 2016 +0300

----------------------------------------------------------------------
 .../hadoop/fs/BasicHadoopFileSystemFactory.java | 17 ++----
 .../internal/processors/hadoop/HadoopUtils.java | 64 ++++++++++++++------
 .../processors/hadoop/v2/HadoopV2Job.java       | 47 +++++++-------
 .../hadoop/v2/HadoopV2JobResourceManager.java   | 26 ++++----
 .../hadoop/v2/HadoopV2TaskContext.java          | 30 ++++-----
 5 files changed, 102 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
index 06f76c3..a01bfaf 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java
@@ -93,20 +93,13 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex
             // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation.
             // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context
             // classloader to classloader of current class to avoid strange class-cast-exceptions.
-            ClassLoader ctxClsLdr = Thread.currentThread().getContextClassLoader();
-            ClassLoader clsLdr = getClass().getClassLoader();
+            ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
 
-            if (ctxClsLdr == clsLdr)
+            try {
                 return create(usrName);
-            else {
-                Thread.currentThread().setContextClassLoader(clsLdr);
-
-                try {
-                    return create(usrName);
-                }
-                finally {
-                    Thread.currentThread().setContextClassLoader(ctxClsLdr);
-                }
+            }
+            finally {
+                HadoopUtils.restoreContextClassLoader(oldLdr);
             }
         }
         catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 44d871a..976119e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -17,6 +17,19 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -32,17 +45,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Hadoop utility methods.
@@ -326,15 +328,13 @@ public class HadoopUtils {
      * @return New instance of {@link Configuration}.
      */
     public static Configuration safeCreateConfiguration() {
-        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
-
-        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+        final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader());
 
         try {
             return new Configuration();
         }
         finally {
-            Thread.currentThread().setContextClassLoader(cl0);
+            restoreContextClassLoader(oldLdr);
         }
     }
 
@@ -344,15 +344,13 @@ public class HadoopUtils {
      * @return New instance of {@link JobConf}.
      */
     public static JobConf safeCreateJobConf() {
-        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
-
-        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());
+        final ClassLoader oldLdr = setContextClassLoader(JobConf.class.getClassLoader());
 
         try {
             return new JobConf();
         }
         finally {
-            Thread.currentThread().setContextClassLoader(cl0);
+            restoreContextClassLoader(oldLdr);
         }
     }
 
@@ -382,6 +380,33 @@ public class HadoopUtils {
     }
 
     /**
+     * Sets the current thread's context class loader, skipping the write when it is already {@code newLdr}.
+     *
+     * @param newLdr Class loader to install as the thread context class loader (may be {@code null}).
+     * @return Previous context class loader, suitable for {@link #restoreContextClassLoader}.
+     */
+    @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) {
+        ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();
+
+        if (newLdr != oldLdr)
+            Thread.currentThread().setContextClassLoader(newLdr);
+
+        return oldLdr;
+    }
+
+    /**
+     * Restores the current thread's context class loader, skipping the write when it is already {@code oldLdr}.
+     *
+     * @param oldLdr Class loader to reinstate, typically the value returned by {@link #setContextClassLoader}.
+     */
+    public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) {
+        ClassLoader newLdr = Thread.currentThread().getContextClassLoader();
+
+        if (newLdr != oldLdr)
+            Thread.currentThread().setContextClassLoader(oldLdr);
+    }
+
+    /**
      * Split wrapper for sorting.
      */
     private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
@@ -432,5 +457,4 @@ public class HadoopUtils {
             return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
         }
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index 8804e29..2357ef5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -17,24 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,6 +52,25 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
@@ -144,8 +145,6 @@ public class HadoopV2Job implements HadoopJob {
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
-        Thread.currentThread().setContextClassLoader(null);
-
         for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
             jobConf.set(e.getKey(), e.getValue());
 
@@ -166,7 +165,7 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
-        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
 
         try {
             String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
@@ -223,7 +222,7 @@ public class HadoopV2Job implements HadoopJob {
             }
         }
         finally {
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }
 
@@ -299,13 +298,13 @@ public class HadoopV2Job implements HadoopJob {
         assert ((HadoopClassLoader)getClass().getClassLoader()).name()
             .equals(HadoopClassLoader.nameForJob(this.locNodeId));
 
-        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
 
         try {
             rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
         }
         finally {
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 67ab600..8df2557 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -17,16 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.FileSystemException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -37,11 +27,23 @@ import org.apache.hadoop.util.RunJar;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.FileSystemException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+
 /**
  * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional
  * files are needed to be placed on local files system.
@@ -94,7 +96,7 @@ class HadoopV2JobResourceManager {
     private void setLocalFSWorkingDirectory(File dir) throws IOException {
         JobConf cfg = ctx.getJobConf();
 
-        Thread.currentThread().setContextClassLoader(cfg.getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(cfg.getClassLoader());
 
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
@@ -103,7 +105,7 @@ class HadoopV2JobResourceManager {
                 FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
         }
         finally {
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/587cc46a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 68c9ff8..af25942 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -17,13 +17,6 @@
 
 package org.apache.ignite.internal.processors.hadoop.v2;
 
-import java.io.DataInput;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Comparator;
-import java.util.UUID;
-import java.util.concurrent.Callable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,6 +50,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
@@ -71,6 +65,14 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.jetbrains.annotations.Nullable;
 
+import java.io.DataInput;
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Comparator;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
@@ -158,7 +160,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         this.locNodeId = locNodeId;
 
         // Before create JobConf instance we should set new context class loader.
-        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader());
 
         try {
             JobConf jobConf = new JobConf();
@@ -180,7 +182,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             useNewCombiner = jobConf.getCombinerClass() == null;
         }
         finally {
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }
 
@@ -229,9 +231,9 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
     /** {@inheritDoc} */
     @Override public void run() throws IgniteCheckedException {
-        try {
-            Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
 
+        try {
             try {
                 task = createTask();
             }
@@ -258,7 +260,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         finally {
             task = null;
 
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }
 
@@ -289,7 +291,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
                 locDir = jobLocalDir(locNodeId, taskInfo().jobId());
         }
 
-        Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+        ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
 
         try {
             FileSystem.get(jobConf());
@@ -305,7 +307,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw transformException(e);
         }
         finally {
-            Thread.currentThread().setContextClassLoader(null);
+            HadoopUtils.restoreContextClassLoader(oldLdr);
         }
     }