You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:09:11 UTC

[16/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
new file mode 100644
index 0000000..68009dd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractSelfTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.File;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
+import org.apache.ignite.igfs.IgfsIpcEndpointType;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Abstract class for Hadoop tests.
+ */
+public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** REST port. */
+    protected static final int REST_PORT = 11212;
+
+    /** IGFS name. */
+    protected static final String igfsName = null;
+
+    /** IGFS name. */
+    protected static final String igfsMetaCacheName = "meta";
+
+    /** IGFS name. */
+    protected static final String igfsDataCacheName = "data";
+
+    /** IGFS block size. */
+    protected static final int igfsBlockSize = 1024;
+
+    /** IGFS block group size. */
+    protected static final int igfsBlockGroupSize = 8;
+
+    /** Initial REST port. */
+    private int restPort = REST_PORT;
+
+    /** Secondary file system REST endpoint configuration. */
+    protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG;
+
+    static {
+        SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration();
+
+        SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP);
+        SECONDARY_REST_CFG.setPort(11500);
+    }
+
+
+    /** Initial classpath. */
+    private static String initCp;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // Add surefire classpath to regular classpath.
+        initCp = System.getProperty("java.class.path");
+
+        String surefireCp = System.getProperty("surefire.test.class.path");
+
+        if (surefireCp != null)
+            System.setProperty("java.class.path", initCp + File.pathSeparatorChar + surefireCp);
+
+        super.beforeTestsStarted();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Restore classpath.
+        System.setProperty("java.class.path", initCp);
+
+        initCp = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setHadoopConfiguration(hadoopConfiguration(gridName));
+
+        TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
+
+        commSpi.setSharedMemoryPort(-1);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        if (igfsEnabled()) {
+            cfg.setCacheConfiguration(metaCacheConfiguration(), dataCacheConfiguration());
+
+            cfg.setFileSystemConfiguration(igfsConfiguration());
+        }
+
+        if (restEnabled()) {
+            ConnectorConfiguration clnCfg = new ConnectorConfiguration();
+
+            clnCfg.setPort(restPort++);
+
+            cfg.setConnectorConfiguration(clnCfg);
+        }
+
+        cfg.setLocalHost("127.0.0.1");
+        cfg.setPeerClassLoadingEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Hadoop configuration.
+     */
+    public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = new HadoopConfiguration();
+
+        cfg.setMaxParallelTasks(3);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS configuration.
+     */
+    public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration cfg = new FileSystemConfiguration();
+
+        cfg.setName(igfsName);
+        cfg.setBlockSize(igfsBlockSize);
+        cfg.setDataCacheName(igfsDataCacheName);
+        cfg.setMetaCacheName(igfsMetaCacheName);
+        cfg.setFragmentizerEnabled(false);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS meta cache configuration.
+     */
+    public CacheConfiguration metaCacheConfiguration() {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setName(igfsMetaCacheName);
+        cfg.setCacheMode(REPLICATED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+
+    /**
+     * @return IGFS data cache configuration.
+     */
+    private CacheConfiguration dataCacheConfiguration() {
+        CacheConfiguration cfg = new CacheConfiguration();
+
+        cfg.setName(igfsDataCacheName);
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(igfsBlockGroupSize));
+        cfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cfg;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes.
+     */
+    protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /**
+     * @return {@code True} if REST is enabled on Hadoop nodes.
+     */
+    protected boolean restEnabled() {
+        return false;
+    }
+
+    /**
+     * @return Number of nodes to start.
+     */
+    protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @param cfg Config.
+     */
+    protected void setupFileSystems(Configuration cfg) {
+        cfg.set("fs.defaultFS", igfsScheme());
+        cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+        cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.
+            class.getName());
+
+        HadoopFileSystemsUtils.setupFileSystems(cfg);
+    }
+
+    /**
+     * @return IGFS scheme for test.
+     */
+    protected String igfsScheme() {
+        return "igfs://:" + getTestGridName(0) + "@/";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
new file mode 100644
index 0000000..3cb8f91
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractWordCountTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+
+/**
+ * Abstract class for tests based on WordCount test job.
+ */
+public abstract class HadoopAbstractWordCountTest extends HadoopAbstractSelfTest {
+    /** Input path. */
+    protected static final String PATH_INPUT = "/input";
+
+    /** Output path. */
+    protected static final String PATH_OUTPUT = "/output";
+
+    /** IGFS instance. */
+    protected IgfsEx igfs;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        // Init cache by correct LocalFileSystem implementation
+        FileSystem.getLocal(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        igfs = (IgfsEx)startGrids(gridCount()).fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /**
+     * Generates test file.
+     *
+     * @param path File name.
+     * @param wordCounts Words and counts.
+     * @throws Exception If failed.
+     */
+    protected void generateTestFile(String path, Object... wordCounts) throws Exception {
+        List<String> wordsArr = new ArrayList<>();
+
+        //Generating
+        for (int i = 0; i < wordCounts.length; i += 2) {
+            String word = (String) wordCounts[i];
+            int cnt = (Integer) wordCounts[i + 1];
+
+            while (cnt-- > 0)
+                wordsArr.add(word);
+        }
+
+        //Shuffling
+        for (int i = 0; i < wordsArr.size(); i++) {
+            int j = (int)(Math.random() * wordsArr.size());
+
+            Collections.swap(wordsArr, i, j);
+        }
+
+        //Input file preparing
+        PrintWriter testInputFileWriter = new PrintWriter(igfs.create(new IgfsPath(path), true));
+
+        int j = 0;
+
+        while (j < wordsArr.size()) {
+            int i = 5 + (int)(Math.random() * 5);
+
+            List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+            j += i;
+
+            testInputFileWriter.println(Joiner.on(' ').join(subList));
+        }
+
+        testInputFileWriter.close();
+    }
+
+    /**
+     * Read w/o decoding (default).
+     *
+     * @param fileName The file.
+     * @return The file contents, human-readable.
+     * @throws Exception On error.
+     */
+    protected String readAndSortFile(String fileName) throws Exception {
+        return readAndSortFile(fileName, null);
+    }
+
+    /**
+     * Reads whole text file into String.
+     *
+     * @param fileName Name of the file to read.
+     * @return Content of the file as String value.
+     * @throws Exception If could not read the file.
+     */
+    protected String readAndSortFile(String fileName, Configuration conf) throws Exception {
+        final List<String> list = new ArrayList<>();
+
+        final boolean snappyDecode = conf != null && conf.getBoolean(FileOutputFormat.COMPRESS, false);
+
+        if (snappyDecode) {
+            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(new Path(fileName)))) {
+                Text key = new Text();
+
+                IntWritable val = new IntWritable();
+
+                while (reader.next(key, val))
+                    list.add(key + "\t" + val);
+            }
+        }
+        else {
+            try (InputStream is0 = igfs.open(new IgfsPath(fileName))) {
+                BufferedReader reader = new BufferedReader(new InputStreamReader(is0));
+
+                String line;
+
+                while ((line = reader.readLine()) != null)
+                    list.add(line);
+            }
+        }
+
+        Collections.sort(list);
+
+        return Joiner.on('\n').join(list) + "\n";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
new file mode 100644
index 0000000..0be8bf9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopCommandLineTest.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.base.Joiner;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.igfs.IgfsInputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.ConcurrentHashMap8;
+
+/**
+ * Test of integration with Hadoop client via command line interface.
+ */
+public class HadoopCommandLineTest extends GridCommonAbstractTest {
+    /** IGFS instance. */
+    private IgfsEx igfs;
+
+    /** */
+    private static final String igfsName = "igfs";
+
+    /** */
+    private static File testWorkDir;
+
+    /** */
+    private static String hadoopHome;
+
+    /** */
+    private static String hiveHome;
+
+    /** */
+    private static File examplesJar;
+
+    /**
+     *
+     * @param path File name.
+     * @param wordCounts Words and counts.
+     * @throws Exception If failed.
+     */
+    private void generateTestFile(File path, Object... wordCounts) throws Exception {
+        List<String> wordsArr = new ArrayList<>();
+
+        //Generating
+        for (int i = 0; i < wordCounts.length; i += 2) {
+            String word = (String) wordCounts[i];
+            int cnt = (Integer) wordCounts[i + 1];
+
+            while (cnt-- > 0)
+                wordsArr.add(word);
+        }
+
+        //Shuffling
+        for (int i = 0; i < wordsArr.size(); i++) {
+            int j = (int)(Math.random() * wordsArr.size());
+
+            Collections.swap(wordsArr, i, j);
+        }
+
+        //Writing file
+        try (PrintWriter writer = new PrintWriter(path)) {
+            int j = 0;
+
+            while (j < wordsArr.size()) {
+                int i = 5 + (int)(Math.random() * 5);
+
+                List<String> subList = wordsArr.subList(j, Math.min(j + i, wordsArr.size()));
+                j += i;
+
+                writer.println(Joiner.on(' ').join(subList));
+            }
+
+            writer.flush();
+        }
+    }
+
+    /**
+     * Generates two data files to join its with Hive.
+     *
+     * @throws FileNotFoundException If failed.
+     */
+    private void generateHiveTestFiles() throws FileNotFoundException {
+        try (PrintWriter writerA = new PrintWriter(new File(testWorkDir, "data-a"));
+             PrintWriter writerB = new PrintWriter(new File(testWorkDir, "data-b"))) {
+            char sep = '\t';
+
+            int idB = 0;
+            int idA = 0;
+            int v = 1000;
+
+            for (int i = 0; i < 1000; i++) {
+                writerA.print(idA++);
+                writerA.print(sep);
+                writerA.println(idB);
+
+                writerB.print(idB++);
+                writerB.print(sep);
+                writerB.println(v += 2);
+
+                writerB.print(idB++);
+                writerB.print(sep);
+                writerB.println(v += 2);
+            }
+
+            writerA.flush();
+            writerB.flush();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        hiveHome = IgniteSystemProperties.getString("HIVE_HOME");
+
+        assertFalse("HIVE_HOME hasn't been set.", F.isEmpty(hiveHome));
+
+        hadoopHome = IgniteSystemProperties.getString("HADOOP_HOME");
+
+        assertFalse("HADOOP_HOME hasn't been set.", F.isEmpty(hadoopHome));
+
+        String mapredHome = hadoopHome + "/share/hadoop/mapreduce";
+
+        File[] fileList = new File(mapredHome).listFiles(new FileFilter() {
+            @Override public boolean accept(File pathname) {
+                return pathname.getName().startsWith("hadoop-mapreduce-examples-") &&
+                    pathname.getName().endsWith(".jar");
+            }
+        });
+
+        assertEquals("Invalid hadoop distribution.", 1, fileList.length);
+
+        examplesJar = fileList[0];
+
+        testWorkDir = Files.createTempDirectory("hadoop-cli-test").toFile();
+
+        U.copy(resolveHadoopConfig("core-site.ignite.xml"), new File(testWorkDir, "core-site.xml"), false);
+
+        File srcFile = resolveHadoopConfig("mapred-site.ignite.xml");
+        File dstFile = new File(testWorkDir, "mapred-site.xml");
+
+        try (BufferedReader in = new BufferedReader(new FileReader(srcFile));
+             PrintWriter out = new PrintWriter(dstFile)) {
+            String line;
+
+            while ((line = in.readLine()) != null) {
+                if (line.startsWith("</configuration>"))
+                    out.println(
+                        "    <property>\n" +
+                        "        <name>" + HadoopCommonUtils.JOB_COUNTER_WRITER_PROPERTY + "</name>\n" +
+                        "        <value>" + IgniteHadoopFileSystemCounterWriter.class.getName() + "</value>\n" +
+                        "    </property>\n");
+
+                out.println(line);
+            }
+
+            out.flush();
+        }
+
+        generateTestFile(new File(testWorkDir, "test-data"), "red", 100, "green", 200, "blue", 150, "yellow", 50);
+
+        generateHiveTestFiles();
+    }
+
+    /**
+     * Resolve Hadoop configuration file.
+     *
+     * @param name File name.
+     * @return Resolve file.
+     */
+    private static File resolveHadoopConfig(String name) {
+        File path = U.resolveIgnitePath("modules/hadoop/config/" + name);
+
+        return path != null ? path : U.resolveIgnitePath("config/hadoop/" + name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        U.delete(testWorkDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        String cfgPath = "config/hadoop/default-config.xml";
+
+        IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> tup = IgnitionEx.loadConfiguration(cfgPath);
+
+        IgniteConfiguration cfg = tup.get1();
+
+        cfg.setLocalHost("127.0.0.1"); // Avoid connecting to other nodes.
+
+        igfs = (IgfsEx) Ignition.start(cfg).fileSystem(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /**
+     * Creates the process build with appropriate environment to run Hadoop CLI.
+     *
+     * @return Process builder.
+     */
+    private ProcessBuilder createProcessBuilder() {
+        String sep = ":";
+
+        String ggClsPath = HadoopJob.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+            HadoopJobTracker.class.getProtectionDomain().getCodeSource().getLocation().getPath() + sep +
+            ConcurrentHashMap8.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+
+        ProcessBuilder res = new ProcessBuilder();
+
+        res.environment().put("HADOOP_HOME", hadoopHome);
+        res.environment().put("HADOOP_CLASSPATH", ggClsPath);
+        res.environment().put("HADOOP_CONF_DIR", testWorkDir.getAbsolutePath());
+
+        res.redirectErrorStream(true);
+
+        return res;
+    }
+
+    /**
+     * Waits for process exit and prints the its output.
+     *
+     * @param proc Process.
+     * @return Exit code.
+     * @throws Exception If failed.
+     */
+    private int watchProcess(Process proc) throws Exception {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+        String line;
+
+        while ((line = reader.readLine()) != null)
+            log().info(line);
+
+        return proc.waitFor();
+    }
+
+    /**
+     * Executes Hadoop command line tool.
+     *
+     * @param args Arguments for Hadoop command line tool.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHadoopCmd(String... args) throws Exception {
+        ProcessBuilder procBuilder = createProcessBuilder();
+
+        List<String> cmd = new ArrayList<>();
+
+        cmd.add(hadoopHome + "/bin/hadoop");
+        cmd.addAll(Arrays.asList(args));
+
+        procBuilder.command(cmd);
+
+        log().info("Execute: " + procBuilder.command());
+
+        return watchProcess(procBuilder.start());
+    }
+
+    /**
+     * Executes Hive query.
+     *
+     * @param qry Query.
+     * @return Process exit code.
+     * @throws Exception If failed.
+     */
+    private int executeHiveQuery(String qry) throws Exception {
+        ProcessBuilder procBuilder = createProcessBuilder();
+
+        List<String> cmd = new ArrayList<>();
+
+        procBuilder.command(cmd);
+
+        cmd.add(hiveHome + "/bin/hive");
+
+        cmd.add("--hiveconf");
+        cmd.add("hive.rpc.query.plan=true");
+
+        cmd.add("--hiveconf");
+        cmd.add("javax.jdo.option.ConnectionURL=jdbc:derby:" + testWorkDir.getAbsolutePath() + "/metastore_db;" +
+            "databaseName=metastore_db;create=true");
+
+        cmd.add("-e");
+        cmd.add(qry);
+
+        procBuilder.command(cmd);
+
+        log().info("Execute: " + procBuilder.command());
+
+        return watchProcess(procBuilder.start());
+    }
+
+    /**
+     * Tests Hadoop command line integration.
+     */
+    public void testHadoopCommandLine() throws Exception {
+        assertEquals(0, executeHadoopCmd("fs", "-ls", "/"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-mkdir", "/input"));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "test-data").getAbsolutePath(), "/input"));
+
+        assertTrue(igfs.exists(new IgfsPath("/input/test-data")));
+
+        assertEquals(0, executeHadoopCmd("jar", examplesJar.getAbsolutePath(), "wordcount", "/input", "/output"));
+
+        IgfsPath path = new IgfsPath("/user/" + System.getProperty("user.name") + "/");
+
+        assertTrue(igfs.exists(path));
+
+        IgfsPath jobStatPath = null;
+
+        for (IgfsPath jobPath : igfs.listPaths(path)) {
+            assertNull(jobStatPath);
+
+            jobStatPath = jobPath;
+        }
+
+        File locStatFile = new File(testWorkDir, "performance");
+
+        assertEquals(0, executeHadoopCmd("fs", "-get", jobStatPath.toString() + "/performance", locStatFile.toString()));
+
+        long evtCnt = HadoopTestUtils.simpleCheckJobStatFile(new BufferedReader(new FileReader(locStatFile)));
+
+        assertTrue(evtCnt >= 22); //It's the minimum amount of events for job with combiner.
+
+        assertTrue(igfs.exists(new IgfsPath("/output")));
+
+        BufferedReader in = new BufferedReader(new InputStreamReader(igfs.open(new IgfsPath("/output/part-r-00000"))));
+
+        List<String> res = new ArrayList<>();
+
+        String line;
+
+        while ((line = in.readLine()) != null)
+            res.add(line);
+
+        Collections.sort(res);
+
+        assertEquals("[blue\t150, green\t200, red\t100, yellow\t50]", res.toString());
+    }
+
+    /**
+     * Runs query check result.
+     *
+     * @param expRes Expected result.
+     * @param qry Query.
+     * @throws Exception If failed.
+     */
+    private void checkQuery(String expRes, String qry) throws Exception {
+        assertEquals(0, executeHiveQuery("drop table if exists result"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table result " +
+            "row format delimited fields terminated by ' ' " +
+            "stored as textfile " +
+            "location '/result' as " + qry
+        ));
+
+        IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0"));
+
+        byte[] buf = new byte[(int) in.length()];
+
+        in.read(buf);
+
+        assertEquals(expRes, new String(buf));
+    }
+
+    /**
+     * Tests Hive integration.
+     */
+    public void testHiveCommandLine() throws Exception {
+        assertEquals(0, executeHiveQuery(
+            "create table table_a (" +
+                "id_a int," +
+                "id_b int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-a'"
+        ));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-a").getAbsolutePath(), "/table-a"));
+
+        assertEquals(0, executeHiveQuery(
+            "create table table_b (" +
+                "id_b int," +
+                "rndv int" +
+            ") " +
+            "row format delimited fields terminated by '\\t'" +
+            "stored as textfile " +
+            "location '/table-b'"
+        ));
+
+        assertEquals(0, executeHadoopCmd("fs", "-put", new File(testWorkDir, "data-b").getAbsolutePath(), "/table-b"));
+
+        checkQuery(
+            "0 0\n" +
+            "1 2\n" +
+            "2 4\n" +
+            "3 6\n" +
+            "4 8\n" +
+            "5 10\n" +
+            "6 12\n" +
+            "7 14\n" +
+            "8 16\n" +
+            "9 18\n",
+            "select * from table_a order by id_a limit 10"
+        );
+
+        checkQuery("2000\n", "select count(id_b) from table_b");
+
+        checkQuery(
+            "250 500 2002\n" +
+            "251 502 2006\n" +
+            "252 504 2010\n" +
+            "253 506 2014\n" +
+            "254 508 2018\n" +
+            "255 510 2022\n" +
+            "256 512 2026\n" +
+            "257 514 2030\n" +
+            "258 516 2034\n" +
+            "259 518 2038\n",
+            "select a.id_a, a.id_b, b.rndv" +
+            " from table_a a" +
+            " inner join table_b b on a.id_b = b.id_b" +
+            " where b.rndv > 2000" +
+            " order by a.id_a limit 10"
+        );
+
+        checkQuery("1000\n", "select count(b.id_b) from table_a a inner join table_b b on a.id_b = b.id_b");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
new file mode 100644
index 0000000..ee1c88f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestNode;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ *
+ */
+public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final UUID ID_1 = new UUID(0, 1);
+
+    /** */
+    private static final UUID ID_2 = new UUID(0, 2);
+
+    /** */
+    private static final UUID ID_3 = new UUID(0, 3);
+
+    /** */
+    private static final String HOST_1 = "host1";
+
+    /** */
+    private static final String HOST_2 = "host2";
+
+    /** */
+    private static final String HOST_3 = "host3";
+
+    /** */
+    private static final String INVALID_HOST_1 = "invalid_host1";
+
+    /** */
+    private static final String INVALID_HOST_2 = "invalid_host2";
+
+    /** */
+    private static final String INVALID_HOST_3 = "invalid_host3";
+
+    /** Mocked IGFS. */
+    private static final IgniteFileSystem IGFS = new MockIgfs();
+
+    /** Mocked Grid. */
+    private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS);
+
+    /** Planner. */
+    private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
+
+    /** Block locations. */
+    private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
+
+    /** Proxy map. */
+    private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
+
+    /** Last created plan. */
+    private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
+
+    /**
+     * Static initializer.
+     */
+    static {
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log());
+
+        BLOCK_MAP.clear();
+        PROXY_MAP.clear();
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 2);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureMappers(ID_3, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
+        mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+
+        plan(3, split1, split2, split3);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
+        HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
+
+        mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
+        mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureEmpty(ID_2);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split2);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_1);
+        assert ensureEmpty(ID_3);
+
+        plan(1, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
+        assert ensureEmpty(ID_3);
+
+        plan(2, split1, split2);
+        assert ensureMappers(ID_1, split1);
+        assert ensureMappers(ID_2, split2);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureEmpty(ID_3);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testNonIgfsOrphans() throws IgniteCheckedException {
+        HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
+        HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
+        HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
+
+        plan(1, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
+
+        plan(2, split1);
+        assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
+            ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
+
+        plan(1, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
+            ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
+
+        plan(3, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1);
+        assert ensureReducers(ID_2, 1);
+        assert ensureReducers(ID_3, 1);
+
+        plan(5, split1, split2, split3);
+        assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
+            ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
+            ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
+        assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
+            ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
+    }
+
+    /**
+     * Create plan.
+     *
+     * @param reducers Reducers count.
+     * @param splits Splits.
+     * @return Plan.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
+        assert reducers > 0;
+        assert splits != null && splits.length > 0;
+
+        Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
+
+        Collections.addAll(splitList, splits);
+
+        Collection<ClusterNode> top = new ArrayList<>();
+
+        GridTestNode node1 = new GridTestNode(ID_1);
+        GridTestNode node2 = new GridTestNode(ID_2);
+        GridTestNode node3 = new GridTestNode(ID_3);
+
+        node1.setHostName(HOST_1);
+        node2.setHostName(HOST_2);
+        node3.setHostName(HOST_3);
+
+        top.add(node1);
+        top.add(node2);
+        top.add(node3);
+
+        HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null);
+
+        PLAN.set(plan);
+
+        return plan;
+    }
+
+    /**
+     * Ensure that node contains the given mappers.
+     *
+     * @param nodeId Node ID.
+     * @param expSplits Expected splits.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
+        Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
+
+        Collections.addAll(expSplitsCol, expSplits);
+
+        Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
+
+        return F.eq(expSplitsCol, splits);
+    }
+
+    /**
+     * Ensure that node contains the given amount of reducers.
+     *
+     * @param nodeId Node ID.
+     * @param reducers Reducers.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureReducers(UUID nodeId, int reducers) {
+        int[] reducersArr = PLAN.get().reducers(nodeId);
+
+        return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
+    }
+
+    /**
+     * Ensure that no mappers and reducers is located on this node.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if this assumption is valid.
+     */
+    private static boolean ensureEmpty(UUID nodeId) {
+        return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
+    }
+
+    /**
+     * Create split.
+     *
+     * @param igfs IGFS flag.
+     * @param file File.
+     * @param start Start.
+     * @param len Length.
+     * @param hosts Hosts.
+     * @return Split.
+     */
+    private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
+        URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
+
+        return new HadoopFileBlock(hosts, uri, start, len);
+    }
+
+    /**
+     * Create block location.
+     *
+     * @param start Start.
+     * @param len Length.
+     * @param nodeIds Node IDs.
+     * @return Block location.
+     */
+    private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
+        assert nodeIds != null && nodeIds.length > 0;
+
+        Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
+
+        for (UUID id : nodeIds)
+            nodes.add(new GridTestNode(id));
+
+        return new IgfsBlockLocationImpl(start, len, nodes);
+    }
+
+    /**
+     * Map IGFS block to nodes.
+     *
+     * @param file File.
+     * @param start Start.
+     * @param len Length.
+     * @param locations Locations.
+     */
+    private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
+        assert locations != null && locations.length > 0;
+
+        IgfsPath path = new IgfsPath(file);
+
+        Block block = new Block(path, start, len);
+
+        Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
+
+        Collections.addAll(locationsList, locations);
+
+        BLOCK_MAP.put(block, locationsList);
+    }
+
+    /**
+     * Block.
+     */
+    private static class Block {
+        /** */
+        private final IgfsPath path;
+
+        /** */
+        private final long start;
+
+        /** */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param path Path.
+         * @param start Start.
+         * @param len Length.
+         */
+        private Block(IgfsPath path, long start, long len) {
+            this.path = path;
+            this.start = start;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("RedundantIfStatement")
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Block)) return false;
+
+            Block block = (Block) o;
+
+            if (len != block.len)
+                return false;
+
+            if (start != block.start)
+                return false;
+
+            if (!path.equals(block.path))
+                return false;
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = path.hashCode();
+
+            res = 31 * res + (int) (start ^ (start >>> 32));
+            res = 31 * res + (int) (len ^ (len >>> 32));
+
+            return res;
+        }
+    }
+
+    /**
+     * Mocked IGFS.
+     */
+    private static class MockIgfs extends IgfsMock {
+        /**
+         * Constructor.
+         */
+        public MockIgfs() {
+            super("igfs");
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isProxy(URI path) {
+            return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+            return BLOCK_MAP.get(new Block(path, start, len));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean exists(IgfsPath path) {
+            return true;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
new file mode 100644
index 0000000..b89dcc1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopErrorSimulator.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Error simulator.
+ */
+public class HadoopErrorSimulator {
+    /** No-op singleton instance. */
+    public static final HadoopErrorSimulator noopInstance = new HadoopErrorSimulator();
+
+    /** Instance ref. */
+    private static final AtomicReference<HadoopErrorSimulator> ref = new AtomicReference<>(noopInstance);
+
+    /**
+     * Creates simulator of given kind with given stage bits.
+     *
+     * @param kind The kind.
+     * @param bits The stage bits.
+     * @return The simulator.
+     */
+    public static HadoopErrorSimulator create(Kind kind, int bits) {
+        switch (kind) {
+            case Noop:
+                return noopInstance;
+            case Runtime:
+                return new RuntimeExceptionBitHadoopErrorSimulator(bits);
+            case IOException:
+                return new IOExceptionBitHadoopErrorSimulator(bits);
+            case Error:
+                return new ErrorBitHadoopErrorSimulator(bits);
+            default:
+                throw new IllegalStateException("Unknown kind: " + kind);
+        }
+    }
+
+    /**
+     * Gets the error simulator instance.
+     */
+    public static HadoopErrorSimulator instance() {
+        return ref.get();
+    }
+
+    /**
+     * Sets instance.
+     */
+    public static boolean setInstance(HadoopErrorSimulator expect, HadoopErrorSimulator update) {
+        return ref.compareAndSet(expect, update);
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopErrorSimulator() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapSetup()  throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMap() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapCleanup()  throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onMapClose()  throws IOException {
+        // no-op
+    }
+
+    /**
+     * setConf() does not declare IOException to be thrown.
+     */
+    public void onCombineConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineSetup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombine() throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onCombineCleanup() throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceConfigure() {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceSetup()  throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduce()  throws IOException {
+        // no-op
+    }
+
+    /**
+     * Invoked on the named stage.
+     */
+    public void onReduceCleanup()  throws IOException, InterruptedException {
+        // no-op
+    }
+
+    /**
+     * Error kind.
+     */
+    public enum Kind {
+        /** No error. */
+        Noop,
+
+        /** Runtime. */
+        Runtime,
+
+        /** IOException. */
+        IOException,
+
+        /** java.lang.Error. */
+        Error
+    }
+
+    /**
+     * Runtime error simulator.
+     */
+    public static class RuntimeExceptionBitHadoopErrorSimulator extends HadoopErrorSimulator {
+        /** Stage bits: defines what map-reduce stages will cause errors. */
+        private final int bits;
+
+        /**
+         * Constructor.
+         */
+        protected RuntimeExceptionBitHadoopErrorSimulator(int b) {
+            bits = b;
+        }
+
+        /**
+         * Simulates an error.
+         */
+        protected void simulateError() throws IOException {
+            throw new RuntimeException("An error simulated by " + getClass().getSimpleName());
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapConfigure() {
+            try {
+                if ((bits & 1) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapSetup() throws IOException, InterruptedException {
+            if ((bits & 2) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMap() throws IOException {
+            if ((bits & 4) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onMapCleanup() throws IOException, InterruptedException {
+            if ((bits & 8) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineConfigure() {
+            try {
+                if ((bits & 16) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineSetup() throws IOException, InterruptedException {
+            if ((bits & 32) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombine() throws IOException {
+            if ((bits & 64) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onCombineCleanup() throws IOException, InterruptedException {
+            if ((bits & 128) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceConfigure() {
+            try {
+                if ((bits & 256) != 0)
+                    simulateError();
+            }
+            catch (IOException e) {
+                // ignore
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceSetup() throws IOException, InterruptedException {
+            if ((bits & 512) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduce() throws IOException {
+            if ((bits & 1024) != 0)
+                simulateError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public final void onReduceCleanup() throws IOException, InterruptedException {
+            if ((bits & 2048) != 0)
+                simulateError();
+        }
+    }
+
+    /**
+     * java.lang.Error simulator.
+     */
+    public static class ErrorBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public ErrorBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() {
+            throw new Error("An error simulated by " + getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * IOException simulator.
+     */
+    public static class IOExceptionBitHadoopErrorSimulator extends RuntimeExceptionBitHadoopErrorSimulator {
+        /**
+         * Constructor.
+         */
+        public IOExceptionBitHadoopErrorSimulator(int bits) {
+            super(bits);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void simulateError() throws IOException {
+            throw new IOException("An IOException simulated by " + getClass().getSimpleName());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
new file mode 100644
index 0000000..252d6cb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopFileSystemsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSystemsUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Test file systems for the working directory multi-threading support.
+ */
+public class HadoopFileSystemsTest extends HadoopAbstractSelfTest {
+    /** the number of threads */
+    private static final int THREAD_COUNT = 3;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+
+    /**
+     * Test the file system with specified URI for the multi-thread working directory support.
+     *
+     * @param uri Base URI of the file system (scheme and authority).
+     * @throws Exception If fails.
+     */
+    private void testFileSystem(final URI uri) throws Exception {
+        final Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP,
+            new Path(new Path(uri), "user/" + System.getProperty("user.name")).toString());
+
+        final CountDownLatch changeUserPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch changeAbsDirPhase = new CountDownLatch(THREAD_COUNT);
+        final CountDownLatch finishPhase = new CountDownLatch(THREAD_COUNT);
+
+        final Path[] newUserInitWorkDir = new Path[THREAD_COUNT];
+        final Path[] newWorkDir = new Path[THREAD_COUNT];
+        final Path[] newAbsWorkDir = new Path[THREAD_COUNT];
+        final Path[] newInstanceWorkDir = new Path[THREAD_COUNT];
+
+        final AtomicInteger threadNum = new AtomicInteger(0);
+
+        GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                try {
+                    int curThreadNum = threadNum.getAndIncrement();
+
+                    if ("file".equals(uri.getScheme()))
+                        FileSystem.get(uri, cfg).setWorkingDirectory(new Path("file:///user/user" + curThreadNum));
+
+                    changeUserPhase.countDown();
+                    changeUserPhase.await();
+
+                    newUserInitWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("folder" + curThreadNum));
+
+                    changeDirPhase.countDown();
+                    changeDirPhase.await();
+
+                    newWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    FileSystem.get(uri, cfg).setWorkingDirectory(new Path("/folder" + curThreadNum));
+
+                    changeAbsDirPhase.countDown();
+                    changeAbsDirPhase.await();
+
+                    newAbsWorkDir[curThreadNum] = FileSystem.get(uri, cfg).getWorkingDirectory();
+
+                    newInstanceWorkDir[curThreadNum] = FileSystem.newInstance(uri, cfg).getWorkingDirectory();
+
+                    finishPhase.countDown();
+                }
+                catch (InterruptedException | IOException e) {
+                    error("Failed to execute test thread.", e);
+
+                    fail();
+                }
+            }
+        }, THREAD_COUNT, "filesystems-test");
+
+        finishPhase.await();
+
+        for (int i = 0; i < THREAD_COUNT; i ++) {
+            cfg.set(MRJobConfig.USER_NAME, "user" + i);
+
+            Path workDir = new Path(new Path(uri), "user/user" + i);
+
+            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, workDir.toString());
+
+            assertEquals(workDir, FileSystem.newInstance(uri, cfg).getWorkingDirectory());
+
+            assertEquals(workDir, newUserInitWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/user" + i + "/folder" + i), newWorkDir[i]);
+
+            assertEquals(new Path("/folder" + i), newAbsWorkDir[i]);
+
+            assertEquals(new Path(new Path(uri), "user/" + System.getProperty("user.name")), newInstanceWorkDir[i]);
+        }
+
+        System.out.println(System.getProperty("user.dir"));
+    }
+
+    /**
+     * Test LocalFS multi-thread working directory.
+     *
+     * @throws Exception If fails.
+     */
+    public void testLocal() throws Exception {
+        testFileSystem(URI.create("file:///"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
new file mode 100644
index 0000000..19c71e8
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.processors.hadoop.state.HadoopGroupingTestState;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Grouping test.
+ */
+public class HadoopGroupingTest extends HadoopAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingReducer() throws Exception {
+        doTestGrouping(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingCombiner() throws Exception {
+        doTestGrouping(true);
+    }
+
+    /**
+     * @param combiner With combiner.
+     * @throws Exception If failed.
+     */
+    public void doTestGrouping(boolean combiner) throws Exception {
+        HadoopGroupingTestState.values().clear();
+
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+        job.setOutputFormatClass(OutFormat.class);
+
+        job.setOutputKeyClass(YearTemperature.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setMapperClass(Mapper.class);
+
+        if (combiner) {
+            job.setCombinerClass(MyReducer.class);
+            job.setNumReduceTasks(0);
+            job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
+        }
+        else {
+            job.setReducerClass(MyReducer.class);
+            job.setNumReduceTasks(4);
+            job.setGroupingComparatorClass(YearComparator.class);
+        }
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(30000);
+
+        assertTrue(HadoopGroupingTestState.values().isEmpty());
+    }
+
+    public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
+        /** */
+        int lastYear;
+
+        @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
+            throws IOException, InterruptedException {
+            X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
+
+            Set<UUID> ids = new HashSet<>();
+
+            for (Text val : vals0)
+                assertTrue(ids.add(UUID.fromString(val.toString())));
+
+            for (Text val : vals0)
+                assertTrue(ids.remove(UUID.fromString(val.toString())));
+
+            assertTrue(ids.isEmpty());
+
+            assertTrue(key.year > lastYear);
+
+            lastYear = key.year;
+
+            for (Text val : vals0)
+                assertTrue(HadoopGroupingTestState.values().remove(UUID.fromString(val.toString())));
+        }
+    }
+
+    public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator.
+        /** {@inheritDoc} */
+        @Override public int compare(YearTemperature o1, YearTemperature o2) {
+            return Integer.compare(o1.year, o2.year);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            throw new IllegalStateException();
+        }
+    }
+
+    public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
+        /** */
+        private int year;
+
+        /** */
+        private int temperature;
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(year);
+            out.writeInt(temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            year = in.readInt();
+            temperature = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            throw new IllegalStateException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() { // To be partitioned by year.
+            return year;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(YearTemperature o) {
+            int res = Integer.compare(year, o.year);
+
+            if (res != 0)
+                return res;
+
+            // Sort comparator by year and temperature, to find max for year.
+            return Integer.compare(o.temperature, temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(YearTemperature.class, this);
+        }
+    }
+
+    public static class InFormat extends InputFormat<YearTemperature, Text> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+            ArrayList<InputSplit> list = new ArrayList<>();
+
+            for (int i = 0; i < 10; i++)
+                list.add(new HadoopSortingTest.FakeSplit(20));
+
+            return list;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+            return new RecordReader<YearTemperature, Text>() {
+                /** */
+                int cnt;
+
+                /** */
+                Random rnd = new GridRandom();
+
+                /** */
+                YearTemperature key = new YearTemperature();
+
+                /** */
+                Text val = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext context) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return cnt++ < split.getLength();
+                }
+
+                @Override public YearTemperature getCurrentKey() {
+                    key.year = 1990 + rnd.nextInt(10);
+                    key.temperature = 10 + rnd.nextInt(20);
+
+                    return key;
+                }
+
+                @Override public Text getCurrentValue() {
+                    UUID id = UUID.randomUUID();
+
+                    assertTrue(HadoopGroupingTestState.values().add(id));
+
+                    val.set(id.toString());
+
+                    return val;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    public static class OutFormat extends OutputFormat {
+        /** {@inheritDoc} */
+        @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+    }
+}
\ No newline at end of file