You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/29 20:12:26 UTC

[49/50] [abbrv] incubator-ignite git commit: #[IGNITE-218]: mapred execution in correct user context.

#[IGNITE-218]: mapred execution in correct user context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4d32feeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4d32feeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4d32feeb

Branch: refs/heads/ignite-218
Commit: 4d32feeb0d206ce6edd10b7e649dca928804bd74
Parents: 1b7cead
Author: iveselovskiy <iv...@gridgain.com>
Authored: Fri May 29 21:01:35 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Fri May 29 21:01:35 2015 +0300

----------------------------------------------------------------------
 config/hadoop/default-config.xml                |   3 +
 .../processors/hadoop/HadoopTaskContext.java    |  11 ++
 .../internal/processors/igfs/IgfsUtils.java     |   4 -
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   2 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |  16 +--
 .../processors/hadoop/HadoopDefaultJobInfo.java |   2 +-
 .../internal/processors/hadoop/HadoopUtils.java |  53 +++++++++
 .../hadoop/SecondaryFileSystemProvider.java     |   2 +-
 .../hadoop/fs/HadoopLazyConcurrentMap.java      |   5 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java | 111 ++++++++++---------
 .../processors/hadoop/v2/HadoopV2Job.java       |   2 +-
 .../hadoop/v2/HadoopV2JobResourceManager.java   |   4 +-
 .../hadoop/v2/HadoopV2TaskContext.java          |  51 +++++++++
 .../hadoop/HadoopClientProtocolSelfTest.java    |   6 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |   2 -
 .../processors/hadoop/HadoopMapReduceTest.java  |  15 ++-
 .../hadoop/HadoopTasksAllVersionsTest.java      |  15 ++-
 .../processors/hadoop/HadoopTasksV1Test.java    |   5 +-
 .../processors/hadoop/HadoopTasksV2Test.java    |   5 +-
 .../processors/hadoop/HadoopV2JobSelfTest.java  |   6 +-
 .../hadoop/examples/HadoopWordCount2.java       |  24 +---
 .../collections/HadoopAbstractMapTest.java      |  16 +++
 22 files changed, 238 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/config/hadoop/default-config.xml
----------------------------------------------------------------------
diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml
index 30413b0..c1e4855 100644
--- a/config/hadoop/default-config.xml
+++ b/config/hadoop/default-config.xml
@@ -90,6 +90,9 @@
         Configuration of Ignite node.
     -->
     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Temporary workaround for tests: -->
+        <property name="localHost" value="127.0.0.1"/>
+
         <!--
             Apache Hadoop Accelerator configuration.
         -->

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index 371fd81..47c55bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Task context.
@@ -187,4 +188,14 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException If failed.
      */
     public abstract void cleanupTaskEnvironment() throws IgniteCheckedException;
