You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:09 UTC

[77/92] [abbrv] ignite git commit: Preparing big move.

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java
deleted file mode 100644
index 3ace04c..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopMultimapBase.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataInStream;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataOutStream;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_OFFHEAP_PAGE_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-
-/**
- * Base class for all multimaps.
- */
-public abstract class HadoopMultimapBase implements HadoopMultimap {
-    /** */
-    protected final GridUnsafeMemory mem;
-
-    /** */
-    protected final int pageSize;
-
-    /** */
-    private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     */
-    protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        assert jobInfo != null;
-        assert mem != null;
-
-        this.mem = mem;
-
-        pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
-    }
-
-    /**
-     * @param page Page.
-     */
-    private void deallocate(Page page) {
-        assert page != null;
-
-        mem.release(page.ptr, page.size);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @param nextValPtr Next value page pointer.
-     */
-    protected void nextValue(long valPtr, long nextValPtr) {
-        mem.writeLong(valPtr, nextValPtr);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @return Next value page pointer.
-     */
-    protected long nextValue(long valPtr) {
-        return mem.readLong(valPtr);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @param size Size.
-     */
-    protected void valueSize(long valPtr, int size) {
-        mem.writeInt(valPtr + 8, size);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @return Value size.
-     */
-    protected int valueSize(long valPtr) {
-        return mem.readInt(valPtr + 8);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        for (Page page : allPages)
-            deallocate(page);
-    }
-
-    /**
-     * Reader for key and value.
-     */
-    protected class ReaderBase implements AutoCloseable {
-        /** */
-        private Object tmp;
-
-        /** */
-        private final HadoopSerialization ser;
-
-        /** */
-        private final HadoopDataInStream in = new HadoopDataInStream(mem);
-
-        /**
-         * @param ser Serialization.
-         */
-        protected ReaderBase(HadoopSerialization ser) {
-            assert ser != null;
-
-            this.ser = ser;
-        }
-
-        /**
-         * @param valPtr Value page pointer.
-         * @return Value.
-         */
-        public Object readValue(long valPtr) {
-            assert valPtr > 0 : valPtr;
-
-            try {
-                return read(valPtr + 12, valueSize(valPtr));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /**
-         * Resets temporary object to the given one.
-         *
-         * @param tmp Temporary object for reuse.
-         */
-        public void resetReusedObject(Object tmp) {
-            this.tmp = tmp;
-        }
-
-        /**
-         * @param ptr Pointer.
-         * @param size Object size.
-         * @return Object.
-         */
-        protected Object read(long ptr, long size) throws IgniteCheckedException {
-            in.buffer().set(ptr, size);
-
-            tmp = ser.read(in, tmp);
-
-            return tmp;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            ser.close();
-        }
-    }
-
-    /**
-     * Base class for adders.
-     */
-    protected abstract class AdderBase implements Adder {
-        /** */
-        protected final HadoopSerialization keySer;
-
-        /** */
-        protected final HadoopSerialization valSer;
-
-        /** */
-        private final HadoopDataOutStream out;
-
-        /** */
-        private long writeStart;
-
-        /** Current page. */
-        private Page curPage;
-
-        /**
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException {
-            valSer = ctx.valueSerialization();
-            keySer = ctx.keySerialization();
-
-            out = new HadoopDataOutStream(mem) {
-                @Override public long move(long size) {
-                    long ptr = super.move(size);
-
-                    if (ptr == 0) // Was not able to move - not enough free space.
-                        ptr = allocateNextPage(size);
-
-                    assert ptr != 0;
-
-                    return ptr;
-                }
-            };
-        }
-
-        /**
-         * @param requestedSize Requested size.
-         * @return Next write pointer.
-         */
-        private long allocateNextPage(long requestedSize) {
-            int writtenSize = writtenSize();
-
-            long newPageSize = nextPageSize(writtenSize + requestedSize);
-            long newPagePtr = mem.allocate(newPageSize);
-
-            HadoopOffheapBuffer b = out.buffer();
-
-            b.set(newPagePtr, newPageSize);
-
-            if (writtenSize != 0) {
-                mem.copyMemory(writeStart, newPagePtr, writtenSize);
-
-                b.move(writtenSize);
-            }
-
-            writeStart = newPagePtr;
-
-            // At this point old page is not needed, so we release it.
-            Page oldPage = curPage;
-
-            curPage = new Page(newPagePtr, newPageSize);
-
-            if (oldPage != null)
-                allPages.add(oldPage);
-
-            return b.move(requestedSize);
-        }
-
-        /**
-         * Get next page size.
-         *
-         * @param required Required amount of data.
-         * @return Next page size.
-         */
-        private long nextPageSize(long required) {
-            long pages = (required / pageSize) + 1;
-
-            long pagesPow2 = nextPowerOfTwo(pages);
-
-            return pagesPow2 * pageSize;
-        }
-
-        /**
-         * Get next power of two which greater or equal to the given number. Naive implementation.
-         *
-         * @param val Number
-         * @return Nearest pow2.
-         */
-        private long nextPowerOfTwo(long val) {
-            long res = 1;
-
-            while (res < val)
-                res = res << 1;
-
-            if (res < 0)
-                throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
-
-            return res;
-        }
-
-        /**
-         * @return Fixed pointer.
-         */
-        private long fixAlignment() {
-            HadoopOffheapBuffer b = out.buffer();
-
-            long ptr = b.pointer();
-
-            if ((ptr & 7L) != 0) { // Address is not aligned by octet.
-                ptr = (ptr + 8L) & ~7L;
-
-                b.pointer(ptr);
-            }
-
-            return ptr;
-        }
-
-        /**
-         * @param off Offset.
-         * @param o Object.
-         * @return Page pointer.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException {
-            writeStart = fixAlignment();
-
-            if (off != 0)
-                out.move(off);
-
-            ser.write(out, o);
-
-            return writeStart;
-        }
-
-        /**
-         * @param size Size.
-         * @return Pointer.
-         */
-        protected long allocate(int size) {
-            writeStart = fixAlignment();
-
-            out.move(size);
-
-            return writeStart;
-        }
-
-        /**
-         * Rewinds local allocation pointer to the given pointer if possible.
-         *
-         * @param ptr Pointer.
-         */
-        protected void localDeallocate(long ptr) {
-            HadoopOffheapBuffer b = out.buffer();
-
-            if (b.isInside(ptr))
-                b.pointer(ptr);
-            else
-                b.reset();
-        }
-
-        /**
-         * @return Written size.
-         */
-        protected int writtenSize() {
-            return (int)(out.buffer().pointer() - writeStart);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            if (curPage != null)
-                allPages.add(curPage);
-
-            keySer.close();
-            valSer.close();
-        }
-    }
-
-    /**
-     * Iterator over values.
-     */
-    protected class ValueIterator implements Iterator<Object> {
-        /** */
-        private long valPtr;
-
-        /** */
-        private final ReaderBase valReader;
-
-        /**
-         * @param valPtr Value page pointer.
-         * @param valReader Value reader.
-         */
-        protected ValueIterator(long valPtr, ReaderBase valReader) {
-            this.valPtr = valPtr;
-            this.valReader = valReader;
-        }
-
-        /**
-         * @param valPtr Head value pointer.
-         */
-        public void head(long valPtr) {
-            this.valPtr = valPtr;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean hasNext() {
-            return valPtr != 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object next() {
-            if (!hasNext())
-                throw new NoSuchElementException();
-
-            Object res = valReader.readValue(valPtr);
-
-            valPtr = nextValue(valPtr);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    /**
-     * Page.
-     */
-    private static class Page {
-        /** Pointer. */
-        private final long ptr;
-
-        /** Size. */
-        private final long size;
-
-        /**
-         * Constructor.
-         *
-         * @param ptr Pointer.
-         * @param size Size.
-         */
-        public Page(long ptr, long size) {
-            this.ptr = ptr;
-            this.size = size;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java
deleted file mode 100644
index d374092..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipList.java
+++ /dev/null
@@ -1,733 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Skip list.
- */
-public class HadoopSkipList extends HadoopMultimapBase {
-    /** */
-    private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive.
-
-    /** Top level. */
-    private final AtomicInteger topLevel = new AtomicInteger(-1);
-
-    /** Heads for all the lists. */
-    private final long heads;
-
-    /** */
-    private final AtomicBoolean visitGuard = new AtomicBoolean();
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory.
-     */
-    public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        super(jobInfo, mem);
-
-        heads = mem.allocate(HEADS_SIZE, true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        super.close();
-
-        mem.release(heads, HEADS_SIZE);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
-        if (!visitGuard.compareAndSet(false, true))
-            return false;
-
-        for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
-            long valPtr = value(meta);
-
-            long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
-
-            if (valPtr != lastVisited) {
-                long k = key(meta);
-
-                v.onKey(k + 4, keySize(k));
-
-                lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
-
-                do {
-                    v.onValue(valPtr + 12, valueSize(valPtr));
-
-                    valPtr = nextValue(valPtr);
-                }
-                while (valPtr != lastVisited);
-            }
-        }
-
-        visitGuard.lazySet(false);
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
-        return new AdderImpl(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        Input in = new Input(taskCtx);
-
-        Comparator<Object> grpCmp = taskCtx.groupComparator();
-
-        if (grpCmp != null)
-            return new GroupedInput(grpCmp, in);
-
-        return in;
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key pointer.
-     */
-    private long key(long meta) {
-        return mem.readLong(meta);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param key Key pointer.
-     */
-    private void key(long meta, long key) {
-        mem.writeLong(meta, key);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer.
-     */
-    private long value(long meta) {
-        return mem.readLongVolatile(meta + 8);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param valPtr Value pointer.
-     */
-    private void value(long meta, long valPtr) {
-        mem.writeLongVolatile(meta + 8, valPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param oldValPtr Old first value pointer.
-     * @param newValPtr New first value pointer.
-     * @return {@code true} If operation succeeded.
-     */
-    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
-        return mem.casLong(meta + 8, oldValPtr, newValPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Last visited value pointer.
-     */
-    private long lastVisitedValue(long meta) {
-        return mem.readLong(meta + 16);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param valPtr Last visited value pointer.
-     */
-    private void lastVisitedValue(long meta, long valPtr) {
-        mem.writeLong(meta + 16, valPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param level Level.
-     * @return Next meta pointer.
-     */
-    private long nextMeta(long meta, int level) {
-        assert meta > 0 : meta;
-
-        return mem.readLongVolatile(meta + 24 + 8 * level);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param level Level.
-     * @param oldNext Old next meta pointer.
-     * @param newNext New next meta pointer.
-     * @return {@code true} If operation succeeded.
-     */
-    private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
-        assert meta > 0 : meta;
-
-        return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param level Level.
-     * @param nextMeta Next meta.
-     */
-    private void nextMeta(long meta, int level, long nextMeta) {
-        assert meta != 0;
-
-        mem.writeLong(meta + 24 + 8 * level, nextMeta);
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @return Key size.
-     */
-    private int keySize(long keyPtr) {
-        return mem.readInt(keyPtr);
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @param keySize Key size.
-     */
-    private void keySize(long keyPtr, int keySize) {
-        mem.writeInt(keyPtr, keySize);
-    }
-
-    /**
-     * @param rnd Random.
-     * @return Next level.
-     */
-    public static int randomLevel(Random rnd) {
-        int x = rnd.nextInt();
-
-        int level = 0;
-
-        while ((x & 1) != 0) { // Count sequential 1 bits.
-            level++;
-
-            x >>>= 1;
-        }
-
-        return level;
-    }
-
-    /**
-     * Reader.
-     */
-    private class Reader extends ReaderBase {
-        /**
-         * @param ser Serialization.
-         */
-        protected Reader(HadoopSerialization ser) {
-            super(ser);
-        }
-
-        /**
-         * @param meta Meta pointer.
-         * @return Key.
-         */
-        public Object readKey(long meta) {
-            assert meta > 0 : meta;
-
-            long k = key(meta);
-
-            try {
-                return read(k + 4, keySize(k));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-
-    /**
-     * Adder.
-     */
-    private class AdderImpl extends AdderBase {
-        /** */
-        private final Comparator<Object> cmp;
-
-        /** */
-        private final Random rnd = new GridRandom();
-
-        /** */
-        private final GridLongList stack = new GridLongList(16);
-
-        /** */
-        private final Reader keyReader;
-
-        /**
-         * @param ctx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-
-            cmp = ctx.sortComparator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            A.notNull(val, "val");
-
-            add(key, val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
-            k.tmpKey = keySer.read(in, k.tmpKey);
-
-            k.meta = add(k.tmpKey, null);
-
-            return k;
-        }
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         * @param level Level.
-         * @return Meta pointer.
-         */
-        private long createMeta(long key, long val, int level) {
-            int size = 32 + 8 * level;
-
-            long meta = allocate(size);
-
-            key(meta, key);
-            value(meta, val);
-            lastVisitedValue(meta, 0L);
-
-            for (int i = 32; i < size; i += 8) // Fill with 0.
-                mem.writeLong(meta + i, 0L);
-
-            return meta;
-        }
-
-        /**
-         * @param key Key.
-         * @return Pointer.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long writeKey(Object key) throws IgniteCheckedException {
-            long keyPtr = write(4, key, keySer);
-            int keySize = writtenSize() - 4;
-
-            keySize(keyPtr, keySize);
-
-            return keyPtr;
-        }
-
-        /**
-         * @param prevMeta Previous meta.
-         * @param meta Next meta.
-         */
-        private void stackPush(long prevMeta, long meta) {
-            stack.add(prevMeta);
-            stack.add(meta);
-        }
-
-        /**
-         * Drops last remembered frame from the stack.
-         */
-        private void stackPop() {
-            stack.pop(2);
-        }
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         * @return Meta pointer.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
-            assert key != null;
-
-            stack.clear();
-
-            long valPtr = 0;
-
-            if (val != null) { // Write value.
-                valPtr = write(12, val, valSer);
-                int valSize = writtenSize() - 12;
-
-                nextValue(valPtr, 0);
-                valueSize(valPtr, valSize);
-            }
-
-            long keyPtr = 0;
-            long newMeta = 0;
-            int newMetaLevel = -1;
-
-            long prevMeta = heads;
-            int level = topLevel.get();
-            long meta = level < 0 ? 0 : nextMeta(heads, level);
-
-            for (;;) {
-                if (level < 0) { // We did not find our key, trying to add new meta.
-                    if (keyPtr == 0) { // Write key and create meta only once.
-                        keyPtr = writeKey(key);
-
-                        newMetaLevel = randomLevel(rnd);
-                        newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
-                    }
-
-                    nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
-
-                    if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
-                        laceUp(key, newMeta, newMetaLevel);
-
-                        return newMeta;
-                    }
-                    else { // Add failed, need to check out what was added by another thread.
-                        meta = nextMeta(prevMeta, level = 0);
-
-                        stackPop();
-                    }
-                }
-
-                int cmpRes = cmp(key, meta);
-
-                if (cmpRes == 0) { // Key found.
-                    if (newMeta != 0)  // Deallocate if we've allocated something.
-                        localDeallocate(keyPtr);
-
-                    if (valPtr == 0) // Only key needs to be added.
-                        return meta;
-
-                    for (;;) { // Add value for the key found.
-                        long nextVal = value(meta);
-
-                        nextValue(valPtr, nextVal);
-
-                        if (casValue(meta, nextVal, valPtr))
-                            return meta;
-                    }
-                }
-
-                assert cmpRes != 0;
-
-                if (cmpRes > 0) { // Go right.
-                    prevMeta = meta;
-                    meta = nextMeta(meta, level);
-
-                    if (meta != 0) // If nothing to the right then go down.
-                        continue;
-                }
-
-                while (--level >= 0) { // Go down.
-                    stackPush(prevMeta, meta); // Remember the path.
-
-                    long nextMeta = nextMeta(prevMeta, level);
-
-                    if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
-                        meta = nextMeta;
-
-                        assert meta != 0;
-
-                        break;
-                    }
-                }
-            }
-        }
-
-        /**
-         * @param key Key.
-         * @param meta Meta pointer.
-         * @return Comparison result.
-         */
-        @SuppressWarnings("unchecked")
-        private int cmp(Object key, long meta) {
-            assert meta != 0;
-
-            return cmp.compare(key, keyReader.readKey(meta));
-        }
-
-        /**
-         * Adds appropriate index links between metas.
-         *
-         * @param newMeta Just added meta.
-         * @param newMetaLevel New level.
-         */
-        private void laceUp(Object key, long newMeta, int newMetaLevel) {
-            for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
-                long prevMeta = heads;
-                long meta = 0;
-
-                if (!stack.isEmpty()) { // Get the path back.
-                    meta = stack.remove();
-                    prevMeta = stack.remove();
-                }
-
-                for (;;) {
-                    nextMeta(newMeta, level, meta);
-
-                    if (casNextMeta(prevMeta, level, meta, newMeta))
-                        break;
-
-                    long oldMeta = meta;
-
-                    meta = nextMeta(prevMeta, level); // Reread meta.
-
-                    for (;;) {
-                        int cmpRes = cmp(key, meta);
-
-                        if (cmpRes > 0) { // Go right.
-                            prevMeta = meta;
-                            meta = nextMeta(prevMeta, level);
-
-                            if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
-                                continue;
-                        }
-
-                        assert cmpRes != 0; // Two different metas with equal keys must be impossible.
-
-                        break; // Retry cas.
-                    }
-                }
-            }
-
-            if (!stack.isEmpty())
-                return; // Our level already lower than top.
-
-            for (;;) { // Raise top level.
-                int top = topLevel.get();
-
-                if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
-                    break;
-            }
-        }
-
-        /**
-         * Key.
-         */
-        private class KeyImpl implements Key {
-            /** */
-            private long meta;
-
-            /** */
-            private Object tmpKey;
-
-            /**
-             * @return Meta pointer for the key.
-             */
-            public long address() {
-                return meta;
-            }
-
-            /**
-             * @param val Value.
-             */
-            @Override public void add(Value val) {
-                int size = val.size();
-
-                long valPtr = allocate(size + 12);
-
-                val.copyTo(valPtr + 12);
-
-                valueSize(valPtr, size);
-
-                long nextVal;
-
-                do {
-                    nextVal = value(meta);
-
-                    nextValue(valPtr, nextVal);
-                }
-                while(!casValue(meta, nextVal, valPtr));
-            }
-        }
-    }
-
-    /**
-     * Task input.
-     */
-    private class Input implements HadoopTaskInput {
-        /** */
-        private long metaPtr = heads;
-
-        /** */
-        private final Reader keyReader;
-
-        /** */
-        private final Reader valReader;
-
-        /**
-         * @param taskCtx Task context.
-         * @throws IgniteCheckedException If failed.
-         */
-        private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-            keyReader = new Reader(taskCtx.keySerialization());
-            valReader = new Reader(taskCtx.valueSerialization());
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            metaPtr = nextMeta(metaPtr, 0);
-
-            return metaPtr != 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return keyReader.readKey(metaPtr);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            return new ValueIterator(value(metaPtr), valReader);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            keyReader.close();
-            valReader.close();
-        }
-    }
-
-    /**
-     * Grouped input using grouping comparator.
-     */
-    private class GroupedInput implements HadoopTaskInput {
-        /** */
-        private final Comparator<Object> grpCmp;
-
-        /** */
-        private final Input in;
-
-        /** */
-        private Object prevKey;
-
-        /** */
-        private Object nextKey;
-
-        /** */
-        private final GridLongList vals = new GridLongList();
-
-        /**
-         * @param grpCmp Grouping comparator.
-         * @param in Input.
-         */
-        private GroupedInput(Comparator<Object> grpCmp, Input in) {
-            this.grpCmp = grpCmp;
-            this.in = in;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (prevKey == null) { // First call.
-                if (!in.next())
-                    return false;
-
-                prevKey = in.key();
-
-                assert prevKey != null;
-
-                in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
-
-                vals.add(value(in.metaPtr));
-            }
-            else {
-                if (in.metaPtr == 0) // We reached the end of the input.
-                    return false;
-
-                vals.clear();
-
-                vals.add(value(in.metaPtr));
-
-                in.keyReader.resetReusedObject(prevKey); // Switch key instances.
-
-                prevKey = nextKey;
-            }
-
-            while (in.next()) { // Fill with head value pointers with equal keys.
-                if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
-                    vals.add(value(in.metaPtr));
-                else
-                    break;
-            }
-
-            assert !vals.isEmpty();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return prevKey;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            assert !vals.isEmpty();
-
-            final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
-
-            return new Iterator<Object>() {
-                /** */
-                private int idx;
-
-                @Override public boolean hasNext() {
-                    if (!valIter.hasNext()) {
-                        if (++idx == vals.size())
-                            return false;
-
-                        valIter.head(vals.get(idx));
-
-                        assert valIter.hasNext();
-                    }
-
-                    return true;
-                }
-
-                @Override public Object next() {
-                    return valIter.next();
-                }
-
-                @Override public void remove() {
-                    valIter.remove();
-                }
-            };
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            in.close();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java
deleted file mode 100644
index 04aba2d..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataInStream.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Data input stream.
- */
-public class HadoopDataInStream extends InputStream implements DataInput {
-    /** */
-    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /**
-     * @param mem Memory.
-     */
-    public HadoopDataInStream(GridUnsafeMemory mem) {
-        assert mem != null;
-
-        this.mem = mem;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public HadoopOffheapBuffer buffer() {
-        return buf;
-    }
-
-    /**
-     * @param size Size.
-     * @return Old pointer.
-     */
-    protected long move(long size) throws IOException {
-        long ptr = buf.move(size);
-
-        assert ptr != 0;
-
-        return ptr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read() throws IOException {
-        return readUnsignedByte();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read(byte[] b, int off, int len) throws IOException {
-        readFully(b, off, len);
-
-        return len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long skip(long n) throws IOException {
-        move(n);
-
-        return n;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFully(byte[] b) throws IOException {
-        readFully(b, 0, b.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFully(byte[] b, int off, int len) throws IOException {
-        mem.readBytes(move(len), b, off, len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int skipBytes(int n) throws IOException {
-        move(n);
-
-        return n;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readBoolean() throws IOException {
-        byte res = readByte();
-
-        if (res == 1)
-            return true;
-
-        assert res == 0 : res;
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte readByte() throws IOException {
-        return mem.readByte(move(1));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readUnsignedByte() throws IOException {
-        return readByte() & 0xff;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short readShort() throws IOException {
-        return mem.readShort(move(2));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readUnsignedShort() throws IOException {
-        return readShort() & 0xffff;
-    }
-
-    /** {@inheritDoc} */
-    @Override public char readChar() throws IOException {
-        return (char)readShort();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readInt() throws IOException {
-        return mem.readInt(move(4));
-    }
-
-    /** {@inheritDoc} */
-    @Override public long readLong() throws IOException {
-        return mem.readLong(move(8));
-    }
-
-    /** {@inheritDoc} */
-    @Override public float readFloat() throws IOException {
-        return mem.readFloat(move(4));
-    }
-
-    /** {@inheritDoc} */
-    @Override public double readDouble() throws IOException {
-        return mem.readDouble(move(8));
-    }
-
-    /** {@inheritDoc} */
-    @Override public String readLine() throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String readUTF() throws IOException {
-        byte[] bytes = new byte[readInt()];
-
-        if (bytes.length != 0)
-            readFully(bytes);
-
-        return new String(bytes, StandardCharsets.UTF_8);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java
deleted file mode 100644
index 2714517..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataOutStream.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
-
-import java.io.DataOutput;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Data output stream.
- */
-public class HadoopDataOutStream extends OutputStream implements DataOutput {
-    /** */
-    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /**
-     * @param mem Memory.
-     */
-    public HadoopDataOutStream(GridUnsafeMemory mem) {
-        this.mem = mem;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public HadoopOffheapBuffer buffer() {
-        return buf;
-    }
-
-    /**
-     * @param size Size.
-     * @return Old pointer or {@code 0} if move was impossible.
-     */
-    public long move(long size) {
-        return buf.move(size);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(int b) {
-        writeByte(b);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(byte[] b) {
-        write(b, 0, b.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(byte[] b, int off, int len) {
-        GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, move(len), len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeBoolean(boolean v) {
-        writeByte(v ? 1 : 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeByte(int v) {
-        mem.writeByte(move(1), (byte)v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeShort(int v) {
-        mem.writeShort(move(2), (short)v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeChar(int v) {
-        writeShort(v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeInt(int v) {
-        mem.writeInt(move(4), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeLong(long v) {
-        mem.writeLong(move(8), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeFloat(float v) {
-        mem.writeFloat(move(4), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeDouble(double v) {
-        mem.writeDouble(move(8), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeBytes(String s) {
-        writeUTF(s);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeChars(String s) {
-        writeUTF(s);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeUTF(String s) {
-        byte[] b = s.getBytes(StandardCharsets.UTF_8);
-
-        writeInt(b.length);
-        write(b);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
deleted file mode 100644
index 273d6c2..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopOffheapBuffer.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
-
-/**
- * Offheap buffer.
- */
-public class HadoopOffheapBuffer {
-    /** Buffer begin address. */
-    private long bufPtr;
-
-    /** The first address we do not own. */
-    private long bufEnd;
-
-    /** Current read or write pointer. */
-    private long posPtr;
-
-    /**
-     * @param bufPtr Pointer to buffer begin.
-     * @param bufSize Size of the buffer.
-     */
-    public HadoopOffheapBuffer(long bufPtr, long bufSize) {
-        set(bufPtr, bufSize);
-    }
-
-    /**
-     * @param bufPtr Pointer to buffer begin.
-     * @param bufSize Size of the buffer.
-     */
-    public void set(long bufPtr, long bufSize) {
-        this.bufPtr = bufPtr;
-
-        posPtr = bufPtr;
-        bufEnd = bufPtr + bufSize;
-    }
-
-    /**
-     * @return Pointer to internal buffer begin.
-     */
-    public long begin() {
-        return bufPtr;
-    }
-
-    /**
-     * @return Buffer capacity.
-     */
-    public long capacity() {
-        return bufEnd - bufPtr;
-    }
-
-    /**
-     * @return Remaining capacity.
-     */
-    public long remaining() {
-        return bufEnd - posPtr;
-    }
-
-    /**
-     * @return Absolute pointer to the current position inside of the buffer.
-     */
-    public long pointer() {
-        return posPtr;
-    }
-
-    /**
-     * @param ptr Absolute pointer to the current position inside of the buffer.
-     */
-    public void pointer(long ptr) {
-        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
-        assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
-
-        posPtr = ptr;
-    }
-
-    /**
-     * @param size Size move on.
-     * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
-     */
-    public long move(long size) {
-        assert size > 0 : size;
-
-        long oldPos = posPtr;
-        long newPos = oldPos + size;
-
-        if (newPos > bufEnd)
-            return 0;
-
-        posPtr = newPos;
-
-        return oldPos;
-    }
-
-    /**
-     * @param ptr Pointer.
-     * @return {@code true} If the given pointer is inside of this buffer.
-     */
-    public boolean isInside(long ptr) {
-        return ptr >= bufPtr && ptr <= bufEnd;
-    }
-
-    /**
-     * Resets position to the beginning of buffer.
-     */
-    public void reset() {
-        posPtr = bufPtr;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index c82bbd4..65ff280 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -30,7 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 9e34593..92c024e 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
index 3b97236..648b4f9 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Task.java
@@ -22,7 +22,7 @@ import java.text.NumberFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2TaskContext;
 import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index c469a2c..896e1b1 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 525b2a5..4c030b4 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -54,7 +54,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
new file mode 100644
index 0000000..769bdc4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Shuffle.
+ */
+public class HadoopShuffle extends HadoopComponent {
+    /** */
+    private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
+
+    /** */
+    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+    /** {@inheritDoc} */
+    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+        super.start(ctx);
+
+        ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
+            new IgniteBiPredicate<UUID, Object>() {
+                @Override public boolean apply(UUID nodeId, Object msg) {
+                    return onMessageReceived(nodeId, (HadoopMessage)msg);
+                }
+            });
+    }
+
+    /**
+     * Stops shuffle.
+     *
+     * @param cancel If should cancel all ongoing activities.
+     */
+    @Override public void stop(boolean cancel) {
+        for (HadoopShuffleJob job : jobs.values()) {
+            try {
+                job.close();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close job.", e);
+            }
+        }
+
+        jobs.clear();
+    }
+
+    /**
+     * Creates new shuffle job.
+     *
+     * @param jobId Job ID.
+     * @return Created shuffle job.
+     * @throws IgniteCheckedException If job creation failed.
+     */
+    private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
+        HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+
+        HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
+            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+        UUID[] rdcAddrs = new UUID[plan.reducers()];
+
+        for (int i = 0; i < rdcAddrs.length; i++) {
+            UUID nodeId = plan.nodeForReducer(i);
+
+            assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
+
+            rdcAddrs[i] = nodeId;
+        }
+
+        boolean init = job.initializeReduceAddresses(rdcAddrs);
+
+        assert init;
+
+        return job;
+    }
+
+    /**
+     * @param nodeId Node ID to send message to.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+        ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
+
+        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+    }
+
+    /**
+     * @param jobId Task info.
+     * @return Shuffle job.
+     */
+    private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
+        HadoopShuffleJob<UUID> res = jobs.get(jobId);
+
+        if (res == null) {
+            res = newJob(jobId);
+
+            HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+
+            if (old != null) {
+                res.close();
+
+                res = old;
+            }
+            else if (res.reducersInitialized())
+                startSending(res);
+        }
+
+        return res;
+    }
+
+    /**
+     * Starts message sending thread.
+     *
+     * @param shuffleJob Job to start sending for.
+     */
+    private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
+        shuffleJob.startSending(ctx.kernalContext().gridName(),
+            new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
+                @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+                    send0(dest, msg);
+                }
+            }
+        );
+    }
+
+    /**
+     * Message received callback.
+     *
+     * @param src Sender node ID.
+     * @param msg Received message.
+     * @return {@code True}.
+     */
+    public boolean onMessageReceived(UUID src, HadoopMessage msg) {
+        if (msg instanceof HadoopShuffleMessage) {
+            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+            try {
+                job(m.jobId()).onShuffleMessage(m);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Message handling failed.", e);
+            }
+
+            try {
+                // Reply with ack.
+                send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+            }
+        }
+        else if (msg instanceof HadoopShuffleAck) {
+            HadoopShuffleAck m = (HadoopShuffleAck)msg;
+
+            try {
+                job(m.jobId()).onShuffleAck(m);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Message handling failed.", e);
+            }
+        }
+        else
+            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+                ", msg=" + msg + ']');
+
+        return true;
+    }
+
+    /**
+     * @param taskCtx Task info.
+     * @return Output.
+     */
+    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return job(taskCtx.taskInfo().jobId()).output(taskCtx);
+    }
+
+    /**
+     * @param taskCtx Task info.
+     * @return Input.
+     */
+    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return job(taskCtx.taskInfo().jobId()).input(taskCtx);
+    }
+
+    /**
+     * @param jobId Job id.
+     */
+    public void jobFinished(HadoopJobId jobId) {
+        HadoopShuffleJob job = jobs.remove(jobId);
+
+        if (job != null) {
+            try {
+                job.close();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close job: " + jobId, e);
+            }
+        }
+    }
+
+    /**
+     * Flushes all the outputs for the given job to remote nodes.
+     *
+     * @param jobId Job ID.
+     * @return Future.
+     */
+    public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
+        HadoopShuffleJob job = jobs.get(jobId);
+
+        if (job == null)
+            return new GridFinishedFuture<>();
+
+        try {
+            return job.flush();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * @return Memory.
+     */
+    public GridUnsafeMemory memory() {
+        return mem;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
new file mode 100644
index 0000000..6013ec6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Acknowledgement message.
+ */
+public class HadoopShuffleAck implements HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private long msgId;
+
+    /** */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /**
+     *
+     */
+    public HadoopShuffleAck() {
+        // No-op.
+    }
+
+    /**
+     * @param msgId Message ID.
+     */
+    public HadoopShuffleAck(long msgId, HadoopJobId jobId) {
+        assert jobId != null;
+
+        this.msgId = msgId;
+        this.jobId = jobId;
+    }
+
+    /**
+     * @return Message ID.
+     */
+    public long id() {
+        return msgId;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+        out.writeLong(msgId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+
+        jobId.readExternal(in);
+        msgId = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopShuffleAck.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
new file mode 100644
index 0000000..b940c72
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+
+/**
+ * Shuffle job.
+ */
+public class HadoopShuffleJob<T> implements AutoCloseable {
+    /** */
+    private static final int MSG_BUF_SIZE = 128 * 1024;
+
+    /** */
+    private final HadoopJob job;
+
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /** */
+    private final boolean needPartitioner;
+
+    /** Collection of task contexts for each reduce task. */
+    private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+
+    /** Reducers addresses. */
+    private T[] reduceAddrs;
+
+    /** Local reducers address. */
+    private final T locReduceAddr;
+
+    /** */
+    private final HadoopShuffleMessage[] msgs;
+
+    /** */
+    private final AtomicReferenceArray<HadoopMultimap> maps;
+
+    /** */
+    private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+
+    /** */
+    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
+        new ConcurrentHashMap<>();
+
+    /** */
+    private volatile GridWorker snd;
+
+    /** Latch for remote addresses waiting. */
+    private final CountDownLatch ioInitLatch = new CountDownLatch(1);
+
+    /** Finished flag. Set on flush or close. */
+    private volatile boolean flushed;
+
+    /** */
+    private final IgniteLogger log;
+
+    /**
+     * @param locReduceAddr Local reducer address.
+     * @param log Logger.
+     * @param job Job.
+     * @param mem Memory.
+     * @param totalReducerCnt Amount of reducers in the Job.
+     * @param locReducers Reducers will work on current node.
+     * @throws IgniteCheckedException If error.
+     */
+    public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+        int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+        this.locReduceAddr = locReduceAddr;
+        this.job = job;
+        this.mem = mem;
+        this.log = log.getLogger(HadoopShuffleJob.class);
+
+        if (!F.isEmpty(locReducers)) {
+            for (int rdc : locReducers) {
+                HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
+
+                reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+            }
+        }
+
+        needPartitioner = totalReducerCnt > 1;
+
+        maps = new AtomicReferenceArray<>(totalReducerCnt);
+        msgs = new HadoopShuffleMessage[totalReducerCnt];
+    }
+
+    /**
+     * @param reduceAddrs Addresses of reducers.
+     * @return {@code True} if addresses were initialized by this call.
+     */
+    public boolean initializeReduceAddresses(T[] reduceAddrs) {
+        if (this.reduceAddrs == null) {
+            this.reduceAddrs = reduceAddrs;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return {@code True} if reducers addresses were initialized.
+     */
+    public boolean reducersInitialized() {
+        return reduceAddrs != null;
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param io IO Closure for sending messages.
+     */
+    @SuppressWarnings("BusyWait")
+    public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+        assert snd == null;
+        assert io != null;
+
+        this.io = io;
+
+        if (!flushed) {
+            snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+                @Override protected void body() throws InterruptedException {
+                    try {
+                        while (!isCancelled()) {
+                            Thread.sleep(5);
+
+                            collectUpdatesAndSend(false);
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            };
+
+            new IgniteThread(snd).start();
+        }
+
+        ioInitLatch.countDown();
+    }
+
+    /**
+     * @param maps Maps.
+     * @param idx Index.
+     * @return Map.
+     */
+    private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
+        HadoopMultimap map = maps.get(idx);
+
+        if (map == null) { // Create new map.
+            map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
+                new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
+                new HadoopSkipList(job.info(), mem);
+
+            if (!maps.compareAndSet(idx, null, map)) {
+                map.close();
+
+                return maps.get(idx);
+            }
+        }
+
+        return map;
+    }
+
+    /**
+     * @param msg Message.
+     * @throws IgniteCheckedException Exception.
+     */
+    public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+        assert msg.buffer() != null;
+        assert msg.offset() > 0;
+
+        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+
+        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+
+        HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+
+        // Add data from message to the map.
+        try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+            final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
+            final UnsafeValue val = new UnsafeValue(msg.buffer());
+
+            msg.visit(new HadoopShuffleMessage.Visitor() {
+                /** */
+                private HadoopMultimap.Key key;
+
+                @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
+                    dataInput.bytes(buf, off, off + len);
+
+                    key = adder.addKey(dataInput, key);
+                }
+
+                @Override public void onValue(byte[] buf, int off, int len) {
+                    val.off = off;
+                    val.size = len;
+
+                    key.add(val);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param ack Shuffle ack.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void onShuffleAck(HadoopShuffleAck ack) {
+        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = sentMsgs.get(ack.id());
+
+        if (tup != null)
+            tup.get2().onDone();
+        else
+            log.warning("Received shuffle ack for not registered shuffle id: " + ack);
+    }
+
+    /**
+     * Unsafe value.
+     */
+    private static class UnsafeValue implements HadoopMultimap.Value {
+        /** */
+        private final byte[] buf;
+
+        /** */
+        private int off;
+
+        /** */
+        private int size;
+
+        /**
+         * @param buf Buffer.
+         */
+        private UnsafeValue(byte[] buf) {
+            assert buf != null;
+
+            this.buf = buf;
+        }
+
+        /** */
+        @Override public int size() {
+            return size;
+        }
+
+        /** */
+        @Override public void copyTo(long ptr) {
+            GridUnsafe.copyMemory(buf, GridUnsafe.BYTE_ARR_OFF + off, null, ptr, size);
+        }
+    }
+
+    /**
+     * Sends map updates to remote reducers.
+     */
+    private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
+        for (int i = 0; i < maps.length(); i++) {
+            HadoopMultimap map = maps.get(i);
+
+            if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+                continue; // Skip empty map and local node.
+
+            if (msgs[i] == null)
+                msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+
+            final int idx = i;
+
+            map.visit(false, new HadoopMultimap.Visitor() {
+                /** */
+                private long keyPtr;
+
+                /** */
+                private int keySize;
+
+                /** */
+                private boolean keyAdded;
+
+                /** {@inheritDoc} */
+                @Override public void onKey(long keyPtr, int keySize) {
+                    this.keyPtr = keyPtr;
+                    this.keySize = keySize;
+
+                    keyAdded = false;
+                }
+
+                private boolean tryAdd(long valPtr, int valSize) {
+                    HadoopShuffleMessage msg = msgs[idx];
+
+                    if (!keyAdded) { // Add key and value.
+                        int size = keySize + valSize;
+
+                        if (!msg.available(size, false))
+                            return false;
+
+                        msg.addKey(keyPtr, keySize);
+                        msg.addValue(valPtr, valSize);
+
+                        keyAdded = true;
+
+                        return true;
+                    }
+
+                    if (!msg.available(valSize, true))
+                        return false;
+
+                    msg.addValue(valPtr, valSize);
+
+                    return true;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onValue(long valPtr, int valSize) {
+                    if (tryAdd(valPtr, valSize))
+                        return;
+
+                    send(idx, keySize + valSize);
+
+                    keyAdded = false;
+
+                    if (!tryAdd(valPtr, valSize))
+                        throw new IllegalStateException();
+                }
+            });
+
+            if (flush && msgs[i].offset() != 0)
+                send(i, 0);
+        }
+    }
+
+    /**
+     * @param idx Index of message.
+     * @param newBufMinSize Min new buffer size.
+     */
+    private void send(final int idx, int newBufMinSize) {
+        final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+        HadoopShuffleMessage msg = msgs[idx];
+
+        final long msgId = msg.id();
+
+        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
+            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+
+        assert old == null;
+
+        try {
+            io.apply(reduceAddrs[idx], msg);
+        }
+        catch (GridClosureException e) {
+            fut.onDone(U.unwrap(e));
+        }
+
+        fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                try {
+                    f.get();
+
+                    // Clean up the future from map only if there was no exception.
+                    // Otherwise flush() should fail.
+                    sentMsgs.remove(msgId);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send message.", e);
+                }
+            }
+        });
+
+        msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+            Math.max(MSG_BUF_SIZE, newBufMinSize));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        if (snd != null) {
+            snd.cancel();
+
+            try {
+                snd.join();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        close(maps);
+    }
+
+    /**
+     * @param maps Maps.
+     */
+    private void close(AtomicReferenceArray<HadoopMultimap> maps) {
+        for (int i = 0; i < maps.length(); i++) {
+            HadoopMultimap map = maps.get(i);
+
+            if (map != null)
+                map.close();
+        }
+    }
+
+    /**
+     * @return Future.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
+
+        flushed = true;
+
+        if (maps.length() == 0)
+            return new GridFinishedFuture<>();
+
+        U.await(ioInitLatch);
+
+        GridWorker snd0 = snd;
+
+        if (snd0 != null) {
+            if (log.isDebugEnabled())
+                log.debug("Cancelling sender thread.");
+
+            snd0.cancel();
+
+            try {
+                snd0.join();
+
+                if (log.isDebugEnabled())
+                    log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        collectUpdatesAndSend(true); // With flush.
+
+        if (log.isDebugEnabled())
+            log.debug("Finished sending collected updates to remote reducers: " + job.id());
+
+        GridCompoundFuture fut = new GridCompoundFuture<>();
+
+        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+            fut.add(tup.get2());
+
+        fut.markInitialized();
+
+        if (log.isDebugEnabled())
+            log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+
+        return fut;
+    }
+
+    /**
+     * @param taskCtx Task context.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        switch (taskCtx.taskInfo().type()) {
+            case MAP:
+                assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined.";
+
+            case COMBINE:
+                return new PartitionedOutput(taskCtx);
+
+            default:
+                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+        }
+    }
+
+    /**
+     * @param taskCtx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        switch (taskCtx.taskInfo().type()) {
+            case REDUCE:
+                int reducer = taskCtx.taskInfo().taskNumber();
+
+                HadoopMultimap m = maps.get(reducer);
+
+                if (m != null)
+                    return m.input(taskCtx);
+
+                return new HadoopTaskInput() { // Empty input.
+                    @Override public boolean next() {
+                        return false;
+                    }
+
+                    @Override public Object key() {
+                        throw new IllegalStateException();
+                    }
+
+                    @Override public Iterator<?> values() {
+                        throw new IllegalStateException();
+                    }
+
+                    @Override public void close() {
+                        // No-op.
+                    }
+                };
+
+            default:
+                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+        }
+    }
+
+    /**
+     * Partitioned output.
+     */
+    private class PartitionedOutput implements HadoopTaskOutput {
+        /** */
+        private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+
+        /** */
+        private HadoopPartitioner partitioner;
+
+        /** */
+        private final HadoopTaskContext taskCtx;
+
+        /**
+         * Constructor.
+         * @param taskCtx Task context.
+         */
+        private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+            this.taskCtx = taskCtx;
+
+            if (needPartitioner)
+                partitioner = taskCtx.partitioner();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            int part = 0;
+
+            if (partitioner != null) {
+                part = partitioner.partition(key, val, adders.length);
+
+                if (part < 0 || part >= adders.length)
+                    throw new IgniteCheckedException("Invalid partition: " + part);
+            }
+
+            HadoopTaskOutput out = adders[part];
+
+            if (out == null)
+                adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+
+            out.write(key, val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            for (HadoopTaskOutput adder : adders) {
+                if (adder != null)
+                    adder.close();
+            }
+        }
+    }
+}
\ No newline at end of file