Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 15:14:20 UTC

[02/13] incubator-ignite git commit: # IGNITE-386: Moving core classes (6).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
index 040730b..ee490be 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.*;
 /**
  * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
  */
-public class HadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest {
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
     /**
      * Tests serialization of wrapper and the wrapped native split.
      * @throws Exception If fails.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
new file mode 100644
index 0000000..a489f28
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests map-reduce task execution basics.
+ */
+public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
+    /** */
+    private static GridHadoopSharedMap m = GridHadoopSharedMap.map(HadoopTaskExecutionSelfTest.class);
+
+    /** Line count. */
+    private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger());
+
+    /** Executed tasks. */
+    private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger());
+
+    /** Cancelled tasks. */
+    private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger());
+
+    /** Working directory of each task. */
+    private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs",
+        new ConcurrentHashMap<String, String>());
+
+    /** Mapper id to fail. */
+    private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger());
+
+    /** Number of splits of the current input. */
+    private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger());
+
+    /** Test param. */
+    private static final String MAP_WRITE = "test.map.write";
+
+    /** {@inheritDoc} */
+    @Override public IgfsConfiguration igfsConfiguration() {
+        IgfsConfiguration cfg = super.igfsConfiguration();
+
+        cfg.setFragmentizerEnabled(false);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean igfsEnabled() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        grid(0).fileSystem(igfsName).format();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) {
+        GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+        cfg.setMaxParallelTasks(5);
+        cfg.setExternalExecution(false);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapRun() throws Exception {
+        int lineCnt = 10000;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1),
+                createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(32, taskWorkDirs.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapCombineRun() throws Exception {
+        int lineCnt = 10001;
+        String fileName = "/testFile";
+
+        prepareFile(fileName, lineCnt);
+
+        totalLineCnt.set(0);
+        taskWorkDirs.clear();
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+        cfg.setBoolean(MAP_WRITE, true);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(TestMapper.class);
+        job.setCombinerClass(TestCombiner.class);
+        job.setReducerClass(TestReducer.class);
+
+        job.setNumReduceTasks(2);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
+
+        job.setJarByClass(getClass());
+
+        GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 2);
+
+        IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+        fut.get();
+
+        assertEquals(lineCnt, totalLineCnt.get());
+
+        assertEquals(34, taskWorkDirs.size());
+
+        for (int g = 0; g < gridCount(); g++)
+            grid(g).hadoop().finishFuture(jobId).get();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapperException() throws Exception {
+        prepareFile("/testFile", 1000);
+
+        Configuration cfg = new Configuration();
+
+        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(FailMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 3),
+                createJobInfo(job.getConfiguration()));
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+    }
+
+    /**
+     * @param fileName File name.
+     * @param lineCnt Line count.
+     * @throws Exception If failed.
+     */
+    private void prepareFile(String fileName, int lineCnt) throws Exception {
+        IgniteFs igfs = grid(0).fileSystem(igfsName);
+
+        try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) {
+            PrintWriter w = new PrintWriter(new OutputStreamWriter(os));
+
+            for (int i = 0; i < lineCnt; i++)
+                w.print("Hello, Hadoop map-reduce!\n");
+
+            w.flush();
+        }
+    }
+
+    /**
+     * Prepares a job with mappers to cancel.
+     * @return Fully configured job.
+     * @throws Exception If fails.
+     */
+    private Configuration prepareJobForCancelling() throws Exception {
+        prepareFile("/testFile", 1500);
+
+        executedTasks.set(0);
+        cancelledTasks.set(0);
+        failMapperId.set(0);
+        splitsCount.set(0);
+
+        Configuration cfg = new Configuration();
+
+        setupFileSystems(cfg);
+
+        Job job = Job.getInstance(cfg);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        job.setMapperClass(CancellingTestMapper.class);
+
+        job.setNumReduceTasks(0);
+
+        job.setInputFormatClass(InFormat.class);
+
+        FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+        FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+        job.setJarByClass(getClass());
+
+        return job.getConfiguration();
+    }
+
+    /**
+     * Test input format.
+     */
+    private static class InFormat extends TextInputFormat {
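+        /** {@inheritDoc} */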
+        @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException {
+            List<InputSplit> res = super.getSplits(ctx);
+
+            splitsCount.set(res.size());
+
+            X.println("___ split of input: " + splitsCount.get());
+
+            return res;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTaskCancelling() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1);
+
+        final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        // Fail the mapper with id "1"; the others will be cancelled.
+        failMapperId.set(1);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJobKill() throws Exception {
+        Configuration cfg = prepareJobForCancelling();
+
+        Hadoop hadoop = grid(0).hadoop();
+
+        GridHadoopJobId jobId = new GridHadoopJobId(UUID.randomUUID(), 1);
+
+        //Kill unknown job.
+        boolean killRes = hadoop.kill(jobId);
+
+        assertFalse(killRes);
+
+        final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return splitsCount.get() > 0;
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        if (!GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                X.println("___ executed tasks: " + executedTasks.get());
+
+                return executedTasks.get() == splitsCount.get();
+            }
+        }, 20000)) {
+            U.dumpThreads(log);
+
+            fail();
+        }
+
+        //Kill the job that is actually running.
+        killRes = hadoop.kill(jobId);
+
+        assertTrue(killRes);
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                fut.get();
+
+                return null;
+            }
+        }, IgniteCheckedException.class, null);
+
+        assertEquals(executedTasks.get(), cancelledTasks.get());
+
+        //Kill the same job again.
+        killRes = hadoop.kill(jobId);
+
+        assertTrue(killRes);
+    }
+
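+    /**
+     * Test mapper that counts executed tasks and registers cancellations.
+     */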
+    private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> {
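+        /** Id assigned to this mapper on setup. */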
+        private int mapperId;
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            mapperId = executedTasks.incrementAndGet();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run(Context ctx) throws IOException, InterruptedException {
+            try {
+                super.run(ctx);
+            }
+            catch (HadoopTaskCancelledException e) {
+                cancelledTasks.incrementAndGet();
+
+                throw e;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            if (mapperId == failMapperId.get())
+                throw new IOException();
+
+            Thread.sleep(1000);
+        }
+    }
+
+    /**
+     * Test failing mapper.
+     */
+    private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            throw new IOException("Expected");
+        }
+    }
+
+    /**
+     * Mapper that calculates the number of lines.
+     */
+    private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+        /** Writable integer constant of '1'. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** Line count constant. */
+        public static final Text LINE_COUNT = new Text("lineCount");
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Mapper: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+
+            LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration());
+
+            String workDir = locFs.getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+            if (ctx.getConfiguration().getBoolean(MAP_WRITE, false))
+                ctx.write(LINE_COUNT, ONE);
+            else
+                totalLineCnt.incrementAndGet();
+        }
+    }
+
+    /**
+     * Combiner that calculates the number of lines.
+     */
+    private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Combiner: ");
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values)
+                lineCnt += value.get();
+
+            sum.set(lineCnt);
+
+            X.println("___ combo: " + lineCnt);
+
+            ctx.write(key, sum);
+        }
+    }
+
+    /**
+     * Reducer that calculates the number of lines.
+     */
+    private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /** */
+        IntWritable sum = new IntWritable();
+
+        /** {@inheritDoc} */
+        @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+            X.println("___ Reducer: " + ctx.getTaskAttemptID());
+
+            String taskId = ctx.getTaskAttemptID().toString();
+            String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();
+
+            assertNull(taskWorkDirs.put(workDir, taskId));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+            InterruptedException {
+            int lineCnt = 0;
+
+            for (IntWritable value : values) {
+                lineCnt += value.get();
+
+                X.println("___ rdcr: " + value.get());
+            }
+
+            sum.set(lineCnt);
+
+            ctx.write(key, sum);
+
+            X.println("___ RDCR SUM: " + lineCnt);
+
+            totalLineCnt.addAndGet(lineCnt);
+        }
+    }
+}
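
For readers skimming the diff: the submission flow the new test exercises condenses to the sketch below. Identifiers are taken from the file above; a started grid with IGFS, as configured in beforeTestsStarted(), is assumed, so this is a sketch within the test harness rather than a standalone program.

    Configuration cfg = new Configuration();

    // Route igfs:// paths to Ignite's Hadoop file system implementation.
    cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());

    Job job = Job.getInstance(cfg);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(TestMapper.class);
    job.setNumReduceTasks(0);
    job.setInputFormatClass(TextInputFormat.class);

    FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
    FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));

    // The job runs on Ignite's in-memory map-reduce engine, not on a
    // Hadoop cluster; the future completes when the whole job does.
    IgniteInternalFuture<?> fut = grid(0).hadoop().submit(
        new GridHadoopJobId(UUID.randomUUID(), 1),
        createJobInfo(job.getConfiguration()));

    fut.get();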

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
new file mode 100644
index 0000000..265890d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Tests of Map, Combine and Reduce task execution for any version of the Hadoop API.
+ */
+abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
+    /** Empty hosts array. */
+    private static final String[] HOSTS = new String[0];
+
+    /**
+     * Creates a Hadoop job. Override this method to create tests for a specific job implementation.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws IOException If fails.
+     */
+    public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+
+    /**
+     * @return Prefix of the reducer output file name: "part-" for the v1 API and "part-r-" for the v2 API.
+     */
+    public abstract String getOutputFileNamePrefix();
+
+    /**
+     * Tests map task execution.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testMapTask() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
+            pw.println("hello0 world0");
+            pw.println("world1 hello1");
+        }
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1);
+
+        try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) {
+            pw.println("hello2 world2");
+            pw.println("world3 hello3");
+        }
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
+                igfs.info(inFile).length() - fileBlock1.length());
+
+        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+
+        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        ctx.mockOutput().clear();
+
+        ctx.run();
+
+        assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput()));
+
+        ctx.mockOutput().clear();
+
+        ctx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2));
+
+        ctx.run();
+
+        assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Generates input data for a reduce-like operation, feeds it into the mock context input and runs the operation.
+     *
+     * @param gridJob Job to create the reduce task from.
+     * @param taskType Type of task - combine or reduce.
+     * @param taskNum Number of the task within the job.
+     * @param words Pairs of words and their counts.
+     * @return Context with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+        int taskNum, String... words) throws IgniteCheckedException {
+        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
+
+        HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        for (int i = 0; i < words.length; i += 2) {
+            List<IntWritable> valList = new ArrayList<>();
+
+            for (int j = 0; j < Integer.parseInt(words[i + 1]); j++)
+                valList.add(new IntWritable(1));
+
+            ctx.mockInput().put(new Text(words[i]), valList);
+        }
+
+        ctx.run();
+
+        return ctx;
+    }
+
+    /**
+     * Tests reduce task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testReduceTask() throws Exception {
+        HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
+        runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
+
+        assertEquals(
+            "word1\t5\n" +
+            "word2\t10\n",
+            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
+                getOutputFileNamePrefix() + "00000")
+        );
+
+        assertEquals(
+            "word3\t7\n" +
+            "word4\t15\n",
+            readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
+                getOutputFileNamePrefix() + "00001")
+        );
+    }
+
+    /**
+     * Tests combine task execution.
+     *
+     * @throws Exception If fails.
+     */
+    public void testCombinerTask() throws Exception {
+        HadoopV2Job gridJob = getHadoopJob("/", "/");
+
+        HadoopTestTaskContext ctx =
+            runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
+
+        assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput()));
+
+        ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15");
+
+        assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput()));
+    }
+
+    /**
+     * Runs a map task followed by a combine task on a file block.
+     *
+     * @param fileBlock Block of the input file to be processed.
+     * @param gridJob Hadoop job implementation.
+     * @return Context of combine task with mock output.
+     * @throws IgniteCheckedException If fails.
+     */
+    private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+        throws IgniteCheckedException {
+        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
+
+        HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        mapCtx.run();
+
+        //Prepare input for combine
+        taskInfo = new GridHadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
+
+        HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
+
+        combineCtx.run();
+
+        return combineCtx;
+    }
+
+    /**
+     * Tests all tasks in combination:
+     * runs two chains of map-combine tasks and sends the result into one reduce task.
+     *
+     * @throws Exception If fails.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void testAllTasks() throws Exception {
+        IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+        igfs.mkdirs(inDir);
+
+        IgfsPath inFile = new IgfsPath(inDir, GridHadoopWordCount2.class.getSimpleName() + "-input");
+
+        URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+        generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70);
+
+        //Split file into two blocks
+        long fileLen = igfs.info(inFile).length();
+
+        long l = fileLen / 2;
+
+        HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
+        HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
+
+        HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+
+        HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
+
+        HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
+
+        //Prepare input for reduce
+        GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
+
+        HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+        reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
+        reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
+
+        reduceCtx.run();
+
+        reduceCtx.taskInfo(new GridHadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null));
+
+        reduceCtx.run();
+
+        assertEquals(
+            "blue\t200\n" +
+            "green\t150\n" +
+            "red\t100\n" +
+            "yellow\t70\n",
+            readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000")
+        );
+    }
+}
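
For reference, the mock-driven pattern that runTaskWithInput() above implements boils down to the following sketch. It assumes the same setup as the test: gridJob comes from getHadoopJob(), and HadoopTestTaskContext (diffed further below) exposes mockInput()/mockOutput().

    // Build a combine task bound to the job, with no input split.
    GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);

    HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);

    // Feed "word1" with five 1-values; the combiner is expected to emit ("word1", 5).
    ctx.mockInput().put(new Text("word1"),
        new ArrayList<>(Collections.nCopies(5, new IntWritable(1))));

    ctx.run();

    // ctx.mockOutput() now holds the aggregated (key, count) pairs.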

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
new file mode 100644
index 0000000..d932a8f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests of Map, Combine and Reduce task execution by running a Hadoop API v1 job.
+ */
+public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v1.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws Exception If fails.
+     */
+    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+        JobConf jobConf = GridHadoopWordCount1.getJob(inFile, outFile);
+
+        setupFileSystems(jobConf);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(jobConf);
+
+        GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
+
+        return new HadoopV2Job(jobId, jobInfo, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
new file mode 100644
index 0000000..2e1e9fd
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests of Map, Combine and Reduce task execution by running a Hadoop API v2 job.
+ */
+public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
+    /**
+     * Creates WordCount hadoop job for API v2.
+     *
+     * @param inFile Input file name for the job.
+     * @param outFile Output file name for the job.
+     * @return Hadoop job.
+     * @throws Exception If fails.
+     */
+    @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+        Job job = Job.getInstance();
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+
+        GridHadoopWordCount2.setTasksClasses(job, true, true, true);
+
+        Configuration conf = job.getConfiguration();
+
+        setupFileSystems(conf);
+
+        FileInputFormat.setInputPaths(job, new Path(inFile));
+        FileOutputFormat.setOutputPath(job, new Path(outFile));
+
+        job.setJarByClass(GridHadoopWordCount2.class);
+
+        Job hadoopJob = GridHadoopWordCount2.getJob(inFile, outFile);
+
+        HadoopDefaultJobInfo jobInfo = createJobInfo(hadoopJob.getConfiguration());
+
+        GridHadoopJobId jobId = new GridHadoopJobId(new UUID(0, 0), 0);
+
+        return new HadoopV2Job(jobId, jobInfo, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getOutputFileNamePrefix() {
+        return "part-r-";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
index 9b56300..c3c8806 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
@@ -79,7 +79,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext {
     private Map<Object,List> mockInput = new TreeMap<>();
 
     /** Context output implementation to write data into mockOutput. */
-    private GridHadoopTaskOutput output = new GridHadoopTaskOutput() {
+    private HadoopTaskOutput output = new HadoopTaskOutput() {
         /** {@inheritDoc} */
         @Override public void write(Object key, Object val) {
            //Check the casts and extract/copy the values
@@ -96,7 +96,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext {
     };
 
     /** Context input implementation to read data from mockInput. */
-    private GridHadoopTaskInput input = new GridHadoopTaskInput() {
+    private HadoopTaskInput input = new HadoopTaskInput() {
         /** Iterator of keys and associated lists of values. */
         Iterator<Map.Entry<Object, List>> iter;
 
@@ -178,7 +178,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext {
      * @param taskInfo Task info.
      * @param gridJob Grid Hadoop job.
      */
-    public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob gridJob) throws IgniteCheckedException {
+    public HadoopTestTaskContext(GridHadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException {
         super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
     }
 
@@ -189,7 +189,7 @@ class HadoopTestTaskContext extends HadoopV2TaskContext {
      * @return DataInput with JobConf.
      * @throws IgniteCheckedException If failed.
      */
-    private static DataInput jobConfDataInput(GridHadoopJob job) throws IgniteCheckedException {
+    private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException {
         JobConf jobConf = new JobConf();
 
         for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
@@ -208,12 +208,12 @@ class HadoopTestTaskContext extends HadoopV2TaskContext {
     }
 
     /** {@inheritDoc} */
-    @Override public GridHadoopTaskOutput output() {
+    @Override public HadoopTaskOutput output() {
         return output;
     }
 
     /** {@inheritDoc} */
-    @Override public GridHadoopTaskInput input() {
+    @Override public HadoopTaskInput input() {
         return input;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
index 66e35b5..222ba17 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -32,7 +32,7 @@ import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 /**
  * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}.
  */
-public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
+public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
     /** */
     private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
 
@@ -55,7 +55,7 @@ public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
     }
 
     /**
-     * Tests that {@link GridHadoopJob} provides wrapped serializer if it's set in configuration.
+     * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration.
      *
      * @throws IgniteCheckedException If fails.
      */
@@ -66,12 +66,12 @@ public class HadoopV2JobSelfTest extends GridHadoopAbstractSelfTest {
         cfg.setMapOutputValueClass(Text.class);
         cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
-        GridHadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+        HadoopJob job = new HadoopV2Job(new GridHadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
 
-        GridHadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(GridHadoopTaskType.MAP, null, 0, 0,
+        HadoopTaskContext taskCtx = job.getTaskContext(new GridHadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
             null));
 
-        GridHadoopSerialization ser = taskCtx.keySerialization();
+        HadoopSerialization ser = taskCtx.keySerialization();
 
         assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
new file mode 100644
index 0000000..558dec5
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopValidationSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Configuration validation tests.
+ */
+public class HadoopValidationSelfTest extends HadoopAbstractSelfTest {
+    /** Peer class loading enabled flag. */
+    public boolean peerClassLoading;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids(true);
+
+        peerClassLoading = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoading);
+
+        return cfg;
+    }
+
+    /**
+     * Ensure that Grid starts when all configuration parameters are valid.
+     *
+     * @throws Exception If failed.
+     */
+    public void testValid() throws Exception {
+        startGrids(1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
deleted file mode 100644
index aa0ddc1..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopAbstractMapTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.commons.collections.comparators.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Abstract class for maps test.
- */
-public abstract class GridHadoopAbstractMapTest extends GridCommonAbstractTest {
-    /**
-     * Test task context.
-     */
-    protected static class TaskContext extends GridHadoopTaskContext {
-        /**
-         */
-        protected TaskContext() {
-            super(null, null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopCounters counters() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException {
-            return new HadoopWritableSerialization(IntWritable.class);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException {
-            return new HadoopWritableSerialization(IntWritable.class);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Comparator<Object> sortComparator() {
-            return ComparableComparator.getInstance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public Comparator<Object> groupComparator() {
-            return ComparableComparator.getInstance();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void run() throws IgniteCheckedException {
-            assert false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            assert false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
-            assert false;
-        }
-
-        @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
-            assert false;
-        }
-    }
-
-    /**
-     * Test job info.
-     */
-    protected static class JobInfo implements GridHadoopJobInfo {
-        /** {@inheritDoc} */
-        @Nullable @Override public String property(String name) {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasCombiner() {
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasReducer() {
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridHadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int reducers() {
-            assert false;
-
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String jobName() {
-            assert false;
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String user() {
-            assert false;
-
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
index 8b5d2b6..43b1f02 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopConcurrentHashMultimapSelftest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
 /**
  *
  */
-public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstractMapTest {
+public class GridHadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
     /** */
     public void testMapSimple() throws Exception {
         GridUnsafeMemory mem = new GridUnsafeMemory(0);
@@ -52,7 +52,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract
 
         GridHadoopJobInfo job = new JobInfo();
 
-        GridHadoopTaskContext taskCtx = new TaskContext();
+        HadoopTaskContext taskCtx = new TaskContext();
 
         HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
 
@@ -91,8 +91,8 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract
     }
 
     private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
-        final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx) throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
 
         Map<Integer, Collection<Integer>> mmm = mm.asMap();
 
@@ -182,7 +182,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract
         for (int i = 0; i < 20; i++) {
             GridHadoopJobInfo job = new JobInfo();
 
-            final GridHadoopTaskContext taskCtx = new TaskContext();
+            final HadoopTaskContext taskCtx = new TaskContext();
 
             final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
 
@@ -238,7 +238,7 @@ public class GridHadoopConcurrentHashMultimapSelftest extends GridHadoopAbstract
 
             assertTrue(m.capacity() > 32000);
 
-            GridHadoopTaskInput in = m.input(taskCtx);
+            HadoopTaskInput in = m.input(taskCtx);
 
             while (in.next()) {
                 IntWritable key = (IntWritable) in.key();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
deleted file mode 100644
index 90d957b..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMapSelfTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- *
- */
-public class GridHadoopHashMapSelfTest extends GridHadoopAbstractMapTest {
-
-    public void _testAllocation() throws Exception {
-        final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        long size = 3L * 1024 * 1024 * 1024;
-
-        final long chunk = 16;// * 1024;
-
-        final int page = 4 * 1024;
-
-        final int writes = chunk < page ? 1 : (int)(chunk / page);
-
-        final long cnt = size / chunk;
-
-        assert cnt < Integer.MAX_VALUE;
-
-        final int threads = 4;
-
-        long start = System.currentTimeMillis();
-
-        multithreaded(new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                int cnt0 = (int)(cnt / threads);
-
-                for (int i = 0; i < cnt0; i++) {
-                    long ptr = mem.allocate(chunk);
-
-                    for (int j = 0; j < writes; j++)
-                        mem.writeInt(ptr + j * page, 100500);
-                }
-
-                return null;
-            }
-        }, threads);
-
-        X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt);
-
-        Thread.sleep(30000);
-    }
-
-
-    /** */
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(3);
-
-        GridHadoopTaskContext taskCtx = new TaskContext();
-
-        final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize);
-
-        HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, GridHadoopTaskContext taskCtx) throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            ArrayList<Integer> vs = new ArrayList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.add(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(sorted(exp), sorted(vs));
-        }
-
-        X.println("keys: " + keys + " cap: " + m.capacity());
-
-        assertEquals(mmm.size(), keys);
-
-        assertEquals(m.keys(), keys);
-
-        in.close();
-    }
-
-    private GridLongList sorted(Collection<Integer> col) {
-        GridLongList lst = new GridLongList(col.size());
-
-        for (Integer i : col)
-            lst.add(i);
-
-        return lst.sort();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
deleted file mode 100644
index 3d930ff..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipListSelfTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import com.google.common.collect.*;
-import org.apache.hadoop.io.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.lang.Math.*;
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- * Skip list tests.
- */
-public class GridHadoopSkipListSelfTest extends GridHadoopAbstractMapTest {
-    /**
-     *
-     */
-    public void testLevel() {
-        Random rnd = new GridRandom();
-
-        int[] levelsCnts = new int[32];
-
-        int all = 10000;
-
-        for (int i = 0; i < all; i++) {
-            int level = HadoopSkipList.randomLevel(rnd);
-
-            levelsCnts[level]++;
-        }
-
-        X.println("Distribution: " + Arrays.toString(levelsCnts));
-
-        for (int level = 0; level < levelsCnts.length; level++) {
-            int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
-
-            double precision = 0.72 / Math.max(32 >>> level, 1);
-
-            int sigma = max((int)ceil(precision * exp), 5);
-
-            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precision +
-                " sigma: " + sigma);
-
-            assertTrue(abs(exp - levelsCnts[level]) <= sigma);
-        }
-    }
-
-    public void testMapSimple() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-//        mem.listen(new GridOffHeapEventListener() {
-//            @Override public void onEvent(GridOffHeapEvent evt) {
-//                if (evt == GridOffHeapEvent.ALLOCATE)
-//                    U.dumpStack();
-//            }
-//        });
-
-        Random rnd = new Random();
-
-        int mapSize = 16 << rnd.nextInt(6);
-
-        GridHadoopJobInfo job = new JobInfo();
-
-        GridHadoopTaskContext taskCtx = new TaskContext();
-
-        HadoopMultimap m = new HadoopSkipList(job, mem);
-
-        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
-
-        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
-        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
-            int key = rnd.nextInt(mapSize);
-            int val = rnd.nextInt();
-
-            a.write(new IntWritable(key), new IntWritable(val));
-            mm.put(key, val);
-
-            X.println("k: " + key + " v: " + val);
-
-            a.close();
-
-            check(m, mm, vis, taskCtx);
-
-            a = m.startAdding(taskCtx);
-        }
-
-//        a.add(new IntWritable(10), new IntWritable(2));
-//        mm.put(10, 2);
-//        check(m, mm);
-
-        a.close();
-
-        X.println("Alloc: " + mem.allocatedSize());
-
-        m.close();
-
-        assertEquals(0, mem.allocatedSize());
-    }
-
-    private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, GridHadoopTaskContext taskCtx)
-        throws Exception {
-        final GridHadoopTaskInput in = m.input(taskCtx);
-
-        Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
-        int keys = 0;
-
-        int prevKey = Integer.MIN_VALUE;
-
-        while (in.next()) {
-            keys++;
-
-            IntWritable k = (IntWritable)in.key();
-
-            assertNotNull(k);
-
-            assertTrue(k.get() > prevKey);
-
-            prevKey = k.get();
-
-            Deque<Integer> vs = new LinkedList<>();
-
-            Iterator<?> it = in.values();
-
-            while (it.hasNext())
-                vs.addFirst(((IntWritable) it.next()).get());
-
-            Collection<Integer> exp = mmm.get(k.get());
-
-            assertEquals(exp, vs);
-        }
-
-        assertEquals(mmm.size(), keys);
-
-//!        assertEquals(m.keys(), keys);
-
-        // Check visitor.
-
-        final byte[] buf = new byte[4];
-
-        final GridDataInput dataInput = new GridUnsafeDataInput();
-
-        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
-            /** */
-            IntWritable key = new IntWritable();
-
-            /** */
-            IntWritable val = new IntWritable();
-
-            @Override public void onKey(long keyPtr, int keySize) {
-                read(keyPtr, keySize, key);
-            }
-
-            @Override public void onValue(long valPtr, int valSize) {
-                read(valPtr, valSize, val);
-
-                vis.put(key.get(), val.get());
-            }
-
-            private void read(long ptr, int size, Writable w) {
-                assert size == 4 : size;
-
-                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
-
-                dataInput.bytes(buf, size);
-
-                try {
-                    w.readFields(dataInput);
-                }
-                catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-
-//        X.println("vis: " + vis);
-
-        assertEquals(mm, vis);
-
-        in.close();
-    }
-
-    /**
-     * @throws Exception if failed.
-     */
-    public void testMultiThreaded() throws Exception {
-        GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-        X.println("___ Started");
-
-        Random rnd = new GridRandom();
-
-        for (int i = 0; i < 20; i++) {
-            GridHadoopJobInfo job = new JobInfo();
-
-            final GridHadoopTaskContext taskCtx = new TaskContext();
-
-            final HadoopMultimap m = new HadoopSkipList(job, mem);
-
-            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
-            X.println("___ MT");
-
-            multithreaded(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    X.println("___ TH in");
-
-                    Random rnd = new GridRandom();
-
-                    IntWritable key = new IntWritable();
-                    IntWritable val = new IntWritable();
-
-                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
-                    for (int i = 0; i < 50000; i++) {
-                        int k = rnd.nextInt(32000);
-                        int v = rnd.nextInt();
-
-                        key.set(k);
-                        val.set(v);
-
-                        a.write(key, val);
-
-                        Collection<Integer> list = mm.get(k);
-
-                        if (list == null) {
-                            list = new ConcurrentLinkedQueue<>();
-
-                            Collection<Integer> old = mm.putIfAbsent(k, list);
-
-                            if (old != null)
-                                list = old;
-                        }
-
-                        list.add(v);
-                    }
-
-                    a.close();
-
-                    X.println("___ TH out");
-
-                    return null;
-                }
-            }, 3 + rnd.nextInt(27));
-
-            GridHadoopTaskInput in = m.input(taskCtx);
-
-            int prevKey = Integer.MIN_VALUE;
-
-            while (in.next()) {
-                IntWritable key = (IntWritable)in.key();
-
-                assertTrue(key.get() > prevKey);
-
-                prevKey = key.get();
-
-                Iterator<?> valsIter = in.values();
-
-                Collection<Integer> vals = mm.remove(key.get());
-
-                assertNotNull(vals);
-
-                while (valsIter.hasNext()) {
-                    IntWritable val = (IntWritable) valsIter.next();
-
-                    assertTrue(vals.remove(val.get()));
-                }
-
-                assertTrue(vals.isEmpty());
-            }
-
-            in.close();
-            m.close();
-
-            assertEquals(0, mem.allocatedSize());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
new file mode 100644
index 0000000..e98e82c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.commons.collections.comparators.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Abstract class for map tests.
+ */
+public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
+    /**
+     * Test task context.
+     */
+    protected static class TaskContext extends HadoopTaskContext {
+        /**
+         * Default constructor.
+         */
+        protected TaskContext() {
+            super(null, null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridHadoopCounters counters() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Comparator<Object> sortComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Comparator<Object> groupComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            assert false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+
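+        /** {@inheritDoc} */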
+        @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+    }
+
+    /**
+     * Test job info.
+     */
+    protected static class JobInfo implements GridHadoopJobInfo {
+        /** {@inheritDoc} */
+        @Nullable @Override public String property(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasCombiner() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasReducer() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HadoopJob createJob(GridHadoopJobId jobId, IgniteLogger log) throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int reducers() {
+            assert false;
+
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String jobName() {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String user() {
+            assert false;
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
new file mode 100644
index 0000000..5b1b6a8
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Hash multimap self test.
+ */
+public class HadoopHashMapSelfTest extends HadoopAbstractMapTest {
+
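+    /**
+     * Allocation benchmark; the underscore prefix keeps it out of the regular test run.
+     *
+     * @throws Exception If failed.
+     */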
+    public void _testAllocation() throws Exception {
+        final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        long size = 3L * 1024 * 1024 * 1024;
+
+        final long chunk = 16;// * 1024;
+
+        final int page = 4 * 1024;
+
+        final int writes = chunk < page ? 1 : (int)(chunk / page);
+
+        final long cnt = size / chunk;
+
+        assert cnt < Integer.MAX_VALUE;
+
+        final int threads = 4;
+
+        long start = System.currentTimeMillis();
+
+        multithreaded(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int cnt0 = (int)(cnt / threads);
+
+                for (int i = 0; i < cnt0; i++) {
+                    long ptr = mem.allocate(chunk);
+
+                    for (int j = 0; j < writes; j++)
+                        mem.writeInt(ptr + j * page, 100500);
+                }
+
+                return null;
+            }
+        }, threads);
+
+        X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt);
+
+        Thread.sleep(30000);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize);
+
+        HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
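+    /**
+     * Checks the multimap contents against the expected mapping.
+     *
+     * @param m Hash multimap to check.
+     * @param mm Expected mapping.
+     * @param taskCtx Task context.
+     * @throws Exception If failed.
+     */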
+    private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            ArrayList<Integer> vs = new ArrayList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.add(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
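+            // Value order is not defined for the hash multimap, so compare sorted copies.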
+            assertEquals(sorted(exp), sorted(vs));
+        }
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        in.close();
+    }
+
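+    /**
+     * @param col Collection of integers.
+     * @return Sorted long list.
+     */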
+    private GridLongList sorted(Collection<Integer> col) {
+        GridLongList lst = new GridLongList(col.size());
+
+        for (Integer i : col)
+            lst.add(i);
+
+        return lst.sort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/06525cad/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
new file mode 100644
index 0000000..52512cf
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.lang.Math.*;
+import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ * Skip list tests.
+ */
+public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
+    /**
+     * Tests the distribution of generated skip list levels.
+     */
+    public void testLevel() {
+        Random rnd = new GridRandom();
+
+        int[] levelsCnts = new int[32];
+
+        int all = 10000;
+
+        for (int i = 0; i < all; i++) {
+            int level = HadoopSkipList.randomLevel(rnd);
+
+            levelsCnts[level]++;
+        }
+
+        X.println("Distribution: " + Arrays.toString(levelsCnts));
+
+        for (int level = 0; level < levelsCnts.length; level++) {
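+            // Level L is expected with probability 2^-(L + 1), i.e. about all / 2^(level + 1) samples per level.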
+            int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
+
+            double precision = 0.72 / Math.max(32 >>> level, 1);
+
+            int sigma = max((int)ceil(precision * exp), 5);
+
+            X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precision +
+                " sigma: " + sigma);
+
+            assertTrue(abs(exp - levelsCnts[level]) <= sigma);
+        }
+    }
+
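+    /**
+     * @throws Exception If failed.
+     */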
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        int mapSize = 16 << rnd.nextInt(6);
+
+        GridHadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopMultimap m = new HadoopSkipList(job, mem);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        assertEquals(0, mem.allocatedSize());
+    }
+
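+    /**
+     * Checks the multimap contents and visitor output against the expected mapping.
+     *
+     * @param m Multimap to check.
+     * @param mm Expected mapping.
+     * @param vis Multimap to collect visited entries into.
+     * @param taskCtx Task context.
+     * @throws Exception If failed.
+     */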
+    private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx)
+        throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        int prevKey = Integer.MIN_VALUE;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            assertTrue(k.get() > prevKey);
+
+            prevKey = k.get();
+
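+            // Values come back in reverse insertion order, so addFirst restores the original order for comparison.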
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+//!        assertEquals(m.keys(), keys);
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** */
+            IntWritable key = new IntWritable();
+
+            /** */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            GridHadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopMultimap m = new HadoopSkipList(job, mem);
+
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
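+                        // Mirror the write into a plain concurrent map; on a race putIfAbsent keeps the first queue.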
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            int prevKey = Integer.MIN_VALUE;
+
+            while (in.next()) {
+                IntWritable key = (IntWritable)in.key();
+
+                assertTrue(key.get() > prevKey);
+
+                prevKey = key.get();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}