+
+    /**
+     *
+     * @param user
+     * @param callable
+     * @param <T>
+     * @return
+     * @throws IgniteCheckedException
+     */
+    public abstract <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 71a1dbe..8026a44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -92,11 +92,7 @@ public class IgfsUtils {
     }
 
     /**
-<<<<<<< HEAD
-     * Provides non-null interned user name.
-=======
      * Provides non-null user name.
->>>>>>> 8455c7a6ed6f7449c7ad31b1ef7b129705262e1b
      * If the user name is null or empty string, defaults to {@link FileSystemConfiguration#DFLT_USER_NAME},
      * which is the current process owner user.
      * @param user a user name to be fixed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index 821acdb..cb4f19b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -54,7 +54,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     @Override public void write(HadoopJobInfo jobInfo, HadoopJobId jobId, HadoopCounters cntrs)
         throws IgniteCheckedException {
 
-        Configuration hadoopCfg = new Configuration();
+        Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
         for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)jobInfo).properties().entrySet())
             hadoopCfg.set(e.getKey(), e.getValue());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index c0a9ade..bbb8c5f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -179,14 +179,14 @@ public class IgniteHadoopFileSystem extends FileSystem {
     public static String getFsHadoopUser(Configuration cfg) throws IOException {
         String user = null;
 
-        // -------------------------------------------
-        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
-        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
-        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
-        // ugi.doAs() closure.
-        if (cfg != null)
-            user = cfg.get(MRJobConfig.USER_NAME);
-        // -------------------------------------------
+//        // -------------------------------------------
+//        // TODO: Temporary workaround, see https://issues.apache.org/jira/browse/IGNITE-761
+//        // We have an issue there: sometimes FileSystem created from MR jobs gets incorrect
+//        // UserGroupInformation.getCurrentUser() despite of the fact that it is invoked in correct
+//        // ugi.doAs() closure.
+//        if (cfg != null)
+//            user = cfg.get(MRJobConfig.USER_NAME);
+//        // -------------------------------------------
 
         if (user == null) {
             UserGroupInformation currUgi = UserGroupInformation.getCurrentUser();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index d0a327e..2e855d0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -89,7 +89,7 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
             if (jobCls0 == null) { // It is enough to have only one class loader with only Hadoop classes.
                 synchronized (HadoopDefaultJobInfo.class) {
                     if ((jobCls0 = jobCls) == null) {
-                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-main");
+                        HadoopClassLoader ldr = new HadoopClassLoader(null, "hadoop-job");
 
                         jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index d493bd4..ca3a6c5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -304,6 +304,59 @@ public class HadoopUtils {
     }
 
     /**
+     *
+     * @return
+     */
+    static Configuration checkPreconditionAndCreateConfiguration() {
+        ClassLoader confCl = Configuration.class.getClassLoader();
+
+        ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();
+
+        if (ctxCl != null && confCl != ctxCl)
+            throw new IllegalStateException("Wrong classloader: " + confCl + " != " + ctxCl);
+
+        return new Configuration();
+    }
+
+    /**
+     * For diagnostic & test purposes.
+     * @param c the Configuration to check.
+     */
+    static void checkConfiguration(Configuration c) {
+        String name = Configuration.class.getName();
+
+        c.set("xxx", name);
+
+        Class clazz = c.getClass("xxx", null);
+
+        c.unset("xxx");
+
+        if (clazz != c.getClass())
+            throw new IllegalStateException("Wrong configuration.");
+    }
+
+    /**
+     *
+     * @return
+     */
+    public static Configuration safeCreateConfiguration() {
+        final ClassLoader cl0 = Thread.currentThread().getContextClassLoader();
+
+        Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader());
+
+        try {
+            Configuration c = checkPreconditionAndCreateConfiguration();
+
+            checkConfiguration(c);
+
+            return c;
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(cl0);
+        }
+    }
+
+    /**
      * Constructor.
      */
     private HadoopUtils() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
