Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:23 UTC

[02/28] incubator-ignite git commit: ignite-545: merge from sprint-6

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
index 7dca049..f23c62c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsOutProc.java
@@ -81,6 +81,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
     /** IGFS name. */
     private final String igfs;
 
+    /** The user this out proc operates on behalf of. */
+    private final String userName;
+
     /** Client log. */
     private final Log log;
 
@@ -100,8 +103,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
-        this(host, port, grid, igfs, false, log);
+    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(host, port, grid, igfs, false, log, user);
     }
 
     /**
@@ -113,8 +116,8 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
-        this(null, port, grid, igfs, true, log);
+    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log, String user) throws IOException {
+        this(null, port, grid, igfs, true, log, user);
     }
 
     /**
@@ -128,7 +131,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
      * @param log Client logger.
      * @throws IOException If failed.
      */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
+    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log, String user)
         throws IOException {
         assert host != null && !shmem || host == null && shmem :
             "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
@@ -138,6 +141,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         this.grid = grid;
         this.igfs = igfs;
         this.log = log;
+        this.userName = IgfsUtils.fixUserName(user);
 
         io = HadoopIgfsIpcIo.get(log, endpoint);
 
@@ -173,6 +177,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(INFO);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -184,6 +189,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(UPDATE);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_RES).get();
     }
@@ -196,6 +202,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.accessTime(accessTime);
         msg.modificationTime(modificationTime);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -207,6 +214,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(RENAME);
         msg.path(src);
         msg.destinationPath(dest);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -218,6 +226,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(DELETE);
         msg.path(path);
         msg.flag(recursive);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -231,6 +240,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.start(start);
         msg.length(len);
+        msg.userName(userName);
 
         return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
     }
@@ -241,6 +251,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(PATH_SUMMARY);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(SUMMARY_RES).get();
     }
@@ -252,6 +263,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(MAKE_DIRECTORIES);
         msg.path(path);
         msg.properties(props);
+        msg.userName(userName);
 
         return io.send(msg).chain(BOOL_RES).get();
     }
@@ -262,6 +274,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_FILES);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(FILE_COL_RES).get();
     }
@@ -272,6 +285,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
 
         msg.command(LIST_PATHS);
         msg.path(path);
+        msg.userName(userName);
 
         return io.send(msg).chain(PATH_COL_RES).get();
     }
@@ -288,6 +302,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.command(OPEN_READ);
         msg.path(path);
         msg.flag(false);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -303,6 +318,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(true);
         msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
+        msg.userName(userName);
 
         IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
 
@@ -321,6 +337,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.properties(props);
         msg.replication(replication);
         msg.blockSize(blockSize);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -336,6 +353,7 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
         msg.path(path);
         msg.flag(create);
         msg.properties(props);
+        msg.userName(userName);
 
         Long streamId = io.send(msg).chain(LONG_RES).get();
 
@@ -471,4 +489,9 @@ public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener
             }
         };
     }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return userName;
+    }
 }

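Note: the new userName field is normalized via IgfsUtils.fixUserName(user)
before it is attached to every IPC message. A minimal sketch of the assumed
semantics (the real implementation lives in IgfsUtils; the fallback default
shown here is an assumption):

    // Sketch: fall back to a default so every request message carries a
    // non-null user name, even when the caller passed none.
    public static String fixUserName(String user) {
        return (user == null || user.isEmpty())
            ? System.getProperty("user.name", "anonymous") // Assumed default.
            : user;
    }
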
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
index 1dada21..7d0db49 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
@@ -55,6 +55,9 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
     /** Logger. */
     private final Log log;
 
+    /** The user name this wrapper works on behalf of. */
+    private final String userName;
+
     /**
      * Constructor.
      *
@@ -63,13 +66,15 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
      * @param conf Configuration.
      * @param log Current logger.
      */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log) throws IOException {
+    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
+            throws IOException {
         try {
             this.authority = authority;
             this.endpoint = new HadoopIgfsEndpoint(authority);
             this.logDir = logDir;
             this.conf = conf;
             this.log = log;
+            this.userName = user;
         }
         catch (IgniteCheckedException e) {
             throw new IOException("Failed to parse endpoint: " + authority, e);
@@ -362,13 +367,14 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsInProc(igfs, log);
+                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
                 catch (IOException | IgniteCheckedException e) {
                     if (e instanceof HadoopIgfsCommunicationException)
-                        hadoop.close(true);
+                        if (hadoop != null)
+                            hadoop.close(true);
 
                     if (log.isDebugEnabled())
                         log.debug("Failed to connect to in-proc IGFS, fallback to IPC mode.", e);
@@ -384,7 +390,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
                 HadoopIgfsEx hadoop = null;
 
                 try {
-                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                    hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
@@ -409,7 +415,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
 
                 try {
                     hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                        log);
+                        log, userName);
 
                     curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
                 }
@@ -430,7 +436,8 @@ public class HadoopIgfsWrapper implements HadoopIgfs {
             HadoopIgfsEx hadoop = null;
 
             try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(), log);
+                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
+                    log, userName);
 
                 curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
             }

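Note: the in-proc branch also picks up an incidental NPE fix. If the
HadoopIgfsInProc constructor itself throws, 'hadoop' is still null when the
catch block runs, so the close call must be guarded. The fixed pattern in
isolation:

    HadoopIgfsEx hadoop = null;

    try {
        hadoop = new HadoopIgfsInProc(igfs, log, userName);

        curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
    }
    catch (IOException | IgniteCheckedException e) {
        // 'hadoop' is null if the constructor failed, hence the guard.
        if (e instanceof HadoopIgfsCommunicationException && hadoop != null)
            hadoop.close(true);
    }
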
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index 2e04ac1..b170125 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -99,6 +99,22 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
     /** {@inheritDoc} */
     @Override public Void call() throws IgniteCheckedException {
+        ctx = job.getTaskContext(info);
+
+        return ctx.runAsJobOwner(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                call0();
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Implements the actual task execution.
+     * @throws IgniteCheckedException If failed.
+     */
+    void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
 
         Throwable err = null;
@@ -108,8 +124,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
         HadoopPerformanceCounter perfCntr = null;
 
         try {
-            ctx = job.getTaskContext(info);
-
             perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
 
             perfCntr.onTaskSubmit(info, submitTs);
@@ -156,8 +170,6 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
             if (ctx != null)
                 ctx.cleanupTaskEnvironment();
         }
-
-        return null;
     }
 
     /**

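Note: call() now resolves the task context up front and funnels the whole
task body (call0()) through HadoopTaskContext.runAsJobOwner(...), so all
file-system and task work executes under the job owner's identity. A short
usage sketch of that contract (values hypothetical):

    HadoopTaskContext ctx = job.getTaskContext(info);

    // Any exception thrown by the callable surfaces as IgniteCheckedException.
    Integer res = ctx.runAsJobOwner(new Callable<Integer>() {
        @Override public Integer call() {
            return 42; // Runs with the job submitter as the current user.
        }
    });
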
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
index d265ca8..d754039 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.hadoop.v2;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.mapred.JobID;
@@ -68,7 +67,7 @@ public class HadoopV2Job implements HadoopJob {
         new ConcurrentHashMap8<>();
 
     /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
 
     /** All created contexts. */
     private final Queue<Class<?>> fullCtxClsQueue = new ConcurrentLinkedDeque<>();
@@ -93,12 +92,7 @@ public class HadoopV2Job implements HadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
-
-        // Before create JobConf instance we should set new context class loader.
-        Thread.currentThread().setContextClassLoader(clsLdr);
-
-        jobConf = new JobConf();
+        jobConf = HadoopUtils.safeCreateJobConf();
 
         HadoopFileSystemsUtils.setupFileSystems(jobConf);
 
@@ -139,7 +133,9 @@ public class HadoopV2Job implements HadoopJob {
 
             Path jobDir = new Path(jobDirPath);
 
-            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+            try {
+                FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf, true);
+
                 JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
                     jobDir);
 
@@ -197,7 +193,7 @@ public class HadoopV2Job implements HadoopJob {
         if (old != null)
             return old.get();
 
-        Class<?> cls = taskCtxClsPool.poll();
+        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
 
         try {
             if (cls == null) {
@@ -205,9 +201,9 @@ public class HadoopV2Job implements HadoopJob {
                 // Note that the classloader identified by the task it was initially created for,
                 // but later it may be reused for other tasks.
                 HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(),
-                    "hadoop-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
+                    "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber());
 
-                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+                cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
 
                 fullCtxClsQueue.add(cls);
             }
@@ -325,7 +321,14 @@ public class HadoopV2Job implements HadoopJob {
 
     /** {@inheritDoc} */
     @Override public void cleanupStagingDirectory() {
-        if (rsrcMgr != null)
-            rsrcMgr.cleanupStagingDirectory();
+        rsrcMgr.cleanupStagingDirectory();
+    }
+
+    /**
+     * Getter for job configuration.
+     * @return The job configuration.
+     */
+    public JobConf jobConf() {
+        return jobConf;
     }
 }

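Note: the previous constructor set the thread's context class loader and never
restored it. HadoopUtils.safeCreateJobConf() presumably wraps JobConf creation
in a set-and-restore of the context class loader, along these lines (a sketch,
not the verbatim implementation):

    public static JobConf safeCreateJobConf() {
        // JobConf must be initialized under the Hadoop class loader, but the
        // thread's previous loader has to be restored afterwards.
        ClassLoader oldLdr = Thread.currentThread().getContextClassLoader();

        Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader());

        try {
            return new JobConf();
        }
        finally {
            Thread.currentThread().setContextClassLoader(oldLdr);
        }
    }
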
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
index 6f6bfa1..2f64e77 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -40,6 +40,9 @@ import java.util.*;
  * files are needed to be placed on local files system.
  */
 public class HadoopV2JobResourceManager {
+    /** Name of the property that disables caching for the "file" file system. */
+    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = HadoopUtils.disableFsCachePropertyName("file");
+
     /** Hadoop job context. */
     private final JobContextImpl ctx;
 
@@ -84,7 +87,7 @@ public class HadoopV2JobResourceManager {
         try {
             cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
 
-            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
                 FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
         }
         finally {
@@ -112,15 +115,17 @@ public class HadoopV2JobResourceManager {
                 stagingDir = new Path(new URI(mrDir));
 
                 if (download) {
-                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+                    FileSystem fs = HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), cfg, true);
 
                     if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
-                            stagingDir);
+                        throw new IgniteCheckedException("Failed to find map-reduce submission " +
+                            "directory (does not exist): " + stagingDir);
 
                     if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
-                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                        throw new IgniteCheckedException("Failed to copy job submission directory "
+                            + "contents to local file system "
+                            + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath()
+                            + ", jobId=" + jobId + ']');
                 }
 
                 File jarJobFile = new File(jobLocDir, "job.jar");
@@ -144,7 +149,8 @@ public class HadoopV2JobResourceManager {
                 }
             }
             else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+                throw new IgniteCheckedException("Failed to create local job directory: "
+                    + jobLocDir.getAbsolutePath());
 
             setLocalFSWorkingDirectory(jobLocDir);
         }
@@ -204,14 +210,14 @@ public class HadoopV2JobResourceManager {
 
             FileSystem dstFs = FileSystem.getLocal(cfg);
 
-            FileSystem srcFs = srcPath.getFileSystem(cfg);
+            FileSystem srcFs = HadoopUtils.fileSystemForMrUser(srcPath.toUri(), cfg, true);
 
             if (extract) {
                 File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
 
                 if (!archivesPath.exists() && !archivesPath.mkdir())
                     throw new IOException("Failed to create directory " +
-                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
 
                 File archiveFile = new File(archivesPath, locName);
 
@@ -287,7 +293,7 @@ public class HadoopV2JobResourceManager {
     public void cleanupStagingDirectory() {
         try {
             if (stagingDir != null)
-                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+                HadoopUtils.fileSystemForMrUser(stagingDir.toUri(), ctx.getJobConf(), true).delete(stagingDir, true);
         }
         catch (Exception e) {
             log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);

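Note: direct FileSystem.get(...) calls are replaced with
HadoopUtils.fileSystemForMrUser(uri, cfg, flag), which obtains the file system
on behalf of the map-reduce user taken from the job configuration. A plausible
sketch, assuming the user is read from the configuration and normalized the
same way as in IGFS (the boolean argument presumably controls FS instance
caching and is ignored here; fixUserName as sketched earlier):

    public static FileSystem fileSystemForMrUser(URI uri, Configuration cfg)
        throws IOException {
        // "mapreduce.job.user.name", with an assumed default fallback.
        String user = fixUserName(cfg.get(MRJobConfig.USER_NAME));

        try {
            // Resolves the file system under the given user via
            // UserGroupInformation internally.
            return FileSystem.get(uri, cfg, user);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new IOException("Interrupted while obtaining file system for user: " + user, e);
        }
    }
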
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
index e9c859bd..e89feba 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -28,17 +28,21 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.fs.*;
 import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
+import java.security.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
@@ -239,9 +243,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
 
         try {
-            FileSystem fs = FileSystem.get(jobConf());
-
-            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+            FileSystem.get(jobConf());
 
             LocalFileSystem locFs = FileSystem.getLocal(jobConf());
 
@@ -421,7 +423,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
         Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
 
-        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+        try (FileSystem fs = fileSystemForMrUser(jobDir.toUri(), jobConf(), false);
             FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
 
             in.seek(split.offset());
@@ -450,4 +452,44 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
             throw new IgniteCheckedException(e);
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException {
+        String user = job.info().user();
+
+        user = IgfsUtils.fixUserName(user);
+
+        assert user != null;
+
+        String ugiUser;
+
+        try {
+            UserGroupInformation currUser = UserGroupInformation.getCurrentUser();
+
+            assert currUser != null;
+
+            ugiUser = currUser.getShortUserName();
+        }
+        catch (IOException ioe) {
+            throw new IgniteCheckedException(ioe);
+        }
+
+        try {
+            if (F.eq(user, ugiUser))
+                // If the current UGI context user is the same, make a direct call:
+                return c.call();
+            else {
+                UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);
+
+                return ugi.doAs(new PrivilegedExceptionAction<T>() {
+                    @Override public T run() throws Exception {
+                        return c.call();
+                    }
+                });
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
 }

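Note: runAsJobOwner() builds on standard Hadoop security primitives
(UserGroupInformation.getBestUGI(...) and doAs(...)). For context, a minimal
standalone example of the same pattern, independent of Ignite (user name
hypothetical):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class RunAsExample {
        public static void main(String[] args) throws Exception {
            // Build a UGI for the target user (null = no Kerberos ticket cache).
            UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, "vasya");

            // Every Hadoop call inside run() sees "vasya" as the current user.
            FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override public FileSystem run() throws Exception {
                    return FileSystem.get(new Configuration());
                }
            });

            System.out.println("Got file system: " + fs.getUri());
        }
    }
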
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
index b94d9d1..b9f8179 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/client/hadoop/HadoopClientProtocolSelfTest.java
@@ -28,7 +28,6 @@ import org.apache.ignite.*;
 import org.apache.ignite.hadoop.mapreduce.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.proto.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -449,7 +448,7 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
      * @return Configuration.
      */
     private Configuration config(int port) {
-        Configuration conf = new Configuration();
+        Configuration conf = HadoopUtils.safeCreateConfiguration();
 
         setupFileSystems(conf);
 
@@ -521,9 +520,8 @@ public class HadoopClientProtocolSelfTest extends HadoopAbstractSelfTest {
             ctx.getCounter(TestCounter.COUNTER2).increment(1);
 
             int sum = 0;
-            for (IntWritable value : values) {
+            for (IntWritable value : values)
                 sum += value.get();
-            }
 
             ctx.write(key, new IntWritable(sum));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
index d11cabb..9bcd5de 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopIgfs20FileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
@@ -39,6 +40,7 @@ import org.jsr166.*;
 
 import java.io.*;
 import java.net.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -58,6 +60,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
     /** Thread count for multithreaded tests. */
     private static final int THREAD_CNT = 8;
 
+    /** Secondary file system user. */
+    private static final String SECONDARY_FS_USER = "secondary-default";
+
     /** IP finder. */
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
@@ -255,7 +260,7 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         if (mode != PRIMARY)
             cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(secondaryFileSystemUriPath(),
-                secondaryFileSystemConfigPath()));
+                secondaryFileSystemConfigPath(), SECONDARY_FS_USER));
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
         cfg.setManagementPort(-1);
@@ -278,11 +283,28 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         primaryFsCfg.addResource(U.resolveIgniteUrl(primaryFileSystemConfigPath()));
 
-        fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+        UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+
+        // Create Fs on behalf of the client user:
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                fs = AbstractFileSystem.get(primaryFsUri, primaryFsCfg);
+
+                return null;
+            }
+        });
 
         barrier = new CyclicBarrier(THREAD_CNT);
     }
 
+    /**
+     * Gets the user the Fs client operates on behalf of.
+     * @return The user the Fs client operates on behalf of.
+     */
+    protected String getClientFsUser() {
+        return "foo";
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         try {
@@ -297,14 +319,17 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
     /** @throws Exception If failed. */
     public void testStatus() throws Exception {
+        Path file1 = new Path("/file1");
 
-        try (FSDataOutputStream file = fs.create(new Path("/file1"), EnumSet.noneOf(CreateFlag.class),
+        try (FSDataOutputStream file = fs.create(file1, EnumSet.noneOf(CreateFlag.class),
             Options.CreateOpts.perms(FsPermission.getDefault()))) {
             file.write(new byte[1024 * 1024]);
         }
 
         FsStatus status = fs.getFsStatus();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file1).getOwner());
+
         assertEquals(4, grid(0).cluster().nodes().size());
 
         long used = 0, max = 0;
@@ -707,6 +732,8 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         os.close();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
         fs.setOwner(file, "aUser", "aGroup");
 
         assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -796,20 +823,20 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         int cnt = 2 * 1024;
 
-        FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
-            Options.CreateOpts.perms(FsPermission.getDefault()));
+        try (FSDataOutputStream out = fs.create(file, EnumSet.noneOf(CreateFlag.class),
+            Options.CreateOpts.perms(FsPermission.getDefault()))) {
 
-        for (long i = 0; i < cnt; i++)
-            out.writeLong(i);
+            for (long i = 0; i < cnt; i++)
+                out.writeLong(i);
+        }
 
-        out.close();
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
 
-        FSDataInputStream in = fs.open(file, 1024);
+        try (FSDataInputStream in = fs.open(file, 1024)) {
 
-        for (long i = 0; i < cnt; i++)
-            assertEquals(i, in.readLong());
-
-        in.close();
+            for (long i = 0; i < cnt; i++)
+                assertEquals(i, in.readLong());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -1191,6 +1218,9 @@ public abstract class HadoopIgfs20FileSystemAbstractSelfTest extends IgfsCommonA
 
         assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
         assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+        assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+        assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
     }
 
     /** @throws Exception If failed. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
index 9e84c51..b089995 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopSecondaryFileSystemConfigurationTest.java
@@ -162,9 +162,9 @@ public class HadoopSecondaryFileSystemConfigurationTest extends IgfsCommonAbstra
             primaryConfFullPath = null;
 
         SecondaryFileSystemProvider provider =
-            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath, null);
+            new SecondaryFileSystemProvider(primaryFsUriStr, primaryConfFullPath);
 
-        primaryFs = provider.createFileSystem();
+        primaryFs = provider.createFileSystem(null);
 
         primaryFsUri = provider.uri();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
index d3440fc..c0f73af 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsNearOnlyMultiNodeSelfTest.java
@@ -73,10 +73,7 @@ public class IgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
 
         FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
index e8ef414..f215efb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemAbstractSelfTest.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.security.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
@@ -43,6 +44,7 @@ import org.jsr166.*;
 import java.io.*;
 import java.lang.reflect.*;
 import java.net.*;
+import java.security.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -72,6 +74,9 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     /** Secondary file system configuration path. */
     private static final String SECONDARY_CFG_PATH = "/work/core-site-test.xml";
 
+    /** Secondary file system user. */
+    private static final String SECONDARY_FS_USER = "secondary-default";
+
     /** Secondary endpoint configuration. */
     protected static final IgfsIpcEndpointConfiguration SECONDARY_ENDPOINT_CFG;
 
@@ -145,6 +150,14 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
         endpoint = skipLocShmem ? "127.0.0.1:10500" : "shmem:10500";
     }
 
+    /**
+     * Gets the user the Fs client operates on behalf of.
+     * @return The user the Fs client operates on behalf of.
+     */
+    protected String getClientFsUser() {
+        return "foo";
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         Configuration secondaryConf = configuration(SECONDARY_AUTHORITY, true, true);
@@ -235,7 +248,17 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         primaryFsCfg = configuration(PRIMARY_AUTHORITY, skipEmbed, skipLocShmem);
 
-        fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+        UserGroupInformation clientUgi = UserGroupInformation.getBestUGI(null, getClientFsUser());
+        assertNotNull(clientUgi);
+
+        // Create the Fs on behalf of the specific user:
+        clientUgi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override public Object run() throws Exception {
+                fs = FileSystem.get(primaryFsUri, primaryFsCfg);
+
+                return null;
+            }
+        });
 
         barrier = new CyclicBarrier(THREAD_CNT);
     }
@@ -324,7 +347,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
         cfg.setDefaultMode(mode);
 
         if (mode != PRIMARY)
-            cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG_PATH));
+            cfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem(
+                SECONDARY_URI, SECONDARY_CFG_PATH, SECONDARY_FS_USER));
 
         cfg.setIpcEndpointConfiguration(primaryIpcEndpointConfiguration(gridName));
 
@@ -870,6 +894,8 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         os.close();
 
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
+
         fs.setOwner(file, "aUser", "aGroup");
 
         assertEquals("aUser", fs.getFileStatus(file).getOwner());
@@ -1001,19 +1027,19 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         int cnt = 2 * 1024;
 
-        FSDataOutputStream out = fs.create(file, true, 1024);
-
-        for (long i = 0; i < cnt; i++)
-            out.writeLong(i);
+        try (FSDataOutputStream out = fs.create(file, true, 1024)) {
 
-        out.close();
+            for (long i = 0; i < cnt; i++)
+                out.writeLong(i);
+        }
 
-        FSDataInputStream in = fs.open(file, 1024);
+        assertEquals(getClientFsUser(), fs.getFileStatus(file).getOwner());
 
-        for (long i = 0; i < cnt; i++)
-            assertEquals(i, in.readLong());
+        try (FSDataInputStream in = fs.open(file, 1024)) {
 
-        in.close();
+            for (long i = 0; i < cnt; i++)
+                assertEquals(i, in.readLong());
+        }
     }
 
     /** @throws Exception If failed. */
@@ -1344,7 +1370,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
 
         String path = fs.getFileStatus(file).getPath().toString();
 
-        assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous") + "/file"));
+        assertTrue(path.endsWith("/user/" + getClientFsUser() + "/file"));
     }
 
     /** @throws Exception If failed. */
@@ -1374,7 +1400,7 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     public void testGetWorkingDirectoryIfDefault() throws Exception {
         String path = fs.getWorkingDirectory().toString();
 
-        assertTrue(path.endsWith("/user/" + System.getProperty("user.name", "anonymous")));
+        assertTrue(path.endsWith("/user/" + getClientFsUser()));
     }
 
     /** @throws Exception If failed. */
@@ -1412,17 +1438,20 @@ public abstract class IgniteHadoopFileSystemAbstractSelfTest extends IgfsCommonA
     @SuppressWarnings("OctalInteger")
     public void testMkdirs() throws Exception {
         Path fsHome = new Path(PRIMARY_URI);
-        Path dir = new Path(fsHome, "/tmp/staging");
-        Path nestedDir = new Path(dir, "nested");
+        final Path dir = new Path(fsHome, "/tmp/staging");
+        final Path nestedDir = new Path(dir, "nested");
 
-        FsPermission dirPerm = FsPermission.createImmutable((short)0700);
-        FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
+        final FsPermission dirPerm = FsPermission.createImmutable((short)0700);
+        final FsPermission nestedDirPerm = FsPermission.createImmutable((short)111);
 
         assertTrue(fs.mkdirs(dir, dirPerm));
         assertTrue(fs.mkdirs(nestedDir, nestedDirPerm));
 
         assertEquals(dirPerm, fs.getFileStatus(dir).getPermission());
         assertEquals(nestedDirPerm, fs.getFileStatus(nestedDir).getPermission());
+
+        assertEquals(getClientFsUser(), fs.getFileStatus(dir).getOwner());
+        assertEquals(getClientFsUser(), fs.getFileStatus(nestedDir).getOwner());
     }
 
     /** @throws Exception If failed. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
index b92b213..fcfd587 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemClientSelfTest.java
@@ -125,7 +125,7 @@ public class IgniteHadoopFileSystemClientSelfTest extends IgfsCommonAbstractTest
         try {
             switchHandlerErrorFlag(true);
 
-            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG);
+            HadoopIgfs client = new HadoopIgfsOutProc("127.0.0.1", 10500, getTestGridName(0), "igfs", LOG, null);
 
             client.handshake(null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
index e103c5f..2c17ba9 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgniteHadoopFileSystemIpcCacheSelfTest.java
@@ -144,6 +144,8 @@ public class IgniteHadoopFileSystemIpcCacheSelfTest extends IgfsCommonAbstractTe
 
         Map<String, HadoopIgfsIpcIo> cache = (Map<String, HadoopIgfsIpcIo>)cacheField.get(null);
 
+        cache.clear(); // Avoid influence of previous tests in the same process.
+
         String name = "igfs:" + getTestGridName(0) + "@";
 
         Configuration cfg = new Configuration();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
index af1a1e1..e8a0a6f 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
 import org.apache.ignite.internal.processors.hadoop.fs.*;
-import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -62,6 +61,17 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /** Initial REST port. */
     private int restPort = REST_PORT;
 
+    /** Secondary file system REST endpoint configuration. */
+    protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+    static {
+        SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+        SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+        SECONDARY_REST_CFG.setPort(11500);
+    }
+
+
     /** Initial classpath. */
     private static String initCp;
 
@@ -133,7 +143,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
     /**
      * @return IGFS configuration.
      */
-    public FileSystemConfiguration igfsConfiguration() {
+    public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = new FileSystemConfiguration();
 
         cfg.setName(igfsName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index d10ee5c..c66cdf3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -19,12 +19,16 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import com.google.common.base.*;
 import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.resource.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jsr166.*;
 
@@ -205,7 +209,15 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        igfs = (IgfsEx) Ignition.start("config/hadoop/default-config.xml").fileSystem(igfsName);
+        String cfgPath = "config/hadoop/default-config.xml";
+
+        IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+        IgniteConfiguration cfg = tup.get1();
+
+        cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+        igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
index 8cf31a2..5f90bd4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopFileSystemsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.*;
  * Test file systems for the working directory multi-threading support.
  */
 public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+    /** The number of threads. */
     private static final int THREAD_COUNT = 3;
 
     /** {@inheritDoc} */
@@ -87,10 +88,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
                 try {
                     int curThreadNum = threadNum.getAndIncrement();
 
-                    FileSystem fs = FileSystem.get(uri, cfg);
-
-                    HadoopFileSystemsUtils.setUser(fs, "user" + curThreadNum);
-
                     if ("file".equals(uri.getScheme()))
                         FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
 
@@ -149,24 +146,6 @@ public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
     }
 
     /**
-     * Test IGFS multi-thread working directory.
-     *
-     * @throws Exception If fails.
-     */
-    public void testIgfs() throws Exception {
-        testFileSystem(URI.create(igfsScheme()));
-    }
-
-    /**
-     * Test HDFS multi-thread working directory.
-     *
-     * @throws Exception If fails.
-     */
-    public void testHdfs() throws Exception {
-        testFileSystem(URI.create("hdfs://localhost/"));
-    }
-
-    /**
      * Test LocalFS multi-thread working directory.
      *
      * @throws Exception If fails.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
index 8a3a0ac..66c14b5 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java
@@ -24,31 +24,104 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.fs.*;
 import org.apache.ignite.igfs.*;
+import org.apache.ignite.igfs.secondary.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.hadoop.counter.*;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.igfs.IgfsMode.*;
 import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Test of whole cycle of map-reduce processing via Job tracker.
  */
 public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
+    /** IGFS block size. */
+    protected static final int IGFS_BLOCK_SIZE = 512 * 1024;
+
+    /** Amount of blocks to prefetch. */
+    protected static final int PREFETCH_BLOCKS = 1;
+
+    /** Amount of sequential block reads before prefetch is triggered. */
+    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;
+
+    /** Secondary file system URI. */
+    protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/";
+
+    /** Secondary file system configuration path. */
+    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";
+
+    /** The user to run Hadoop job on behalf of. */
+    protected static final String USER = "vasya";
+
+    /** Secondary IGFS name. */
+    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";
+
+    /** The secondary Ignite node. */
+    protected Ignite igniteSecondary;
+
+    /** The secondary Fs. */
+    protected IgfsSecondaryFileSystem secondaryFs;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
     }
 
     /**
+     * Gets owner of an IgfsEx path.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwner(IgfsEx i, IgfsPath p) {
+        return i.info(p).property(IgfsEx.PROP_USER_NAME);
+    }
+
+    /**
+     * Gets owner of a secondary Fs path.
+     * @param secFs The secondary Fs.
+     * @param p The path.
+     * @return The owner.
+     */
+    private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) {
+        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
+            @Override public String apply() {
+                return secFs.info(p).property(IgfsEx.PROP_USER_NAME);
+            }
+        });
+    }
+
+    /**
+     * Checks owner of the path.
+     * @param p The path.
+     */
+    private void checkOwner(IgfsPath p) {
+        String ownerPrim = getOwner(igfs, p);
+        assertEquals(USER, ownerPrim);
+
+        String ownerSec = getOwnerSecondary(secondaryFs, p);
+        assertEquals(USER, ownerSec);
+    }
+
+    /**
      * Tests whole job execution with all phases in all combination of new and old versions of API.
      * @throws Exception If fails.
      */
@@ -59,9 +132,14 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
         IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
 
-        generateTestFile(inFile.toString(), "red", 100000, "blue", 200000, "green", 150000, "yellow", 70000 );
+        final int red = 10_000;
+        final int blue = 20_000;
+        final int green = 15_000;
+        final int yellow = 7_000;
 
-        for (int i = 0; i < 8; i++) {
+        generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+        for (int i = 0; i < 3; i++) {
             igfs.delete(new IgfsPath(PATH_OUTPUT), true);
 
             boolean useNewMapper = (i & 1) == 0;
@@ -71,7 +149,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             JobConf jobConf = new JobConf();
 
             jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName());
-            jobConf.setUser("yyy");
+            jobConf.setUser(USER);
             jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz");
 
             //To split into about 40 items for v2
@@ -105,13 +183,19 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
 
             checkJobStatistics(jobId);
 
+            final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";
+
+            checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS"));
+
+            checkOwner(new IgfsPath(outFile));
+
             assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " +
                 useNewReducer,
-                "blue\t200000\n" +
-                "green\t150000\n" +
-                "red\t100000\n" +
-                "yellow\t70000\n",
-                readAndSortFile(PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000")
+                "blue\t" + blue + "\n" +
+                "green\t" + green + "\n" +
+                "red\t" + red + "\n" +
+                "yellow\t" + yellow + "\n",
+                readAndSortFile(outFile)
             );
         }
     }
@@ -182,7 +266,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
             }
         }
 
-        final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance");
+        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");
 
         assert GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
@@ -212,4 +296,85 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest {
                 ", actual=" + HadoopTestUtils.simpleCheckJobStatFile(reader) + ']';
         }
     }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG);
+
+        super.beforeTest();
+    }
+
+    /**
+     * Start grid with IGFS.
+     *
+     * @param gridName Grid name.
+     * @param igfsName IGFS name.
+     * @param mode IGFS mode.
+     * @param secondaryFs Secondary file system (optional).
+     * @param restCfg Rest configuration string (optional).
+     * @return Started grid instance.
+     * @throws Exception If failed.
+     */
+    protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode,
+        @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception {
+        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+        igfsCfg.setDataCacheName("dataCache");
+        igfsCfg.setMetaCacheName("metaCache");
+        igfsCfg.setName(igfsName);
+        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
+        igfsCfg.setDefaultMode(mode);
+        igfsCfg.setIpcEndpointConfiguration(restCfg);
+        igfsCfg.setSecondaryFileSystem(secondaryFs);
+        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
+        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);
+
+        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+        dataCacheCfg.setName("dataCache");
+        dataCacheCfg.setCacheMode(PARTITIONED);
+        dataCacheCfg.setNearConfiguration(null);
+        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
+        dataCacheCfg.setBackups(0);
+        dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+        dataCacheCfg.setOffHeapMaxMemory(0);
+
+        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+        metaCacheCfg.setName("metaCache");
+        metaCacheCfg.setCacheMode(REPLICATED);
+        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+
+        cfg.setGridName(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));
+
+        cfg.setDiscoverySpi(discoSpi);
+        cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
+        cfg.setFileSystemConfiguration(igfsCfg);
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setConnectorConfiguration(null);
+
+        return G.start(cfg);
+    }
+
+    /**
+     * @return IGFS configuration.
+     * @throws Exception If failed.
+     */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration fsCfg = super.igfsConfiguration();
+
+        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);
+
+        fsCfg.setSecondaryFileSystem(secondaryFs);
+
+        return fsCfg;
+    }
 }

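For readers unfamiliar with the impersonation helper used in this test: IgfsUserContext.doAs(...) runs a closure with the given user as the current IGFS user, which is how getOwnerSecondary() above queries the secondary file system on behalf of USER. A minimal sketch of the same pattern (secFs and path stand in for a secondary file system and a path already in scope):

    // Execute the closure as user "vasya"; IGFS operations performed
    // inside it are attributed to that user.
    String owner = IgfsUserContext.doAs("vasya", new IgniteOutClosure<String>() {
        @Override public String apply() {
            return secFs.info(path).property(IgfsEx.PROP_USER_NAME);
        }
    });
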
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
index 8dc9830..eee5c8b 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
 
 
     /** {@inheritDoc} */
-    @Override public FileSystemConfiguration igfsConfiguration() {
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
         FileSystemConfiguration cfg = super.igfsConfiguration();
 
         cfg.setFragmentizerEnabled(false);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
index aaf0f92..6930020 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.*;
 import org.apache.ignite.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.net.*;
@@ -43,7 +42,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
 
     /**
      * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
@@ -79,7 +78,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
                 igfs.info(inFile).length() - fileBlock1.length());
 
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
 
@@ -110,7 +109,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
         int taskNum, String... words) throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
 
@@ -136,7 +135,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testReduceTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
 
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
         runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
@@ -162,7 +161,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @throws Exception If fails.
      */
     public void testCombinerTask() throws Exception {
-        HadoopV2Job gridJob = getHadoopJob("/", "/");
+        HadoopJob gridJob = getHadoopJob("/", "/");
 
         HadoopTestTaskContext ctx =
             runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
@@ -182,7 +181,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
      * @return Context of combine task with mock output.
      * @throws IgniteCheckedException If fails.
      */
-    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
         throws IgniteCheckedException {
         HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
 
@@ -228,7 +227,7 @@ abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
         HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
         HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
 
-        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
 
         HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
index b41a260..48e83cc 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.hadoop;
 
 import org.apache.hadoop.mapred.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.io.*;
 import java.util.*;
@@ -38,7 +37,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws IOException If fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
 
         setupFileSystems(jobConf);
@@ -47,7 +46,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
index b677c63..e73fae3 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.mapreduce.lib.input.*;
 import org.apache.hadoop.mapreduce.lib.output.*;
 import org.apache.ignite.internal.processors.hadoop.examples.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
 
 import java.util.*;
 
@@ -42,7 +41,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
      * @return Hadoop job.
      * @throws Exception if fails.
      */
-    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
         Job job = Job.getInstance();
 
         job.setOutputKeyClass(Text.class);
@@ -65,7 +64,7 @@ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
 
         HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
 
-        return new HadoopV2Job(jobId, jobInfo, log);
+        return jobInfo.createJob(jobId, log);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index ebc89f4..f3b9307 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -66,7 +66,11 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+        HadoopJobId id = new HadoopJobId(UUID.randomUUID(), 1);
+
+        HadoopJob job = info.createJob(id, log);
 
         HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));

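The recurring change across these test files is the same: instead of constructing HadoopV2Job directly, tests go through HadoopDefaultJobInfo, which hides the concrete job class behind the HadoopJob interface. In sketch form, mirroring the hunk above (createJobInfo is the static helper imported from HadoopUtils, and log is the test logger):

    JobConf jobConf = new JobConf(); // any configured Hadoop job

    HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);

    HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

    // The factory decides which HadoopJob implementation to instantiate.
    HadoopJob job = jobInfo.createJob(jobId, log);
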
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
index b4ed5e1..9395c5e 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Abstract class for maps test.
@@ -95,9 +96,20 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
             assert false;
         }
 
+        /** {@inheritDoc} */
         @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
             assert false;
         }
+
+        /** {@inheritDoc} */
+        @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+            try {
+                return c.call();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
     }
 
     /**

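The new runAsJobOwner(Callable) hook lets task code execute work under the job owner's identity; the mock above simply invokes the callable in place. A hypothetical caller might look as follows (taskCtx, fs and path are placeholders, not names from this patch, and the enclosing method is assumed to declare IgniteCheckedException):

    // Run a lookup as the job owner; a real HadoopTaskContext implementation
    // would switch the user context before invoking the callable.
    String owner = taskCtx.runAsJobOwner(new Callable<String>() {
        @Override public String call() throws Exception {
            return fs.info(path).property(IgfsEx.PROP_USER_NAME);
        }
    });
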
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
index 8a046e0..89bf830 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -61,10 +61,10 @@ public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
 
             int sigma = max((int)ceil(precission * exp), 5);
 
-            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission +
+            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
                 " sigma: " + sigma);
 
-            assertTrue(abs(exp - levelsCnts[level]) <= sigma);
+            assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
         }
     }
 

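For context on the assertion the commit annotates as flaky: in a skip list where each entry is promoted to the next level with probability 1/2, roughly half of the entries survive to each successive level, so the expected per-level count decays geometrically and the test compares the observed count against that expectation within a precision-scaled tolerance. A rough sketch of the expectation, assuming the standard coin-flip promotion scheme (the test's own exp is computed outside the quoted hunk, so the exact level indexing here is an assumption):

    // Expected number of entries at a given level for totalKeys keys,
    // assuming promotion probability 1/2 per level.
    static double expectedLevelCount(long totalKeys, int level) {
        return totalKeys / Math.pow(2, level + 1);
    }
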
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
new file mode 100644
index 0000000..cfad322
--- /dev/null
+++ b/modules/hibernate/src/main/java/org/apache/ignite/cache/store/hibernate/CacheHibernateStoreSessionListener.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.hibernate;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.resources.*;
+import org.hibernate.*;
+import org.hibernate.cfg.*;
+
+import javax.cache.integration.*;
+import java.io.*;
+import java.net.*;
+
+/**
+ * Hibernate-based cache store session listener.
+ * <p>
+ * This listener creates a new Hibernate session for each store
+ * session. If there is an ongoing cache transaction, a corresponding
+ * Hibernate transaction is created as well.
+ * <p>
+ * The Hibernate session is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the session will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will share a DB transaction.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(javax.cache.Cache.Entry)}
+ * method can be implemented if {@link CacheHibernateStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter&lt;Integer, Integer&gt; {
+ *     &#64;CacheStoreSessionResource
+ *     private CacheStoreSession ses;
+ *
+ *     &#64;Override public void write(Cache.Entry&lt;? extends Integer, ? extends Integer&gt; entry) throws CacheWriterException {
+ *         // Get Hibernate session from the current store session.
+ *         Session hibSes = ses.attachment();
+ *
+ *         // Persist the value.
+ *         hibSes.persist(entry.getValue());
+ *     }
+ * }
+ * </pre>
+ * The Hibernate session is created automatically by the listener
+ * at the start of each store session and closed when the session ends.
+ * <p>
+ * {@link CacheHibernateStoreSessionListener} requires that either
+ * {@link #setSessionFactory(SessionFactory) session factory}
+ * or {@link #setHibernateConfigurationPath(String) Hibernate configuration file}
+ * is provided. If neither is set, an exception is thrown. If both are provided,
+ * the session factory will be used.
+ */
+public class CacheHibernateStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+    /** Hibernate session factory. */
+    private SessionFactory sesFactory;
+
+    /** Hibernate configuration file path. */
+    private String hibernateCfgPath;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Whether to close session on stop. */
+    private boolean closeSesOnStop;
+
+    /**
+     * Sets Hibernate session factory.
+     * <p>
+     * Either session factory or configuration file is required.
+     * If none is provided, exception will be thrown on startup.
+     *
+     * @param sesFactory Session factory.
+     */
+    public void setSessionFactory(SessionFactory sesFactory) {
+        this.sesFactory = sesFactory;
+    }
+
+    /**
+     * Gets Hibernate session factory.
+     *
+     * @return Session factory.
+     */
+    public SessionFactory getSessionFactory() {
+        return sesFactory;
+    }
+
+    /**
+     * Sets Hibernate configuration path.
+     * <p>
+     * Either session factory or configuration file is required.
+     * If none is provided, exception will be thrown on startup.
+     *
+     * @param hibernateCfgPath Hibernate configuration path.
+     */
+    public void setHibernateConfigurationPath(String hibernateCfgPath) {
+        this.hibernateCfgPath = hibernateCfgPath;
+    }
+
+    /**
+     * Gets Hibernate configuration path.
+     *
+     * @return Hibernate configuration path.
+     */
+    public String getHibernateConfigurationPath() {
+        return hibernateCfgPath;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public void start() throws IgniteException {
+        if (sesFactory == null && F.isEmpty(hibernateCfgPath))
+            throw new IgniteException("Either session factory or Hibernate configuration file is required by " +
+                getClass().getSimpleName() + '.');
+
+        if (!F.isEmpty(hibernateCfgPath)) {
+            if (sesFactory == null) {
+                try {
+                    URL url = new URL(hibernateCfgPath);
+
+                    sesFactory = new Configuration().configure(url).buildSessionFactory();
+                }
+                catch (MalformedURLException ignored) {
+                    // No-op.
+                }
+
+                if (sesFactory == null) {
+                    File cfgFile = new File(hibernateCfgPath);
+
+                    if (cfgFile.exists())
+                        sesFactory = new Configuration().configure(cfgFile).buildSessionFactory();
+                }
+
+                if (sesFactory == null)
+                    sesFactory = new Configuration().configure(hibernateCfgPath).buildSessionFactory();
+
+                if (sesFactory == null)
+                    throw new IgniteException("Failed to resolve Hibernate configuration file: " + hibernateCfgPath);
+
+                closeSesOnStop = true;
+            }
+            else
+                U.warn(log, "Hibernate configuration file configured in " + getClass().getSimpleName() +
+                    " will be ignored (session factory is already set).");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        if (closeSesOnStop && sesFactory != null && !sesFactory.isClosed())
+            sesFactory.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionStart(CacheStoreSession ses) {
+        if (ses.attachment() == null) {
+            try {
+                Session hibSes = sesFactory.openSession();
+
+                ses.attach(hibSes);
+
+                if (ses.isWithinTransaction())
+                    hibSes.beginTransaction();
+            }
+            catch (HibernateException e) {
+                throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+        Session hibSes = ses.attach(null);
+
+        if (hibSes != null) {
+            try {
+                Transaction tx = hibSes.getTransaction();
+
+                if (commit) {
+                    hibSes.flush();
+
+                    if (tx.isActive())
+                        tx.commit();
+                }
+                else if (tx.isActive())
+                    tx.rollback();
+            }
+            catch (HibernateException e) {
+                throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+            }
+            finally {
+                hibSes.close();
+            }
+        }
+    }
+}
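
For completeness, this listener is typically attached through the cache configuration's store session listener factories. A minimal sketch (the cache name and configuration path are placeholders; the wiring assumes the CacheConfiguration.setCacheStoreSessionListenerFactories(...) API introduced alongside the session listener feature):

    CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("myCache");

    ccfg.setCacheStoreSessionListenerFactories(new Factory<CacheStoreSessionListener>() {
        @Override public CacheStoreSessionListener create() {
            CacheHibernateStoreSessionListener lsnr = new CacheHibernateStoreSessionListener();

            // Either a session factory or a configuration path is required.
            lsnr.setHibernateConfigurationPath("/path/to/hibernate.cfg.xml");

            return lsnr;
        }
    });

With this in place, store operations inside a cache transaction share one Hibernate session and DB transaction, committed or rolled back by onSessionEnd(...) as shown above.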