Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/26 07:17:12 UTC

[16/46] ignite git commit: IGNITE-3953: Hadoop: Merged back both modules.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
new file mode 100644
index 0000000..19c71e8
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopGroupingTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.processors.hadoop.state.HadoopGroupingTestState;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Grouping test.
+ */
+public class HadoopGroupingTest extends HadoopAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingReducer() throws Exception {
+        doTestGrouping(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGroupingCombiner() throws Exception {
+        doTestGrouping(true);
+    }
+
+    /**
+     * @param combiner Whether to use a combiner.
+     * @throws Exception If failed.
+     */
+    public void doTestGrouping(boolean combiner) throws Exception {
+        HadoopGroupingTestState.values().clear();
+
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+        job.setOutputFormatClass(OutFormat.class);
+
+        job.setOutputKeyClass(YearTemperature.class);
+        job.setOutputValueClass(Text.class);
+
+        job.setMapperClass(Mapper.class);
+
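+        // YearComparator compares keys by year only, so all values that share
+        // a year are grouped into a single reduce()/combine() invocation.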
+        if (combiner) {
+            job.setCombinerClass(MyReducer.class);
+            job.setNumReduceTasks(0);
+            job.setCombinerKeyGroupingComparatorClass(YearComparator.class);
+        }
+        else {
+            job.setReducerClass(MyReducer.class);
+            job.setNumReduceTasks(4);
+            job.setGroupingComparatorClass(YearComparator.class);
+        }
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(30000);
+
+        assertTrue(HadoopGroupingTestState.values().isEmpty());
+    }
+
+    public static class MyReducer extends Reducer<YearTemperature, Text, Text, Object> {
+        /** */
+        int lastYear;
+
+        @Override protected void reduce(YearTemperature key, Iterable<Text> vals0, Context context)
+            throws IOException, InterruptedException {
+            X.println("___ : " + context.getTaskAttemptID() + " --> " + key);
+
+            Set<UUID> ids = new HashSet<>();
+
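+            // The values are iterated several times on purpose to verify that
+            // the reduce input can be re-read.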
+            for (Text val : vals0)
+                assertTrue(ids.add(UUID.fromString(val.toString())));
+
+            for (Text val : vals0)
+                assertTrue(ids.remove(UUID.fromString(val.toString())));
+
+            assertTrue(ids.isEmpty());
+
+            assertTrue(key.year > lastYear);
+
+            lastYear = key.year;
+
+            for (Text val : vals0)
+                assertTrue(HadoopGroupingTestState.values().remove(UUID.fromString(val.toString())));
+        }
+    }
+
+    public static class YearComparator implements RawComparator<YearTemperature> { // Grouping comparator.
+        /** {@inheritDoc} */
+        @Override public int compare(YearTemperature o1, YearTemperature o2) {
+            return Integer.compare(o1.year, o2.year);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            throw new IllegalStateException();
+        }
+    }
+
+    public static class YearTemperature implements WritableComparable<YearTemperature>, Cloneable {
+        /** */
+        private int year;
+
+        /** */
+        private int temperature;
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(year);
+            out.writeInt(temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            year = in.readInt();
+            temperature = in.readInt();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            throw new IllegalStateException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() { // To be partitioned by year.
+            return year;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(YearTemperature o) {
+            int res = Integer.compare(year, o.year);
+
+            if (res != 0)
+                return res;
+
+            // Then sort by descending temperature, so the maximum for the year comes first.
+            return Integer.compare(o.temperature, temperature);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(YearTemperature.class, this);
+        }
+    }
+
+    public static class InFormat extends InputFormat<YearTemperature, Text> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+            ArrayList<InputSplit> list = new ArrayList<>();
+
+            for (int i = 0; i < 10; i++)
+                list.add(new HadoopSortingTest.FakeSplit(20));
+
+            return list;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<YearTemperature, Text> createRecordReader(final InputSplit split,
+            TaskAttemptContext context) throws IOException, InterruptedException {
+            return new RecordReader<YearTemperature, Text>() {
+                /** */
+                int cnt;
+
+                /** */
+                Random rnd = new GridRandom();
+
+                /** */
+                YearTemperature key = new YearTemperature();
+
+                /** */
+                Text val = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext context) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return cnt++ < split.getLength();
+                }
+
+                @Override public YearTemperature getCurrentKey() {
+                    key.year = 1990 + rnd.nextInt(10);
+                    key.temperature = 10 + rnd.nextInt(20);
+
+                    return key;
+                }
+
+                @Override public Text getCurrentValue() {
+                    UUID id = UUID.randomUUID();
+
+                    assertTrue(HadoopGroupingTestState.values().add(id));
+
+                    val.set(id.toString());
+
+                    return val;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * Test output format.
+     */
+    public static class OutFormat extends OutputFormat {
+        /** {@inheritDoc} */
+        @Override public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
new file mode 100644
index 0000000..a3bf49c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.combineExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.latch;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.mapExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.reduceExecCnt;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** Test block count. */
+    private static final int BLOCK_CNT = 10;
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
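+        // Map, combine and reduce tasks block on these latches; the tests
+        // release them one by one to check intermediate job status.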
+        latch.put("mapAwaitLatch", new CountDownLatch(1));
+        latch.put("reduceAwaitLatch", new CountDownLatch(1));
+        latch.put("combineAwaitLatch", new CountDownLatch(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        mapExecCnt.set(0);
+        combineExecCnt.set(0);
+        reduceExecCnt.set(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimpleTaskSubmit() throws Exception {
+        try {
+            UUID globalId = UUID.randomUUID();
+
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));
+
+            HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(10, mapExecCnt.get());
+            assertEquals(0, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskWithCombinerPerMap() throws Exception {
+        try {
+            UUID globalId = UUID.randomUUID();
+
+            Job job = Job.getInstance();
+            setupFileSystems(job.getConfiguration());
+
+            job.setMapperClass(TestMapper.class);
+            job.setReducerClass(TestReducer.class);
+            job.setCombinerClass(TestCombiner.class);
+            job.setInputFormatClass(InFormat.class);
+
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));
+
+            HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+            grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+            checkStatus(jobId, false);
+
+            info("Releasing map latch.");
+
+            latch.get("mapAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            // All maps are completed. We have a combiner, so no reducers should be executed
+            // before combiner latch is released.
+
+            U.sleep(50);
+
+            assertEquals(0, reduceExecCnt.get());
+
+            info("Releasing combiner latch.");
+
+            latch.get("combineAwaitLatch").countDown();
+
+            checkStatus(jobId, false);
+
+            info("Releasing reduce latch.");
+
+            latch.get("reduceAwaitLatch").countDown();
+
+            checkStatus(jobId, true);
+
+            assertEquals(10, mapExecCnt.get());
+            assertEquals(10, combineExecCnt.get());
+            assertEquals(1, reduceExecCnt.get());
+        }
+        finally {
+            // Safety.
+            latch.get("mapAwaitLatch").countDown();
+            latch.get("combineAwaitLatch").countDown();
+            latch.get("reduceAwaitLatch").countDown();
+        }
+    }
+
+    /**
+     * Checks job execution status.
+     *
+     * @param jobId Job ID.
+     * @param complete Completion status.
+     * @throws Exception If failed.
+     */
+    private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception {
+        for (int i = 0; i < gridCount(); i++) {
+            IgniteKernal kernal = (IgniteKernal)grid(i);
+
+            Hadoop hadoop = kernal.hadoop();
+
+            HadoopJobStatus stat = hadoop.status(jobId);
+
+            assert stat != null;
+
+            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+            if (!complete)
+                assertFalse(fut.isDone());
+            else {
+                info("Waiting for status future completion on node [idx=" + i + ", nodeId=" +
+                    kernal.getLocalNodeId() + ']');
+
+                fut.get();
+            }
+        }
+    }
+
+    /**
+     * Test input format.
+     */
+    public static class InFormat extends InputFormat {
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>(BLOCK_CNT);
+
+            for (int i = 0; i < BLOCK_CNT; i++)
+                try {
+                    res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"}));
+                }
+                catch (URISyntaxException e) {
+                    throw new IOException(e);
+                }
+
+            return res;
+        }
+
+        @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader() {
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() {
+                    return false;
+                }
+
+                @Override public Object getCurrentKey() {
+                    return null;
+                }
+
+                @Override public Object getCurrentValue() {
+                    return null;
+                }
+
+                @Override public float getProgress() {
+                    return 0;
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     * Test mapper.
+     */
+    private static class TestMapper extends Mapper {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("mapAwaitLatch").await();
+
+            mapExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test reducer.
+     */
+    private static class TestReducer extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("reduceAwaitLatch").await();
+
+            reduceExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+
+    /**
+     * Test combiner.
+     */
+    private static class TestCombiner extends Reducer {
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+            latch.get("combineAwaitLatch").await();
+
+            combineExecCnt.incrementAndGet();
+
+            System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
new file mode 100644
index 0000000..b04deeb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopMapReduceEmbeddedSelfTestState.flags;
+
+/**
+ * Tests map-reduce execution in embedded mode.
+ */
+public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * Tests whole job execution with all phases in both the old and new versions of the API, with custom
+     * Serialization, Partitioner and IO formats defined.
+     * @throws Exception If failed.
+     */
+    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000,
+            "key6", 18000 );
+
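+        // Two passes: i == 0 exercises the old (v1) API, i == 1 the new (v2) API.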
+        for (int i = 0; i < 2; i++) {
+            boolean useNewAPI = i == 1;
+
+            igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+            flags.put("serializationWasConfigured", false);
+            flags.put("partitionerWasConfigured", false);
+            flags.put("inputFormatWasConfigured", false);
+            flags.put("outputFormatWasConfigured", false);
+
+            JobConf jobConf = new JobConf();
+
+            jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+            // To split into about 6-7 items for v2.
+            jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+            // For v1.
+            jobConf.setInt("fs.local.block.size", 65000);
+
+            // File system coordinates.
+            setupFileSystems(jobConf);
+
+            HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
+
+            if (!useNewAPI) {
+                jobConf.setPartitionerClass(CustomV1Partitioner.class);
+                jobConf.setInputFormat(CustomV1InputFormat.class);
+                jobConf.setOutputFormat(CustomV1OutputFormat.class);
+            }
+
+            Job job = Job.getInstance(jobConf);
+
+            HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
+
+            if (useNewAPI) {
+                job.setPartitionerClass(CustomV2Partitioner.class);
+                job.setInputFormatClass(CustomV2InputFormat.class);
+                job.setOutputFormatClass(CustomV2OutputFormat.class);
+            }
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+
+            FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+            job.setNumReduceTasks(3);
+
+            job.setJarByClass(HadoopWordCount2.class);
+
+            IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+                    createJobInfo(job.getConfiguration()));
+
+            fut.get();
+
+            assertTrue("Serialization was configured (new API is " + useNewAPI + ")",
+                 flags.get("serializationWasConfigured"));
+
+            assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")",
+                 flags.get("partitionerWasConfigured"));
+
+            assertTrue("Input format was configured (new API is = " + useNewAPI + ")",
+                 flags.get("inputFormatWasConfigured"));
+
+            assertTrue("Output format was configured (new API is = " + useNewAPI + ")",
+                 flags.get("outputFormatWasConfigured"));
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key3\t15000\n" +
+                "key6\t18000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key1\t10000\n" +
+                "key4\t7000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001")
+            );
+
+            assertEquals("Use new API = " + useNewAPI,
+                "key2\t20000\n" +
+                "key5\t12000\n",
+                readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002")
+            );
+        }
+    }
+
+    /**
+     * Custom serialization class that inherits behaviour of native {@link WritableSerialization}.
+     */
+    protected static class CustomSerialization extends WritableSerialization {
+        @Override public void setConf(Configuration conf) {
+            super.setConf(conf);
+
+            flags.put("serializationWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v1 API.
+     */
+    private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("partitionerWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of Partitioner in v2 API.
+     */
+    private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
+            implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("partitionerWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v2 API.
+     */
+    private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("inputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v2 API.
+     */
+    private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
+        /** {@inheritDoc} */
+        @Override public void setConf(Configuration conf) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Configuration getConf() {
+            return null;
+        }
+    }
+
+    /**
+     * Custom implementation of InputFormat in v1 API.
+     */
+    private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            super.configure(job);
+
+            flags.put("inputFormatWasConfigured", true);
+        }
+    }
+
+    /**
+     * Custom implementation of OutputFormat in v1 API.
+     */
+    private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+        /** {@inheritDoc} */
+        @Override public void configure(JobConf job) {
+            flags.put("outputFormatWasConfigured", true);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
new file mode 100644
index 0000000..afd6f26
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Test of error resilience after an error in a map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ *   x { unchecked exception, checked exception, error }
+ *   x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_IOException() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError0_Error() throws Exception {
+        doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Runtime() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_IOException() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+    }
+
+    /**
+     * Tests recovery.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRecoveryAfterAnError7_Error() throws Exception {
+        doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /**
+     * Tests correct work after an error.
+     *
+     * @param useNewBits Bit mask selecting the old or new API for mapper, combiner and reducer
+     *     (a zero bit selects the new API).
+     * @param simulatorKind Kind of simulated error.
+     * @throws Exception On error.
+     */
+    private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+        try {
+            IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+            igfs.mkdirs(inDir);
+
+            IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+            generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
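+            // A zero bit selects the new API: useNewBits == 0 means all-new, useNewBits == 7 means all-old.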
+            boolean useNewMapper = (useNewBits & 1) == 0;
+            boolean useNewCombiner = (useNewBits & 2) == 0;
+            boolean useNewReducer = (useNewBits & 4) == 0;
+
+            for (int i = 0; i < 12; i++) {
+                int bits = 1 << i;
+
+                System.out.println("############################ Simulator kind = " + simulatorKind
+                    + ", Stage bits = " + bits);
+
+                HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+                doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+            }
+        }
+        catch (Throwable t) {
+            t.printStackTrace();
+
+            fail("Unexpected throwable: " + t);
+        }
+    }
+
+    /**
+     * Performs test with given error simulator.
+     *
+     * @param sim The simulator.
+     * @param inFile Input file.
+     * @param useNewMapper Whether to use the new mapper API.
+     * @param useNewCombiner Whether to use the new combiner API.
+     * @param useNewReducer Whether to use the new reducer API.
+     * @throws Exception If failed.
+     */
+    private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+        boolean useNewCombiner, boolean useNewReducer) throws Exception {
+        // Set the real error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+        try {
+            // Expect failure there:
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+        catch (Throwable t) { // This may be an Error.
+            // Expected:
+            System.out.println(t.toString()); // Ignore, continue the test.
+        }
+
+        // Set no-op error simulator:
+        assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+        // Expect success there:
+        doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
new file mode 100644
index 0000000..feccb59
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Test of the whole cycle of map-reduce processing via the job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+    /**
+     * Tests whole job execution with all phases in all combinations of the new and old versions of the API.
+     * @throws Exception If failed.
+     */
+    public void testWholeMapReduceExecution() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+        generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow );
+
+        for (boolean[] apiMode: getApiModes()) {
+            assert apiMode.length == 3;
+
+            boolean useNewMapper = apiMode[0];
+            boolean useNewCombiner = apiMode[1];
+            boolean useNewReducer = apiMode[2];
+
+            doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+        }
+    }
+
+    /**
+     * Gets API mode combinations to be tested.
+     * Each boolean[] is a { newMapper, newCombiner, newReducer } flag triplet.
+     *
+     * @return Arrays of booleans indicating API combinations to test.
+     */
+    protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, false },
+            { false, false, true },
+            { false, true,  false },
+            { true,  false, false },
+            { true,  true,  true },
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
new file mode 100644
index 0000000..3bb8735
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Tests an attempt to execute a map-reduce task while no Hadoop processor is available.
+ */
+public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        c.setHadoopConfiguration(null);
+        c.setPeerClassLoadingEnabled(true);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testWholeMapReduceExecution() throws Exception {
+        try {
+            super.testWholeMapReduceExecution();
+
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException ignore) {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
new file mode 100644
index 0000000..220614c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * Mock job for planner tests.
+ */
+public class HadoopPlannerMockJob implements HadoopJob {
+    /** Input splits. */
+    private final Collection<HadoopInputSplit> splits;
+
+    /** Reducers count. */
+    private final int reducers;
+
+    /**
+     * Constructor.
+     *
+     * @param splits Input splits.
+     * @param reducers Reducers.
+     */
+    public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) {
+        this.splits = splits;
+        this.reducers = reducers;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+        return splits;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobInfo info() {
+        return new JobInfo(reducers);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId id() {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dispose(boolean external) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+        throwUnsupported();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupStagingDirectory() {
+        throwUnsupported();
+    }
+
+    /**
+     * Throw {@link UnsupportedOperationException}.
+     */
+    private static void throwUnsupported() {
+        throw new UnsupportedOperationException("Should not be called!");
+    }
+
+    /**
+     * Mocked job info.
+     */
+    private static class JobInfo implements HadoopJobInfo {
+        /** Reducers. */
+        private final int reducers;
+
+        /**
+         * Constructor.
+         *
+         * @param reducers Reducers.
+         */
+        public JobInfo(int reducers) {
+            this.reducers = reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int reducers() {
+            return reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public String property(String name) {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasCombiner() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasReducer() {
+            throwUnsupported();
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+            @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String jobName() {
+            throwUnsupported();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            throwUnsupported();
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
new file mode 100644
index 0000000..5a55430
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
+import static java.util.Collections.reverseOrder;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
+ * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows, please ensure that Cygwin is installed and available in the system
+ * path.
+ */
+public class HadoopPopularWordsTest {
+    /** Ignite home. */
+    private static final String IGNITE_HOME = U.getIgniteHome();
+
+    /** The path to the input directory. All files in that directory will be processed. */
+    private static final Path BOOKS_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
+
+    /** The path to the output directory. The result file will be written to this location. */
+    private static final Path RESULT_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
+
+    /** Popular books source dir in DFS. */
+    private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
+
+    /** Popular words result dir in DFS. */
+    private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
+
+    /** Path to the distributed file system configuration. */
+    private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Top N words to select. */
+    private static final int POPULAR_WORDS_CNT = 10;
+
+    /**
+     * For each token in the input string the mapper emits a {word, 1} pair.
+     */
+    private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+        /** Constant value. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** The word converted into the Text. */
+        private Text word = new Text();
+
+        /**
+         * Emits an entry where the key is the word and the value is always 1.
+         *
+         * @param key The current position in the input file (not used here).
+         * @param val The text string.
+         * @param ctx Mapper context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void map(LongWritable key, Text val, Context ctx)
+            throws IOException, InterruptedException {
+            // Get the mapped object.
+            final String line = val.toString();
+
+            // Splits the given string to words.
+            final String[] words = line.split("[^a-zA-Z0-9]");
+
+            for (final String w : words) {
+                // Only emit counts for longer words.
+                if (w.length() <= 3)
+                    continue;
+
+                word.set(w);
+
+                // Write the word into the context with the initial count equal to 1.
+                ctx.write(word, ONE);
+            }
+        }
+    }
+
+    /**
+     * The reducer uses a priority queue to rank the words based on their number of occurrences.
+     */
+    private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
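+        /** Bounded queue accumulating the most frequent words. */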
+        private MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+        TopNWordsReducer() {
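+            // Reverse ordering by count plus maximumSize(POPULAR_WORDS_CNT) keeps
+            // only the POPULAR_WORDS_CNT entries with the highest counts.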
+            q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
+                @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
+                    return o1.getKey().compareTo(o2.getKey());
+                }
+            })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+        }
+
+        /**
+         * This method doesn't emit anything, but just keeps track of the top N words.
+         *
+         * @param key The word.
+         * @param vals The word counts.
+         * @param ctx Reducer context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
+            InterruptedException {
+            int sum = 0;
+
+            for (IntWritable val : vals)
+                sum += val.get();
+
+            q.add(immutableEntry(sum, key.toString()));
+        }
+
+        /**
+         * This method is called after all the word entries have been processed. It writes the accumulated
+         * statistics to the job output file.
+         *
+         * @param ctx The job context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+            IntWritable i = new IntWritable();
+
+            Text txt = new Text();
+
+            // Iterate in descending order of occurrence count.
+            while (!q.isEmpty()) {
+                Entry<Integer, String> e = q.removeFirst();
+
+                i.set(e.getKey());
+
+                txt.set(e.getValue());
+
+                ctx.write(txt, i);
+            }
+        }
+    }
+
+    /**
+     * Configures the Hadoop MapReduce job.
+     *
+     * @return Instance of the Hadoop MapReduce job.
+     * @throws IOException If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private Job createConfigBasedHadoopJob() throws IOException {
+        Job jobCfg = new Job();
+
+        Configuration cfg = jobCfg.getConfiguration();
+
+        // Use explicit configuration of distributed file system, if provided.
+        cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
+
+        jobCfg.setJobName("HadoopPopularWordExample");
+        jobCfg.setJarByClass(HadoopPopularWordsTest.class);
+        jobCfg.setInputFormatClass(TextInputFormat.class);
+        jobCfg.setOutputKeyClass(Text.class);
+        jobCfg.setOutputValueClass(IntWritable.class);
+        jobCfg.setMapperClass(TokenizingMapper.class);
+        jobCfg.setReducerClass(TopNWordsReducer.class);
+
+        FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+        FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+        // The local job tracker allows only a single task per wave, but the text input format
+        // replaces that with a value calculated from the input split size settings.
+        if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+            // Split job into tasks using 32MB split size.
+            FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+            FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+        }
+
+        return jobCfg;
+    }
+
+    /**
+     * Runs the Hadoop job.
+     *
+     * @return {@code True} if succeeded, {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+        Job job = createConfigBasedHadoopJob();
+
+        // Distributed file system this job will work with.
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+
+        X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
+
+        // Prepare input and output job directories.
+        prepareDirectories(fs);
+
+        long time = System.currentTimeMillis();
+
+        // Run job.
+        boolean res = job.waitForCompletion(true);
+
+        X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
+
+        // Move job results into the local file system, so the calculated results can be viewed.
+        publishResults(fs);
+
+        return res;
+    }
+
+    /**
+     * Prepares the job's data: cleans up result directories that might have been left over
+     * from previous runs and copies input files from the local file system into DFS.
+     *
+     * @param fs Distributed file system to use in job.
+     * @throws IOException If failed.
+     */
+    private void prepareDirectories(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+        fs.delete(RESULT_DFS_DIR, true);
+
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+    }
+
+    /**
+     * Publishes job execution results into the local file system, so they can be viewed.
+     *
+     * @param fs Distributed file system used in the job.
+     * @throws IOException If failed.
+     */
+    private void publishResults(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.delete(RESULT_LOCAL_DIR, true);
+
+        X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+    }
+
+    /**
+     * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
+     * occurrences of each word in the source files, the N most popular words are selected.
+     *
+     * @param args None.
+     */
+    public static void main(String[] args) {
+        try {
+            new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
+        }
+        catch (Exception e) {
+            X.println(">>> Failed to run word count example: " + e.getMessage());
+        }
+
+        System.exit(0);
+    }
+}
\ No newline at end of file
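
The TopNWordsReducer above keeps only the N highest-count words by capping a Guava MinMaxPriorityQueue at POPULAR_WORDS_CNT elements. For reference, a minimal, self-contained sketch of that bounded top-N pattern (illustrative only, not part of this commit; the class name and word counts are made up):

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Comparator;
import java.util.Map.Entry;

import com.google.common.collect.MinMaxPriorityQueue;

public class TopNQueueSketch {
    public static void main(String[] args) {
        // Keep at most 3 entries, ordered by occurrence count in descending order.
        MinMaxPriorityQueue<Entry<Integer, String>> q = MinMaxPriorityQueue
            .orderedBy(Comparator.comparing((Entry<Integer, String> e) -> e.getKey()).reversed())
            .maximumSize(3)
            .create();

        q.add(new SimpleImmutableEntry<>(42, "word"));
        q.add(new SimpleImmutableEntry<>(7, "rare"));
        q.add(new SimpleImmutableEntry<>(99, "the"));
        q.add(new SimpleImmutableEntry<>(1, "once")); // Evicted at once: three larger counts are already queued.

        // removeFirst() yields the remaining entries in descending count order: 99, 42, 7.
        while (!q.isEmpty())
            System.out.println(q.removeFirst());
    }
}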

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..5ccc8ce
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopSerializationWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests the wrapper around Hadoop native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
+    /**
+     * Tests read/write of IntWritable via native WritableSerialization.
+     *
+     * @throws Exception If failed.
+     */
+    public void testIntWritableSerialization() throws Exception {
+        HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, new IntWritable(3));
+        ser.write(out, new IntWritable(-5));
+
+        assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+        assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+    }
+
+    /**
+     * Tests read/write of Integer via native JavaSerialization.
+     *
+     * @throws Exception If failed.
+     */
+    public void testIntJavaSerialization() throws Exception {
+        HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
+
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+        DataOutput out = new DataOutputStream(buf);
+
+        ser.write(out, 3);
+        ser.write(out, -5);
+        ser.close();
+
+        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+        assertEquals(3, ((Integer)ser.read(in, null)).intValue());
+        assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
+    }
+}
\ No newline at end of file
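
The byte layout asserted above, [0, 0, 0, 3, -1, -1, -1, -5], is simply IntWritable's 4-byte big-endian encoding. A standalone round trip through the plain Hadoop Writable API, without the Ignite wrapper, produces the same bytes (illustrative sketch only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;

public class WritableRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();

        DataOutputStream out = new DataOutputStream(buf);

        // Each IntWritable serializes as a single 4-byte big-endian int.
        new IntWritable(3).write(out);
        new IntWritable(-5).write(out);

        System.out.println(Arrays.toString(buf.toByteArray())); // [0, 0, 0, 3, -1, -1, -1, -5]

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));

        IntWritable w = new IntWritable();

        w.readFields(in);
        System.out.println(w.get()); // 3

        w.readFields(in);
        System.out.println(w.get()); // -5
    }
}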

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..e27c212
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+/**
+ * Same test as HadoopMapReduceTest, but with Snappy output compression enabled.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected boolean compressOutputSnappy() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean[][] getApiModes() {
+        return new boolean[][] {
+            { false, false, true },
+            { true, true, true },
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
new file mode 100644
index 0000000..80ff754
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+    /** Length of data. */
+    private static final int BYTE_SIZE = 1024 * 50;
+
+    /**
+     * Checks Snappy codec usage.
+     *
+     * @throws Throwable On error.
+     */
+    public void testSnappy() throws Throwable {
+        // Run Snappy test in default class loader:
+        checkSnappy();
+
+        // Run the same in several more class loaders simulating jobs and tasks:
+        for (int i = 0; i < 2; i++) {
+            ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl());
+
+            Class<?> cls = Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+            assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+            U.invoke(cls, null, "checkSnappy");
+        }
+    }
+
+    /**
+     * Internal check routine.
+     *
+     * @throws Throwable If failed.
+     */
+    public static void checkSnappy() throws Throwable {
+        try {
+            byte[] expBytes = new byte[BYTE_SIZE];
+            byte[] actualBytes = new byte[BYTE_SIZE];
+
+            for (int i = 0; i < expBytes.length; i++)
+                expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+            SnappyCodec codec = new SnappyCodec();
+
+            codec.setConf(new Configuration());
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+            try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+                cos.write(expBytes);
+                cos.flush();
+            }
+
+            try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+                int read = cis.read(actualBytes, 0, actualBytes.length);
+
+                assert read == actualBytes.length;
+            }
+
+            assert Arrays.equals(expBytes, actualBytes);
+        }
+        catch (Throwable e) {
+            System.out.println("Snappy check failed:");
+            System.out.println("### NativeCodeLoader.isNativeCodeLoaded:  " + NativeCodeLoader.isNativeCodeLoaded());
+            System.out.println("### SnappyCompressor.isNativeCodeLoaded:  " + SnappyCompressor.isNativeCodeLoaded());
+
+            throw e;
+        }
+    }
+}
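
The round-trip pattern in checkSnappy() is not specific to Snappy: any Hadoop codec exposes the same createOutputStream()/createInputStream() flow. A sketch using DefaultCodec (zlib-based, so it runs without the native Snappy library; the class name is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;

public class CodecRoundTripSketch {
    public static void main(String[] args) throws Exception {
        byte[] src = "hello hadoop codecs, hello hadoop codecs".getBytes("UTF-8");

        DefaultCodec codec = new DefaultCodec();

        codec.setConf(new Configuration());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
            cos.write(src);
        }

        byte[] res = new byte[src.length];

        try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
            // Loop until the buffer is full: a single read() may return fewer bytes than requested.
            int off = 0;

            while (off < res.length) {
                int n = cis.read(res, off, res.length - off);

                if (n < 0)
                    break;

                off += n;
            }
        }

        System.out.println(new String(res, "UTF-8"));
    }
}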

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
new file mode 100644
index 0000000..eb4a7d4
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+/**
+ * External test for sorting.
+ */
+public class HadoopSortingExternalTest extends HadoopSortingTest {
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new JdkMarshaller());
+
+        return cfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4c1739ce/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
new file mode 100644
index 0000000..a4e7368
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.UUID;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests correct sorting.
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+    /** */
+    private static final String PATH_INPUT = "/test-in";
+
+    /** */
+    private static final String PATH_OUTPUT = "/test-out";
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /**
+     * @return {@code True} if IGFS is enabled on Hadoop nodes.
+     */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        // TODO: IGNITE-404: Uncomment when fixed.
+        //cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSortSimple() throws Exception {
+        // Generate test data.
+        Job job = Job.getInstance();
+
+        job.setInputFormatClass(InFormat.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        job.setMapperClass(Mapper.class);
+        job.setNumReduceTasks(0);
+
+        setupFileSystems(job.getConfiguration());
+
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
+
+        X.printerrln("Data generation started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Data generation complete.");
+
+        // Run main map-reduce job.
+        job = Job.getInstance();
+
+        setupFileSystems(job.getConfiguration());
+
+        job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
+            "," + WritableSerialization.class.getName());
+
+        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
+        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+        job.setSortComparatorClass(JavaSerializationComparator.class);
+
+        job.setMapperClass(MyMapper.class);
+        job.setReducerClass(MyReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setMapOutputKeyClass(UUID.class);
+        job.setMapOutputValueClass(NullWritable.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(NullWritable.class);
+
+        X.printerrln("Job started.");
+
+        grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+            createJobInfo(job.getConfiguration())).get(180000);
+
+        X.printerrln("Job complete.");
+
+        // Check result.
+        Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+        AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
+
+        for (FileStatus file : fs.listStatus(outDir)) {
+            X.printerrln("__ file: " + file);
+
+            if (file.getLen() == 0)
+                continue;
+
+            FSDataInputStream in = fs.open(file.getPath());
+
+            Scanner sc = new Scanner(in);
+
+            UUID prev = null;
+
+            while (sc.hasNextLine()) {
+                UUID next = UUID.fromString(sc.nextLine());
+
+//                X.printerrln("___ check: " + next);
+
+                if (prev != null)
+                    assertTrue(prev.compareTo(next) < 0);
+
+                prev = next;
+            }
+        }
+    }
+
+    /** Input format that generates random UUID strings as keys. */
+    public static class InFormat extends InputFormat<Text, NullWritable> {
+        /** {@inheritDoc} */
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+            List<InputSplit> res = new ArrayList<>();
+
+            FakeSplit split = new FakeSplit(20);
+
+            for (int i = 0; i < 10; i++)
+                res.add(split);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
+            TaskAttemptContext ctx) throws IOException, InterruptedException {
+            return new RecordReader<Text, NullWritable>() {
+                /** */
+                int cnt;
+
+                /** */
+                Text txt = new Text();
+
+                @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+                    // No-op.
+                }
+
+                @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+                    return ++cnt <= split.getLength();
+                }
+
+                @Override public Text getCurrentKey() {
+                    txt.set(UUID.randomUUID().toString());
+
+//                    X.printerrln("___ read: " + txt);
+
+                    return txt;
+                }
+
+                @Override public NullWritable getCurrentValue() {
+                    return NullWritable.get();
+                }
+
+                @Override public float getProgress() throws IOException, InterruptedException {
+                    return (float)cnt / split.getLength();
+                }
+
+                @Override public void close() {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /** Mapper that parses each input line back into a UUID key. */
+    public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
+//            X.printerrln("___ map: " + val);
+
+            ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+        }
+    }
+
+    /** Reducer that writes the sorted UUID keys out as text. */
+    public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
+        /** */
+        private Text text = new Text();
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
+            throws IOException, InterruptedException {
+//            X.printerrln("___ rdc: " + key);
+
+            text.set(key.toString());
+
+            ctx.write(text, NullWritable.get());
+        }
+    }
+
+    /** Fake input split with a fixed logical length and no backing data. */
+    public static class FakeSplit extends InputSplit implements Writable {
+        /** */
+        private static final String[] HOSTS = {"127.0.0.1"};
+
+        /** */
+        private int len;
+
+        /**
+         * @param len Length.
+         */
+        public FakeSplit(int len) {
+            this.len = len;
+        }
+
+        /**
+         * Default constructor required for {@code Writable} deserialization.
+         */
+        public FakeSplit() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLength() throws IOException, InterruptedException {
+            return len;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String[] getLocations() throws IOException, InterruptedException {
+            return HOSTS;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(DataOutput out) throws IOException {
+            out.writeInt(len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFields(DataInput in) throws IOException {
+            len = in.readInt();
+        }
+    }
+}
\ No newline at end of file
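
testSortSimple() above depends on the io.serializations list naming JavaSerialization ahead of WritableSerialization, so that the UUID map output keys can be serialized and ordered via JavaSerializationComparator. A small sketch of how Hadoop's SerializationFactory resolves types against that list (illustrative only, not part of this commit):

import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.WritableSerialization;

public class SerializationLookupSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Same comma-separated list the test sets on its job configuration.
        conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
            JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());

        SerializationFactory factory = new SerializationFactory(conf);

        // UUID is java.io.Serializable, so JavaSerialization (listed first) accepts it.
        System.out.println(factory.getSerialization(UUID.class).getClass().getSimpleName());

        // Text is a Writable but not Serializable, so the lookup falls through to WritableSerialization.
        System.out.println(factory.getSerialization(Text.class).getClass().getSimpleName());
    }
}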