index b1a057c..f9a68f1 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/SecondaryFileSystemProvider.java
@@ -34,7 +34,7 @@ import java.security.*;
  */
 public class SecondaryFileSystemProvider {
     /** Configuration of the secondary filesystem, never null. */
-    private final Configuration cfg = new Configuration();
+    private final Configuration cfg = HadoopUtils.safeCreateConfiguration();
 
     /** The secondary filesystem URI, never null. */
     private final URI uri;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
index f1c10b6..71b38c4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLazyConcurrentMap.java
@@ -18,13 +18,12 @@
 package org.apache.ignite.internal.processors.hadoop.fs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.util.future.*;
 import org.jsr166.*;
 
+import java.io.*;
 import java.util.*;
 import java.util.concurrent.*;
-import org.apache.ignite.internal.util.future.*;
-
-import java.io.*;
 import java.util.concurrent.locks.*;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 1919fc4..ac141a9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -17,21 +17,14 @@
 
 package org.apache.ignite.internal.processors.hadoop.taskexecutor;
 
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
-import java.io.*;
-import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -111,56 +104,76 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
         user = IgfsUtils.fixUserName(user);
 
-        String ugiUser;
-        try {
-            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
-
-            ugiUser = currUser.getShortUserName();
-        }
-        catch (IOException ioe) {
-            throw new IgniteCheckedException(ioe);
-        }
-
-        if (F.eq(user, ugiUser))
-            // if current UGI context user is the same, do direct call:
-            return callImpl();
-        else {
-            // do the call in the context of 'user':
-            try {
-                final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
-
-                UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user);
-
-                return ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                    @Override public Void run() throws IgniteCheckedException {
-                        return callImpl();
-                    }
-                });
-            } catch (IOException | InterruptedException e) {
-                throw new IgniteCheckedException(e);
-            }
-        }
+        return callImpl(user);
+
+//        String ugiUser;
+//        try {
+//            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+//
+//            ugiUser = currUser.getShortUserName();
+//        }
+//        catch (IOException ioe) {
+//            throw new IgniteCheckedException(ioe);
+//        }
+//
+//        if (F.eq(user, ugiUser))
+//            // if current UGI context user is the same, do direct call:
+//            return callImpl();
+//        else {
+//            // do the call in the context of 'user':
+//            try {
+//                final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+//
+//                UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user);
+//
+//                return ugi.doAs(new PrivilegedExceptionAction<Void>() {
+//                    @Override public Void run() throws IgniteCheckedException {
+//                        return callImpl();
+//                    }
+//                });
+//            } catch (IOException | InterruptedException e) {
+//                throw new IgniteCheckedException(e);
+//            }
+//        }
     }
 
+//    /**
+//     * Gets the job property.
+//     */
+//    private String getJobProperty(String key) {
+//        if (job instanceof HadoopV2Job) {
+//            Configuration conf = ((HadoopV2Job)job).jobConf();
+//
+//            return conf.get(key);
+//        }
+//        else
+//            return job.info().property(key);
+//    }
+
     /**
-     * Gets the job property.
+     * Runnable task call implementation
+     * @return null.
+     * @throws IgniteCheckedException
      */
-    private String getJobProperty(String key) {
-        if (job instanceof HadoopV2Job) {
-            Configuration conf = ((HadoopV2Job)job).jobConf();
+    Void callImpl(final String user) throws IgniteCheckedException {
+        assert user != null;
 
-            return conf.get(key);
-        }
-        else
-            return job.info().property(key);
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAs(user, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                runTaskImpl();
+
+                return null;
+            }
+        });
     }
 
     /**
-     * Runnable task call implementation
-     * @return null.
+     * Implements actual task running.
      * @throws IgniteCheckedException
      */
-    Void callImpl() throws IgniteCheckedException {
+    void runTaskImpl() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;
@@ -170,8 +183,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
         HadoopPerformanceCounter perfCntr = null;
 
         try {
-            ctx = job.getTaskContext(info);
-
             perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
 
             perfCntr.onTaskSubmit(info, submitTs);
@@ -218,8 +229,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
             if (ctx != null)
                 ctx.cleanupTaskEnvironment();
         }
-
-        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index a4a7b25..3d47960 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -205,7 +205,7 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader identified by the task it was initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
 
                 cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 7dcd10b..7bc0fb0 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -163,7 +163,6 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    // TODO: Out of bounds.
                     try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) {
                         if (!fs.exists(stagingDir))
                             throw new IgniteCheckedException("Failed to find map-reduce " +
@@ -199,7 +198,8 @@ public class HadoopV2JobResourceManager {
                 }
             }
             else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
 
             setLocalFSWorkingDirectory(jobLocDir);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index 9086874..0bbe1d7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,16 +28,20 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.security.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -447,4 +451,51 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw new IgniteCheckedException(e);
         }
     }
