You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:09:09 UTC

[14/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented new class loading architecture for embedded execution mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
new file mode 100644
index 0000000..027f921
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.cancelledTasks;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.executedTasks;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.failMapperId;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.splitsCount;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.taskWorkDirs;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopTaskExecutionSelfTestValues.totalLineCnt;
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests map-reduce task execution basics.
+ */
+public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
+    /** Test param. */
+    private static final String MAP_WRITE = "test.map.write";
+
+    /** {@inheritDoc} */
+    @Override public FileSystemConfiguration igfsConfiguration() throws Exception {
+        FileSystemConfiguration cfg = super.igfsConfiguration();
+
+        cfg.setFragmentizerEnabled(false);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        grid(0).fileSystem(igfsName).format();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMaxParallelTasks(5);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapRun() throws Exception {
+        int lineCnt = 10000;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+                createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(32, taskWorkDirs.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapCombineRun() throws Exception {
+        int lineCnt = 10001;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+        cfg.setBoolean(MAP_WRITE, true);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setCombinerClass(TestCombiner.class);
+        job.setReducerClass(TestReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
+
+        job.setJarByClass(getClass());
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(34, taskWorkDirs.size());
+
+        for (int g = 0; g < gridCount(); g++)
+            grid(g).hadoop().finishFuture(jobId).get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapperException() throws Exception {
+        prepareFile("/testFile", 1000);
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(FailMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
+                createJobInfo(job.getConfiguration()));
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
+     * @param fileName File name.
+     * @param lineCnt Line count.
+     * @throws Exception If failed.
+     */
+    private void prepareFile(String fileName, int lineCnt) throws Exception {
+        IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
+
+        try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) {
+            PrintWriter w = new PrintWriter(new OutputStreamWriter(os));
+
+            for (int i = 0; i < lineCnt; i++)
+                w.print("Hello, Hadoop map-reduce!\n");
+
+            w.flush();
+        }
+    }
+
+    /**
+     * Prepare job with mappers to cancel.
+     * @return Fully configured job.
+     * @throws Exception If fails.
+     */
+    private Configuration prepareJobForCancelling() throws Exception {
+        prepareFile("/testFile", 1500);
+
+        executedTasks.set(0);
+        cancelledTasks.set(0);
+        failMapperId.set(0);
+        splitsCount.set(0);
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(CancellingTestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(InFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        return job.getConfiguration();
+    }
+
+    /**
+     * Test input format.
+     */
+    private static class InFormat extends TextInputFormat {
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException {
+            List<InputSplit> res = super.getSplits(ctx);
+
+            splitsCount.set(res.size());
+
+            X.println("___ split of input: " + splitsCount.get());
+
+            return res;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskCancelling() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        // Fail mapper with id "1", cancels others
+        failMapperId.set(1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJobKill() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        Hadoop hadoop = grid(0).hadoop();
+
+        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+        //Kill unknown job.
+        boolean killRes = hadoop.kill(jobId);
+
+        assertFalse(killRes);
+
+        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            assertTrue(false);
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                X.println("___ executed tasks: " + executedTasks.get());
+
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        //Kill really ran job.
+        killRes = hadoop.kill(jobId);
+
+        assertTrue(killRes);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get());
+
+        //Kill the same job again.
+        killRes = hadoop.kill(jobId);
+
+        assertFalse(killRes);
+    }
+
+    private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        private int mapperId;
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            mapperId = executedTasks.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            try {
+                super.run(ctx);
+            }
+            catch (HadoopTaskCancelledException e) {
+                cancelledTasks.incrementAndGet();
+
+                throw e;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            if (mapperId == failMapperId.get())
+                throw new IOException();
+
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Test failing mapper.
+     */
+    private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            throw new IOException("Expected");
+        }
+    }
+
+    /**
+     * Mapper calculates number of lines.
+     */
+    private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** Writable integer constant of '1'. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** Line count constant. */
+        public static final Text LINE_COUNT = new Text("lineCount");
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Mapper: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+
+            LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration());
+
+            String workDir = locFs.getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            if (ctx.getConfiguration().getBoolean(MAP_WRITE, false))
+                ctx.write(LINE_COUNT, ONE);
+            else
+                totalLineCnt.incrementAndGet();
+        }
+    }
+
+    /**
+     * Combiner calculates number of lines.
+     */
+    private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Combiner: ");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values)
+                lineCnt += value.get();
+
+            sum.set(lineCnt);
+
+            X.println("___ combo: " + lineCnt);
+
+            ctx.write(key, sum);
+        }
+    }
+
+    /**
+     * Combiner calculates number of lines.
+     */
+    private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Reducer: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+            String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values) {
+                lineCnt += value.get();
+
+                X.println("___ rdcr: " + value.get());
+            }
+
+            sum.set(lineCnt);
+
+            ctx.write(key, sum);
+
+            X.println("___ RDCR SUM: " + lineCnt);
+
+            totalLineCnt.addAndGet(lineCnt);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksAllVersionsTest.java
new file mode 100644
index 0000000..8b1b693
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksAllVersionsTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Tests of Map, Combine and Reduce task executions of any version of hadoop API.
+ */
+abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
+    /** Empty hosts array. */
+    private static final String[] HOSTS = new String[0];
+
+    /**
+     * Creates some grid hadoop job. Override this method to create tests for any job implementation.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    public abstract HadoopJob getHadoopJob(String inFile, String outFile) throws Exception;
+
+    /**
+     * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
+     */
+    public abstract String getOutputFileNamePrefix();
+
+    /**
+     * Tests map task execution.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testMapTask() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
+            pw.println("hello0 world0");
+            pw.println("world1 hello1");
+        }
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1);
+
+        try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) {
+            pw.println("hello2 world2");
+            pw.println("world3 hello3");
+        }
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
+                igfs.info(inFile).length() - fileBlock1.length());
+
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        ctx.mockOutput().clear();
+
+        ctx.run();
+
+        assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput()));
+
+        ctx.mockOutput().clear();
+
+        ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2));
+
+        ctx.run();
+
+        assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Generates input data for reduce-like operation into mock context input and runs the operation.
+     *
+     * @param gridJob Job is to create reduce task from.
+     * @param taskType Type of task - combine or reduce.
+     * @param taskNum Number of task in job.
+     * @param words Pairs of words and its counts.
+     * @return Context with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runTaskWithInput(HadoopJob gridJob, HadoopTaskType taskType,
+        int taskNum, String... words) throws IgniteCheckedException {
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        for (int i = 0; i < words.length; i+=2) {
+            List<IntWritable> valList = new ArrayList<>();
+
+            for (int j = 0; j < Integer.parseInt(words[i + 1]); j++)
+                valList.add(new IntWritable(1));
+
+            ctx.mockInput().put(new Text(words[i]), valList);
+        }
+
+        ctx.run();
+
+        return ctx;
+    }
+
+    /**
+     * Tests reduce task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testReduceTask() throws Exception {
+        HadoopJob gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
+
+        assertEquals(
+            "word1\t5\n" +
+            "word2\t10\n",
+            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
+                getOutputFileNamePrefix() + "00000")
+        );
+
+        assertEquals(
+            "word3\t7\n" +
+            "word4\t15\n",
+            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
+                getOutputFileNamePrefix() + "00001")
+        );
+    }
+
+    /**
+     * Tests combine task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testCombinerTask() throws Exception {
+        HadoopJob gridJob = getHadoopJob("/", "/");
+
+        HadoopTestTaskContext ctx =
+            runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
+
+        assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput()));
+
+        ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15");
+
+        assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Runs chain of map-combine task on file block.
+     *
+     * @param fileBlock block of input file to be processed.
+     * @param gridJob Hadoop job implementation.
+     * @return Context of combine task with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopJob gridJob)
+        throws IgniteCheckedException {
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
+
+        HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        mapCtx.run();
+
+        //Prepare input for combine
+        taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
+
+        HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
+
+        combineCtx.run();
+
+        return combineCtx;
+    }
+
+    /**
+     * Tests all job in complex.
+     * Runs 2 chains of map-combine tasks and sends result into one reduce task.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testAllTasks() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70);
+
+        //Split file into two blocks
+        long fileLen = igfs.info(inFile).length();
+
+        Long l = fileLen / 2;
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
+
+        HadoopJob gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+
+        HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
+
+        HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
+
+        //Prepare input for combine
+        HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
+
+        HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
+        reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
+
+        reduceCtx.run();
+
+        reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null));
+
+        reduceCtx.run();
+
+        assertEquals(
+            "blue\t200\n" +
+            "green\t150\n" +
+            "red\t100\n" +
+            "yellow\t70\n",
+            readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000")
+        );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
new file mode 100644
index 0000000..d7cd738
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV1Test.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1.
+ */
+public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v1.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
+        JobConf jobConf = HadoopWordCount1.getJob(inFile, outFile);
+
+        setupFileSystems(jobConf);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+
+        UUID uuid = new UUID(0, 0);
+
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
new file mode 100644
index 0000000..c635c41
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTasksV2Test.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2.
+ */
+public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v2.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws Exception if fails.
+     */
+    @Override public HadoopJob getHadoopJob(String inFile, String outFile) throws Exception {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        HadoopWordCount2.setTasksClasses(job, true, true, true, false);
+
+        Configuration conf = job.getConfiguration();
+
+        setupFileSystems(conf);
+
+        FileInputFormat.setInputPaths(job, new Path(inFile));
+        FileOutputFormat.setOutputPath(job, new Path(outFile));
+
+        job.setJarByClass(HadoopWordCount2.class);
+
+        Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+
+        UUID uuid = new UUID(0, 0);
+
+        HadoopJobId jobId = new HadoopJobId(uuid, 0);
+
+        return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-r-";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..81f6f3c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Round-robin mr planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        if (top.isEmpty())
+            throw new IllegalArgumentException("Topology is empty");
+
+        // Has at least one element.
+        Iterator<ClusterNode> it = top.iterator();
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+        for (HadoopInputSplit block : job.input()) {
+            ClusterNode node = it.next();
+
+            Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+            if (nodeBlocks == null) {
+                nodeBlocks = new ArrayList<>();
+
+                mappers.put(node.id(), nodeBlocks);
+            }
+
+            nodeBlocks.add(block);
+
+            if (!it.hasNext())
+                it = top.iterator();
+        }
+
+        int[] rdc = new int[job.info().reducers()];
+
+        for (int i = 0; i < rdc.length; i++)
+            rdc[i] = i;
+
+        return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
new file mode 100644
index 0000000..cfd41cf
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
+
+/**
+ * Context for test purpose.
+ */
+class HadoopTestTaskContext extends HadoopV2TaskContext {
+    /**
+     * Simple key-vale pair.
+     * @param <K> Key class.
+     * @param <V> Value class.
+     */
+    public static class Pair<K,V> {
+        /** Key */
+        private K key;
+
+        /** Value */
+        private V val;
+
+        /**
+         * @param key key.
+         * @param val value.
+         */
+        Pair(K key, V val) {
+            this.key = key;
+            this.val = val;
+        }
+
+        /**
+         * Getter of key.
+         * @return key.
+         */
+        K key() {
+            return key;
+        }
+
+        /**
+         * Getter of value.
+         * @return value.
+         */
+        V value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return key + "," + val;
+        }
+    }
+
+    /** Mock output container- result data of task execution if it is not overridden. */
+    private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
+
+    /** Mock input container- input data if it is not overridden. */
+    private Map<Object,List> mockInput = new TreeMap<>();
+
+    /** Context output implementation to write data into mockOutput. */
+    private HadoopTaskOutput output = new HadoopTaskOutput() {
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) {
+            //Check of casting and extract/copy values
+            String strKey = new String(((Text)key).getBytes());
+            int intVal = ((IntWritable)val).get();
+
+            mockOutput().add(new Pair<>(strKey, intVal));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /** Context input implementation to read data from mockInput. */
+    private HadoopTaskInput input = new HadoopTaskInput() {
+        /** Iterator of keys and associated lists of values. */
+        Iterator<Map.Entry<Object, List>> iter;
+
+        /** Current key and associated value list. */
+        Map.Entry<Object, List> currEntry;
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (iter == null)
+                iter = mockInput().entrySet().iterator();
+
+            if (iter.hasNext())
+                currEntry = iter.next();
+            else
+                currEntry = null;
+
+            return currEntry != null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return currEntry.getKey();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return currEntry.getValue().iterator() ;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    /**
+     * Getter of mock output container - result of task if it is not overridden.
+     *
+     * @return mock output.
+     */
+    public List<Pair<String, Integer>> mockOutput() {
+        return mockOutput;
+    }
+
+    /**
+     * Getter of mock input container- input data if it is not overridden.
+     *
+     * @return mock output.
+     */
+    public Map<Object, List> mockInput() {
+        return mockInput;
+    }
+
+    /**
+     * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects.
+     * The result is placed into mock input.
+     *
+     * @param flatData list of key-value pair.
+     */
+    public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
+        Text key = new Text();
+
+        for (Pair<String, Integer> pair : flatData) {
+            key.set(pair.key);
+            ArrayList<IntWritable> valList;
+
+            if (!mockInput.containsKey(key)) {
+                valList = new ArrayList<>();
+                mockInput.put(key, valList);
+                key = new Text();
+            }
+            else
+                valList = (ArrayList<IntWritable>) mockInput.get(key);
+            valList.add(new IntWritable(pair.value()));
+        }
+    }
+
+    /**
+     * @param taskInfo Task info.
+     * @param gridJob Grid Hadoop job.
+     */
+    public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException {
+        super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
+    }
+
+    /**
+     * Creates DataInput to read JobConf.
+     *
+     * @param job Job.
+     * @return DataInput with JobConf.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException {
+        JobConf jobConf = new JobConf();
+
+        for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
+            jobConf.set(e.getKey(), e.getValue());
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        try {
+            jobConf.write(new DataOutputStream(buf));
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskOutput output() {
+        return output;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskInput input() {
+        return input;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
new file mode 100644
index 0000000..e8ec8a9
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class for tests.
+ */
+public class HadoopTestUtils {
+    /** Base test directory. */
+    private static final File BASE_TEST_DIR = new File(U.getIgniteHome() + "/work/test/hadoop/");
+
+    /**
+     * @return Base directory for tests.
+     */
+    public static File baseTestDir() {
+        return BASE_TEST_DIR;
+    }
+
+    /**
+     * Get test directory.
+     *
+     * @param parts Parts.
+     * @return Directory.
+     */
+    public static File testDir(String... parts) {
+        File res = BASE_TEST_DIR;
+
+        if (parts != null) {
+            for (String part : parts)
+                res = new File(res, part);
+        }
+
+        return res;
+    }
+
+    /**
+     * Clear base test directory.
+     */
+    public static void clearBaseTestDir() {
+        if (baseTestDir().exists())
+            assert delete(baseTestDir());
+    }
+
+    /**
+     * Checks that job statistics file contains valid strings only.
+     *
+     * @param reader Buffered reader to get lines of job statistics.
+     * @return Amount of events.
+     * @throws IOException If failed.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException {
+        Collection<String> phases = new HashSet<>();
+
+        phases.add("submit");
+        phases.add("prepare");
+        phases.add("start");
+        phases.add("finish");
+        phases.add("requestId");
+        phases.add("responseId");
+
+        Collection<String> evtTypes = new HashSet<>();
+
+        evtTypes.add("JOB");
+        evtTypes.add("SETUP");
+        evtTypes.add("MAP");
+        evtTypes.add("SHUFFLE");
+        evtTypes.add("REDUCE");
+        evtTypes.add("COMBINE");
+        evtTypes.add("COMMIT");
+
+        long evtCnt = 0;
+        String line;
+
+        Map<Long, String> reduceNodes = new HashMap<>();
+
+        while((line = reader.readLine()) != null) {
+            String[] splitLine = line.split(":");
+
+            //Try parse timestamp
+            Long.parseLong(splitLine[1]);
+
+            String[] evt = splitLine[0].split(" ");
+
+            assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0]));
+
+            String phase;
+
+            if ("JOB".equals(evt[0]))
+                phase = evt[1];
+            else {
+                assertEquals(4, evt.length);
+                assertTrue("The node id is not defined", !F.isEmpty(evt[3]));
+
+                long taskNum = Long.parseLong(evt[1]);
+
+                if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) {
+                    String nodeId = reduceNodes.get(taskNum);
+
+                    if (nodeId == null)
+                        reduceNodes.put(taskNum, evt[3]);
+                    else
+                        assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]);
+                }
+
+                phase = evt[2];
+            }
+
+            assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase));
+
+            evtCnt++;
+        }
+
+        return evtCnt;
+    }
+
+    /**
+     * Deletes file or directory with all sub-directories and files.
+     *
+     * @param file File or directory to delete.
+     * @return {@code true} if and only if the file or directory is successfully deleted,
+     *      {@code false} otherwise
+     */
+    public static boolean delete(@Nullable File file) {
+        if (file == null)
+            return false;
+
+        boolean res = true;
+
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+
+            if (files != null && files.length > 0)
+                for (File file1 : files)
+                    if (file1.isDirectory())
+                        res &= delete(file1);
+                    else
+                        res &= file1.delete();
+
+            res &= file.delete();
+        }
+        else
+            res = file.delete();
+
+        return res;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
new file mode 100644
index 0000000..e85baed
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.IgniteTxConfigCacheSelfTest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Test checks whether hadoop system cache doesn't use user defined TX config.
+ */
+public class HadoopTxConfigCacheTest  extends IgniteTxConfigCacheSelfTest {
+    /**
+     * Success if system caches weren't timed out.
+     *
+     * @throws Exception
+     */
+    public void testSystemCacheTx() throws Exception {
+        final Ignite ignite = grid(0);
+
+        final IgniteInternalCache<Object, Object> hadoopCache = getSystemCache(ignite, CU.SYS_CACHE_HADOOP_MR);
+
+        checkImplicitTxSuccess(hadoopCache);
+        checkStartTxSuccess(hadoopCache);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
new file mode 100644
index 0000000..0e4a0ef
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+/**
+ * Tests for user libs parsing.
+ */
+public class HadoopUserLibsSelfTest extends GridCommonAbstractTest {
+    /** Directory 1. */
+    private static final File DIR_1 = HadoopTestUtils.testDir("dir1");
+
+    /** File 1 in directory 1. */
+    private static final File FILE_1_1 = new File(DIR_1, "file1.jar");
+
+    /** File 2 in directory 1. */
+    private static final File FILE_1_2 = new File(DIR_1, "file2.jar");
+
+    /** Directory 2. */
+    private static final File DIR_2 = HadoopTestUtils.testDir("dir2");
+
+    /** File 1 in directory 2. */
+    private static final File FILE_2_1 = new File(DIR_2, "file1.jar");
+
+    /** File 2 in directory 2. */
+    private static final File FILE_2_2 = new File(DIR_2, "file2.jar");
+
+    /** Missing directory. */
+    private static final File MISSING_DIR = HadoopTestUtils.testDir("missing_dir");
+
+    /** Missing file. */
+    private static final File MISSING_FILE = new File(MISSING_DIR, "file.jar");
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        HadoopTestUtils.clearBaseTestDir();
+
+        assert DIR_1.mkdirs();
+        assert DIR_2.mkdirs();
+
+        assert FILE_1_1.createNewFile();
+        assert FILE_1_2.createNewFile();
+        assert FILE_2_1.createNewFile();
+        assert FILE_2_2.createNewFile();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Sanity checks before test start.
+        ensureExists(FILE_1_1);
+        ensureExists(FILE_1_2);
+        ensureExists(FILE_2_1);
+        ensureExists(FILE_2_2);
+
+        ensureNotExists(MISSING_DIR);
+        ensureNotExists(MISSING_FILE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        HadoopTestUtils.clearBaseTestDir();
+    }
+
+    /**
+     * Test null or empty user libs.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNullOrEmptyUserLibs() throws Exception {
+        assert parse(null).isEmpty();
+        assert parse("").isEmpty();
+    }
+
+    /**
+     * Test single file.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingle() throws Exception {
+        Collection<File> res = parse(single(FILE_1_1));
+
+        assert res.size() == 1;
+        assert res.contains(FILE_1_1);
+
+        res = parse(single(MISSING_FILE));
+
+        assert res.size() == 0;
+    }
+
+    /**
+     * Test multiple files.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultiple() throws Exception {
+        Collection<File> res =
+            parse(merge(single(FILE_1_1), single(FILE_1_2), single(FILE_2_1), single(FILE_2_2), single(MISSING_FILE)));
+
+        assert res.size() == 4;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+
+    /**
+     * Test single wildcard.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSingleWildcard() throws Exception {
+        Collection<File> res = parse(wildcard(DIR_1));
+
+        assert res.size() == 2;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+
+        res = parse(wildcard(MISSING_DIR));
+
+        assert res.size() == 0;
+    }
+
+    /**
+     * Test multiple wildcards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultipleWildcards() throws Exception {
+        Collection<File> res = parse(merge(wildcard(DIR_1), wildcard(DIR_2), wildcard(MISSING_DIR)));
+
+        assert res.size() == 4;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_1_2);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+
+    /**
+     * Test mixed tokens.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMixed() throws Exception {
+        String str = merge(
+            single(FILE_1_1),
+            wildcard(DIR_2),
+            single(MISSING_FILE),
+            wildcard(MISSING_DIR)
+        );
+
+        Collection<File> res = parse(str);
+
+        assert res.size() == 3;
+        assert res.contains(FILE_1_1);
+        assert res.contains(FILE_2_1);
+        assert res.contains(FILE_2_2);
+    }
+    /**
+     * Ensure provided file exists.
+     *
+     * @param file File.
+     */
+    private static void ensureExists(File file) {
+        assert file.exists();
+    }
+
+    /**
+     * Ensure provided file doesn't exist.
+     *
+     * @param file File.
+     */
+    private static void ensureNotExists(File file) {
+        assert !file.exists();
+    }
+
+    /**
+     * Merge string using path separator.
+     *
+     * @param vals Values.
+     * @return Result.
+     */
+    private static String merge(String... vals) {
+        StringBuilder res = new StringBuilder();
+
+        if (vals != null) {
+            boolean first = true;
+
+            for (String val : vals) {
+                if (first)
+                    first = false;
+                else
+                    res.append(File.pathSeparatorChar);
+
+                res.append(val);
+            }
+        }
+
+        return res.toString();
+    }
+
+    /**
+     * Parse string.
+     *
+     * @param str String.
+     * @return Files.
+     * @throws IOException If failed.
+     */
+    Collection<File> parse(String str) throws IOException {
+        Collection<HadoopClasspathUtils.SearchDirectory> dirs = HadoopClasspathUtils.parseUserLibs(str);
+
+        Collection<File> res = new HashSet<>();
+
+        for (HadoopClasspathUtils.SearchDirectory dir : dirs)
+            Collections.addAll(res, dir.files());
+
+        return res;
+    }
+
+    /**
+     * Get absolute path to a single file.
+     *
+     * @param file File.
+     * @return Path.
+     */
+    private static String single(File file) {
+        return file.getAbsolutePath();
+    }
+
+    /**
+     * Create a wildcard.
+     *
+     * @param file File.
+     * @return Wildcard.
+     */
+    private static String wildcard(File file) {
+        return file.getAbsolutePath() + File.separatorChar + "*";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
new file mode 100644
index 0000000..540a7aa
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.util.UUID;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopSerializationWrapper;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job}.
+ */
+public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
+
+    /**
+     * Custom serialization class that accepts {@link Writable}.
+     */
+    private static class CustomSerialization extends WritableSerialization {
+        /** {@inheritDoc} */
+        @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+            return new Deserializer<Writable>() {
+                @Override public void open(InputStream in) { }
+
+                @Override public Writable deserialize(Writable writable) {
+                    return new Text(TEST_SERIALIZED_VALUE);
+                }
+
+                @Override public void close() { }
+            };
+        }
+    }
+
+    /**
+     * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration.
+     *
+     * @throws IgniteCheckedException If fails.
+     */
+    public void testCustomSerializationApplying() throws IgniteCheckedException {
+        JobConf cfg = new JobConf();
+
+        cfg.setMapOutputKeyClass(IntWritable.class);
+        cfg.setMapOutputValueClass(Text.class);
+        cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+        HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+        final UUID uuid = UUID.randomUUID();
+
+        HadoopJobId id = new HadoopJobId(uuid, 1);
+
+        HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null, new HadoopHelperImpl());
+
+        HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
+            null));
+
+        HadoopSerialization ser = taskCtx.keySerialization();
+
+        assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
+
+        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+
+        ser = taskCtx.valueSerialization();
+
+        assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+        assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
new file mode 100644
index 0000000..2d61016
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Configuration validation tests.
+ */
+public class HadoopValidationSelfTest extends HadoopAbstractSelfTest {
+    /** Peer class loading enabled flag. */
+    public boolean peerClassLoading;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+
+        peerClassLoading = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoading);
+
+        return cfg;
+    }
+
+    /**
+     * Ensure that Grid starts when all configuration parameters are valid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValid() throws Exception {
+        startGrids(1);
+    }
+}
\ No newline at end of file