+
+    @Override public <T> T runAs(final String user, final Callable<T> callable) throws IgniteCheckedException {
+        String ugiUser;
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // if current UGI context user is the same, do direct call:
+                return callable.call();
+            else {
+            // do the call in the context of 'user':
+//                final String ticketCachePath = getJobProperty(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH);
+//
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return callable.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+//    /**
+//     * Gets the job property.
+//     */
+//    private String getJobProperty(String key) {
+//        if (job instanceof HadoopV2Job) {
+//            Configuration conf = ((HadoopV2Job)job).jobConf();
+//
+//            return conf.get(key);
+//        }
+//        else
+//            return job.info().property(key);
+//    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.hadoop.mapreduce.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * @return Configuration.
      */
     private Configuration config(int port) {
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtils.safeCreateConfiguration();
 
         setupFileSystems(conf);
 
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
             ctx.getCounter(TestCounter.COUNTER2).increment(1);
 
             int sum = 0;
-            for (IntWritable value : values) {
+            for (IntWritable value : values)
                 sum += value.get();
-            }
 
             ctx.write(key, new IntWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index 30babe3..f3c74fc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -1000,7 +1000,5 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         @Override public GridKernalContext context() {
             return null;
         }
-
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..f96eb74 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.testframework.*;
@@ -105,14 +106,16 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkJobStatistics(jobId);
 
+            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
-                useNewReducer,
+                    useNewReducer,
                 "blue\t200000\n" +
-                "green\t150000\n" +
-                "red\t100000\n" +
-                "yellow\t70000\n",
-                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
-            );
+                    "green\t150000\n" +
+                    "red\t100000\n" +
+                    "yellow\t70000\n",
+                readAndSortFile(outFile)
+                );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
 
     /**
      * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
                 igfs.info(inFile).length() - fileBlock1.length());
 
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
 
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
         int taskNum, String... words) throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
 
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testReduceTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
 
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testCombinerTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob("/", "/");
+        HadoopJob gridJob = getHadoopJob("/", "/");
 
         HadoopTestTaskContext ctx =
             runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context of combine task with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
         throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
 
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
 
-        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
 
         setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.util.*;
 
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws Exception if fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+        HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+        HadoopJob job = info.createJob(id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
index 8637a4e..dc68df7 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/examples/HadoopWordCount2.java
@@ -17,13 +17,11 @@
 
 package org.apache.ignite.internal.processors.hadoop.examples;
 
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
-import org.apache.ignite.internal.processors.hadoop.*;
 
 import java.io.*;
 
@@ -46,24 +44,6 @@ public class HadoopWordCount2 {
         Job job = getJob(args[0], args[1]);
 
         job.submit();
-
-        printCounters(job);
-
-        job.waitForCompletion(true);
-
-        printCounters(job);
-    }
-
-    private static void printCounters(Job job) throws IOException {
-        Counters counters = job.getCounters();
-
-        for (CounterGroup group : counters) {
-            System.out.println("Group: " + group.getDisplayName() + "," + group.getName());
-            System.out.println("  number of counters: " + group.size());
-            for (Counter counter : group) {
-                System.out.println("  - " + counter.getDisplayName() + ": " + counter.getName() + ": "+counter.getValue());
-            }
-        }
     }
 
     /**
@@ -75,9 +55,7 @@ public class HadoopWordCount2 {
      * @throws IOException If fails.
      */
     public static Job getJob(String input, String output) throws IOException {
-        Configuration cfg = HadoopStartup.configuration();
-
-        Job job = Job.getInstance(cfg);
+        Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4d32feeb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..c894b76 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Abstract class for maps test.
@@ -98,6 +99,21 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
         @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
             assert false;
         }
+
+        /**
+         * @param user
+         * @param callable
+         * @return
+         * @throws IgniteCheckedException
+         */
+        @Override public <T> T runAs(String user, Callable<T> callable) throws IgniteCheckedException {
+            try {
+                return callable.call();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
     }
 
     /**