You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:27 UTC

[11/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (2).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimapBase.java
deleted file mode 100644
index 92854f1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopHashMultimapBase.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-
-import java.util.*;
-
-/**
- * Base class for hash multimaps whose entries are described by meta records read through {@code mem}.
- */
-public abstract class GridHadoopHashMultimapBase extends GridHadoopMultimapBase {
-    /**
-     * @param jobInfo Job info, passed to the base multimap.
-     * @param mem Memory, passed to the base multimap.
-     */
-    protected GridHadoopHashMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        super(jobInfo, mem);
-    }
-
-    /** Not supported by this class: always throws {@link UnsupportedOperationException}. */
-    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
-        throw new UnsupportedOperationException("visit");
-    }
-
-    /** Creates a new {@link Input} for the given task context. */
-    @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        return new Input(taskCtx);
-    }
-
-    /**
-     * @return Hash table capacity; {@link Input} scans slot indexes from 0 up to this value (exclusive).
-     */
-    public abstract int capacity();
-
-    /**
-     * @param idx Index in hash table.
-     * @return Meta page pointer; {@link Input} treats {@code 0} as an empty slot.
-     */
-    protected abstract long meta(int idx);
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key hash (int read at offset 0 of the meta).
-     */
-    protected int keyHash(long meta) {
-        return mem.readInt(meta);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key size (int read at offset 4 of the meta).
-     */
-    protected int keySize(long meta) {
-        return mem.readInt(meta + 4);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key pointer (long read at offset 8 of the meta).
-     */
-    protected long key(long meta) {
-        return mem.readLong(meta + 8);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer (long read at offset 16 of the meta).
-     */
-    protected long value(long meta) {
-        return mem.readLong(meta + 16);
-    }
-    /**
-     * @param meta Meta pointer.
-     * @param val Value pointer, written at offset 16 of the meta.
-     */
-    protected void value(long meta, long val) {
-        mem.writeLong(meta + 16, val);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Collision pointer (long read at offset 24 of the meta).
-     */
-    protected long collision(long meta) {
-        return mem.readLong(meta + 24);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param collision Collision pointer, written at offset 24; asserted to differ from {@code meta}.
-     */
-    protected void collision(long meta, long collision) {
-        assert meta != collision : meta;
-
-        mem.writeLong(meta + 24, collision);
-    }
-
-    /**
-     * Reader that adds reading of a key addressed by a meta record to {@link ReaderBase}.
-     */
-    protected class Reader extends ReaderBase {
-        /**
-         * @param ser Serialization.
-         */
-        protected Reader(GridHadoopSerialization ser) {
-            super(ser);
-        }
-
-        /**
-         * @param meta Meta pointer; asserted to be positive.
-         * @return Key read at {@code key(meta)} with size {@code keySize(meta)}; checked failures become {@link IgniteException}.
-         */
-        public Object readKey(long meta) {
-            assert meta > 0 : meta;
-
-            try {
-                return read(key(meta), keySize(meta));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-
-    /**
-     * Task input that walks the hash table slot by slot and follows collision pointers within a slot.
-     */
-    protected class Input implements GridHadoopTaskInput {
-        /** Index of the slot scanned last; starts at -1, i.e. before the first slot. */
-        private int idx = -1;
-
-        /** Meta pointer of the current entry; 0 when there is none. */
-        private long metaPtr;
-
-        /** Hash table capacity captured in the constructor. */
-        private final int cap;
-
-        /** Reader created over the key serialization. */
-        private final Reader keyReader;
-
-        /** Reader created over the value serialization. */
-        private final Reader valReader;
-
-        /**
-         * @param taskCtx Task context supplying the key and value serializations.
-         * @throws IgniteCheckedException If failed.
-         */
-        public Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-            cap = capacity();
-
-            keyReader = new Reader(taskCtx.keySerialization());
-            valReader = new Reader(taskCtx.valueSerialization());
-        }
-
-        /** Moves to the collision successor of the current entry if any, otherwise to the next non-empty slot. */
-        @Override public boolean next() {
-            if (metaPtr != 0) {
-                metaPtr = collision(metaPtr);
-
-                if (metaPtr != 0)
-                    return true;
-            }
-
-            while (++idx < cap) { // Scan table.
-                metaPtr = meta(idx);
-
-                if (metaPtr != 0)
-                    return true;
-            }
-
-            return false;
-        }
-
-        /** Reads the key of the current entry with the key reader. */
-        @Override public Object key() {
-            return keyReader.readKey(metaPtr);
-        }
-
-        /** Creates a {@link ValueIterator} starting at the value pointer of the current entry. */
-        @Override public Iterator<?> values() {
-            return new ValueIterator(value(metaPtr), valReader);
-        }
-
-        /** Closes the key reader, then the value reader. */
-        @Override public void close() throws IgniteCheckedException {
-            keyReader.close();
-            valReader.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimap.java
deleted file mode 100644
index b8eb12c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimap.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Multimap for Hadoop intermediate results.
- */
-@SuppressWarnings("PublicInnerClass")
-public interface GridHadoopMultimap extends AutoCloseable {
-    /**
-     * Incrementally visits all the keys and values in the map.
-     *
-     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
-     * @param v Visitor.
-     * @return {@code false} If visiting was impossible.
-     */
-    public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task context.
-     * @return Adder.
-     * @throws IgniteCheckedException If failed.
-     */
-    public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param taskCtx Task context.
-     * @return Task input.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx)
-        throws IgniteCheckedException;
-
-    /** Closes the multimap; redeclared here without any checked exception. */
-    @Override public void close();
-
-    /**
-     * Adder: a task output that can additionally add a key read from a data input.
-     */
-    public interface Adder extends GridHadoopTaskOutput {
-        /**
-         * @param in Data input.
-         * @param reuse Reusable key; may be {@code null}.
-         * @return Key.
-         * @throws IgniteCheckedException If failed.
-         */
-        public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
-    }
-
-    /**
-     * Key to add values to.
-     */
-    public interface Key {
-        /**
-         * @param val Value to add to this key.
-         */
-        public void add(Value val);
-    }
-
-    /**
-     * Value that exposes its size in bytes and a {@code copyTo(ptr)} operation.
-     */
-    public interface Value {
-        /**
-         * @return Size in bytes.
-         */
-        public int size();
-
-        /**
-         * @param ptr Pointer to copy to.
-         */
-        public void copyTo(long ptr);
-    }
-
-    /**
-     * Key and values visitor.
-     */
-    public interface Visitor {
-        /**
-         * @param keyPtr Key pointer.
-         * @param keySize Key size.
-         */
-        public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
-
-        /**
-         * @param valPtr Value pointer.
-         * @param valSize Value size.
-         */
-        public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimapBase.java
deleted file mode 100644
index 2d8660f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopMultimapBase.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
-
-/**
- * Base class for all multimaps.
- */
-public abstract class GridHadoopMultimapBase implements GridHadoopMultimap {
-    /** Memory used for all reads, writes, allocations and releases of this multimap. */
-    protected final GridUnsafeMemory mem;
-
-    /** Page size obtained in the constructor via {@code get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024)}. */
-    protected final int pageSize;
-
-    /** Page lists registered by closed adders; their pages are released in {@link #close()}. */
-    private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>();
-
-    /**
-     * @param jobInfo Job info; asserted to be non-null.
-     * @param mem Memory; asserted to be non-null.
-     */
-    protected GridHadoopMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        assert jobInfo != null;
-        assert mem != null;
-
-        this.mem = mem;
-
-        pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
-    }
-
-    /**
-     * @param ptrs Size and pointer pairs list; emptied while every page in it is released.
-     */
-    private void deallocate(GridLongList ptrs) {
-        while (!ptrs.isEmpty())
-            mem.release(ptrs.remove(), ptrs.remove());
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @param nextValPtr Next value page pointer, written at offset 0 of the value page.
-     */
-    protected void nextValue(long valPtr, long nextValPtr) {
-        mem.writeLong(valPtr, nextValPtr);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @return Next value page pointer (long read at offset 0 of the value page).
-     */
-    protected long nextValue(long valPtr) {
-        return mem.readLong(valPtr);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @param size Size, written as an int at offset 8 of the value page.
-     */
-    protected void valueSize(long valPtr, int size) {
-        mem.writeInt(valPtr + 8, size);
-    }
-
-    /**
-     * @param valPtr Value page pointer.
-     * @return Value size (int read at offset 8 of the value page).
-     */
-    protected int valueSize(long valPtr) {
-        return mem.readInt(valPtr + 8);
-    }
-
-    /** Releases the pages of every list registered in {@code allPages}. */
-    @Override public void close() {
-        for (GridLongList list : allPages)
-            deallocate(list);
-    }
-
-    /**
-     * Reader for key and value that hands the previously read object back to the serialization for reuse.
-     */
-    protected class ReaderBase implements AutoCloseable {
-        /** Object returned by the last read (or set explicitly); passed to the next read. */
-        private Object tmp;
-
-        /** Serialization used for reading. */
-        private final GridHadoopSerialization ser;
-
-        /** Input stream over {@code mem}; its buffer is repositioned before every read. */
-        private final GridHadoopDataInStream in = new GridHadoopDataInStream(mem);
-
-        /**
-         * @param ser Serialization; asserted to be non-null.
-         */
-        protected ReaderBase(GridHadoopSerialization ser) {
-            assert ser != null;
-
-            this.ser = ser;
-        }
-
-        /**
-         * @param valPtr Value page pointer; asserted to be positive.
-         * @return Value read at offset 12 of the value page with the stored value size.
-         */
-        public Object readValue(long valPtr) {
-            assert valPtr > 0 : valPtr;
-
-            try {
-                return read(valPtr + 12, valueSize(valPtr));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-
-        /**
-         * Resets temporary object to the given one.
-         *
-         * @param tmp Temporary object for reuse.
-         */
-        public void resetReusedObject(Object tmp) {
-            this.tmp = tmp;
-        }
-
-        /**
-         * @param ptr Pointer.
-         * @param size Object size.
-         * @return Object returned by the serialization; also kept as the temporary object.
-         */
-        protected Object read(long ptr, long size) throws IgniteCheckedException {
-            in.buffer().set(ptr, size);
-
-            tmp = ser.read(in, tmp);
-
-            return tmp;
-        }
-
-        /** Closes the serialization. */
-        @Override public void close() throws IgniteCheckedException {
-            ser.close();
-        }
-    }
-
-    /**
-     * Base class for adders.
-     */
-    protected abstract class AdderBase implements Adder {
-        /** Key serialization taken from the task context. */
-        protected final GridHadoopSerialization keySer;
-
-        /** Value serialization taken from the task context. */
-        protected final GridHadoopSerialization valSer;
-
-        /** Output stream whose {@code move} allocates a new page when the current buffer cannot fit the request. */
-        private final GridHadoopDataOutStream out;
-
-        /** Pointer at which the current write started. */
-        private long writeStart;
-
-        /** Size and pointer pairs list. */
-        private final GridLongList pages = new GridLongList(16);
-
-        /**
-         * @param ctx Task context supplying the key and value serializations.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderBase(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-            valSer = ctx.valueSerialization();
-            keySer = ctx.keySerialization();
-
-            out = new GridHadoopDataOutStream(mem) {
-                @Override public long move(long size) {
-                    long ptr = super.move(size);
-
-                    if (ptr == 0) // Was not able to move - not enough free space.
-                        ptr = allocateNextPage(size);
-
-                    assert ptr != 0;
-
-                    return ptr;
-                }
-            };
-        }
-
-        /**
-         * @param requestedSize Requested size; the new page holds at least the bytes written so far plus this size.
-         * @return Next write pointer in the new page, after the bytes copied over from the current write.
-         */
-        private long allocateNextPage(long requestedSize) {
-            int writtenSize = writtenSize();
-
-            long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
-            long newPagePtr = mem.allocate(newPageSize);
-
-            pages.add(newPageSize);
-            pages.add(newPagePtr);
-
-            GridHadoopOffheapBuffer b = out.buffer();
-
-            b.set(newPagePtr, newPageSize);
-
-            if (writtenSize != 0) {
-                mem.copyMemory(writeStart, newPagePtr, writtenSize);
-
-                b.move(writtenSize);
-            }
-
-            writeStart = newPagePtr;
-
-            return b.move(requestedSize);
-        }
-
-        /**
-         * @return Buffer pointer, advanced to the next multiple of 8 if it was not already one.
-         */
-        private long fixAlignment() {
-            GridHadoopOffheapBuffer b = out.buffer();
-
-            long ptr = b.pointer();
-
-            if ((ptr & 7L) != 0) { // Address is not aligned by octet.
-                ptr = (ptr + 8L) & ~7L;
-
-                b.pointer(ptr);
-            }
-
-            return ptr;
-        }
-
-        /**
-         * @param off Offset: number of bytes moved over before the object is serialized (skipped when 0).
-         * @param o Object, written with {@code ser}.
-         * @return Page pointer: the value of {@code writeStart} after the write.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected long write(int off, Object o, GridHadoopSerialization ser) throws IgniteCheckedException {
-            writeStart = fixAlignment();
-
-            if (off != 0)
-                out.move(off);
-
-            ser.write(out, o);
-
-            return writeStart;
-        }
-
-        /**
-         * @param size Size.
-         * @return Pointer: the value of {@code writeStart} after the output was moved by {@code size}.
-         */
-        protected long allocate(int size) {
-            writeStart = fixAlignment();
-
-            out.move(size);
-
-            return writeStart;
-        }
-
-        /**
-         * Rewinds local allocation pointer to the given pointer if possible.
-         *
-         * @param ptr Pointer.
-         */
-        protected void localDeallocate(long ptr) {
-            GridHadoopOffheapBuffer b = out.buffer();
-
-            if (b.isInside(ptr))
-                b.pointer(ptr);
-            else
-                b.reset();
-        }
-
-        /**
-         * @return Written size: distance from {@code writeStart} to the current buffer pointer, cast to int.
-         */
-        protected int writtenSize() {
-            return (int)(out.buffer().pointer() - writeStart);
-        }
-
-        /** Not supported by this base adder: always throws {@link UnsupportedOperationException}. */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** Registers this adder's page list in {@code allPages}, then closes both serializations. */
-        @Override public void close() throws IgniteCheckedException {
-            allPages.add(pages);
-
-            keySer.close();
-            valSer.close();
-        }
-    }
-
-    /**
-     * Iterator over values.
-     */
-    protected class ValueIterator implements Iterator<Object> {
-        /** Pointer of the next value to return; 0 when there is none. */
-        private long valPtr;
-
-        /** Reader used to read each value. */
-        private final ReaderBase valReader;
-
-        /**
-         * @param valPtr Value page pointer.
-         * @param valReader Value reader.
-         */
-        protected ValueIterator(long valPtr, ReaderBase valReader) {
-            this.valPtr = valPtr;
-            this.valReader = valReader;
-        }
-
-        /**
-         * @param valPtr Head value pointer.
-         */
-        public void head(long valPtr) {
-            this.valPtr = valPtr;
-        }
-
-        /** Returns {@code true} while the current value pointer is not 0. */
-        @Override public boolean hasNext() {
-            return valPtr != 0;
-        }
-
-        /** Reads the value at the current pointer, then advances to its next-value pointer. */
-        @Override public Object next() {
-            if (!hasNext())
-                throw new NoSuchElementException();
-
-            Object res = valReader.readValue(valPtr);
-
-            valPtr = nextValue(valPtr);
-
-            return res;
-        }
-
-        /** Not supported: always throws {@link UnsupportedOperationException}. */
-        @Override public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
deleted file mode 100644
index a2c626c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
+++ /dev/null
@@ -1,726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Skip list.
- */
-public class GridHadoopSkipList extends GridHadoopMultimapBase {
-    /** Size of the heads block: 24 bytes followed by one 8-byte next pointer per level. */
-    private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive.
-
-    /** Top level; starts at -1. */
-    private final AtomicInteger topLevel = new AtomicInteger(-1);
-
-    /** Heads for all the lists. */
-    private final long heads;
-
-    /** Guard that lets only one {@code visit} call proceed at a time. */
-    private final AtomicBoolean visitGuard = new AtomicBoolean();
-
-    /**
-     * @param jobInfo Job info.
-     * @param mem Memory; the heads block of {@code HEADS_SIZE} bytes is allocated from it.
-     */
-    public GridHadoopSkipList(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
-        super(jobInfo, mem);
-
-        heads = mem.allocate(HEADS_SIZE, true);
-    }
-
-    /** Runs the base class close first, then releases the heads block. */
-    @Override public void close() {
-        super.close();
-
-        mem.release(heads, HEADS_SIZE);
-    }
-
-    /** {@inheritDoc} NOTE(review): visitGuard is not reset if the visitor throws - confirm this is intended. */
-    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
-        if (!visitGuard.compareAndSet(false, true))
-            return false;
-
-        for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
-            long valPtr = value(meta);
-
-            long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
-
-            if (valPtr != lastVisited) {
-                long k = key(meta);
-
-                v.onKey(k + 4, keySize(k));
-
-                lastVisitedValue(meta, valPtr); // Remember the current chain head as the last visited value.
-
-                do {
-                    v.onValue(valPtr + 12, valueSize(valPtr));
-
-                    valPtr = nextValue(valPtr);
-                }
-                while (valPtr != lastVisited);
-            }
-        }
-
-        visitGuard.lazySet(false);
-
-        return true;
-    }
-
-    /** Creates a new {@code AdderImpl} for the given task context. */
-    @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        return new AdderImpl(ctx);
-    }
-
-    /** Creates an {@code Input}, wrapped in a {@code GroupedInput} when the context has a group comparator. */
-    @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        Input in = new Input(taskCtx);
-
-        Comparator<Object> grpCmp = taskCtx.groupComparator();
-
-        if (grpCmp != null)
-            return new GroupedInput(grpCmp, in);
-
-        return in;
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Key pointer (long read at offset 0 of the meta).
-     */
-    private long key(long meta) {
-        return mem.readLong(meta);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param key Key pointer, written at offset 0 of the meta.
-     */
-    private void key(long meta, long key) {
-        mem.writeLong(meta, key);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Value pointer (volatile long read at offset 8 of the meta).
-     */
-    private long value(long meta) {
-        return mem.readLongVolatile(meta + 8);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param valPtr Value pointer, stored with a volatile write at offset 8 of the meta.
-     */
-    private void value(long meta, long valPtr) {
-        mem.writeLongVolatile(meta + 8, valPtr);
-    }
-
-    /**
-     * @param meta Meta pointer; the compare-and-set targets offset 8 of the meta.
-     * @param oldValPtr Old first value pointer.
-     * @param newValPtr New first value pointer.
-     * @return {@code true} If operation succeeded.
-     */
-    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
-        return mem.casLong(meta + 8, oldValPtr, newValPtr);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @return Last visited value pointer (long read at offset 16 of the meta).
-     */
-    private long lastVisitedValue(long meta) {
-        return mem.readLong(meta + 16);
-    }
-
-    /**
-     * @param meta Meta pointer.
-     * @param valPtr Last visited value pointer, written at offset 16 of the meta.
-     */
-    private void lastVisitedValue(long meta, long valPtr) {
-        mem.writeLong(meta + 16, valPtr);
-    }
-
-    /**
-     * @param meta Meta pointer; asserted to be positive.
-     * @param level Level.
-     * @return Next meta pointer (volatile long read at offset 24 + 8 * level).
-     */
-    private long nextMeta(long meta, int level) {
-        assert meta > 0 : meta;
-
-        return mem.readLongVolatile(meta + 24 + 8 * level);
-    }
-
-    /**
-     * @param meta Meta pointer; asserted to be positive.
-     * @param level Level; selects the pointer slot at offset 24 + 8 * level.
-     * @param oldNext Old next meta pointer.
-     * @param newNext New next meta pointer.
-     * @return {@code true} If operation succeeded.
-     */
-    private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
-        assert meta > 0 : meta;
-
-        return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
-    }
-
-    /**
-     * @param meta Meta pointer; asserted to be non-zero.
-     * @param level Level; selects the pointer slot at offset 24 + 8 * level.
-     * @param nextMeta Next meta, stored with {@code writeLong} (not the volatile variant).
-     */
-    private void nextMeta(long meta, int level, long nextMeta) {
-        assert meta != 0;
-
-        mem.writeLong(meta + 24 + 8 * level, nextMeta);
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @return Key size (int read at the key pointer itself).
-     */
-    private int keySize(long keyPtr) {
-        return mem.readInt(keyPtr);
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @param keySize Key size, written as an int at the key pointer itself.
-     */
-    private void keySize(long keyPtr, int keySize) {
-        mem.writeInt(keyPtr, keySize);
-    }
-
-    /**
-     * @param rnd Random; one {@code nextInt()} is drawn from it.
-     * @return Next level: the number of consecutive low-order 1 bits of the drawn int (0 to 32).
-     */
-    public static int randomLevel(Random rnd) {
-        int x = rnd.nextInt();
-
-        int level = 0;
-
-        while ((x & 1) != 0) { // Count sequential 1 bits.
-            level++;
-
-            x >>>= 1;
-        }
-
-        return level;
-    }
-
-    /**
-     * Reader that locates the key through the key pointer stored in a meta.
-     */
-    private class Reader extends ReaderBase {
-        /**
-         * @param ser Serialization.
-         */
-        protected Reader(GridHadoopSerialization ser) {
-            super(ser);
-        }
-
-        /**
-         * @param meta Meta pointer; asserted to be positive.
-         * @return Key read from 4 bytes past the key pointer, with the size stored at the key pointer.
-         */
-        public Object readKey(long meta) {
-            assert meta > 0 : meta;
-
-            long k = key(meta);
-
-            try {
-                return read(k + 4, keySize(k));
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-
-    /**
-     * Adder.
-     */
-    private class AdderImpl extends AdderBase {
-        /** Sort comparator taken from the task context. */
-        private final Comparator<Object> cmp;
-
-        /** Random passed to {@code randomLevel} when a new meta is created. */
-        private final Random rnd = new GridRandom();
-
-        /** Stack of (previous meta, meta) pairs. */
-        private final GridLongList stack = new GridLongList(16);
-
-        /** Reader created over the key serialization. */
-        private final Reader keyReader;
-
-        /**
-         * @param ctx Task context; also supplies the sort comparator.
-         * @throws IgniteCheckedException If failed.
-         */
-        protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-            super(ctx);
-
-            keyReader = new Reader(keySer);
-
-            cmp = ctx.sortComparator();
-        }
-
-        /** Checks that the value is not null, then adds the key-value pair. */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            A.notNull(val, "val");
-
-            add(key, val);
-        }
-
-        /** Reads the key from {@code in} into a new or reused {@code KeyImpl} and adds it without a value. */
-        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
-            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
-            k.tmpKey = keySer.read(in, k.tmpKey);
-
-            k.meta = add(k.tmpKey, null);
-
-            return k;
-        }
-
-        /**
-         * @param key Key pointer, stored at offset 0 of the new meta.
-         * @param val Value pointer, stored at offset 8 of the new meta.
-         * @param level Level; the meta is allocated with 32 + 8 * level bytes.
-         * @return Meta pointer.
-         */
-        private long createMeta(long key, long val, int level) {
-            int size = 32 + 8 * level;
-
-            long meta = allocate(size);
-
-            key(meta, key);
-            value(meta, val);
-            lastVisitedValue(meta, 0L);
-
-            for (int i = 32; i < size; i += 8) // Zero from offset 32 on; offset 24 is not written here.
-                mem.writeLong(meta + i, 0L);
-
-            return meta;
-        }
-
-        /**
-         * @param key Key, written with the key serialization after 4 reserved bytes.
-         * @return Pointer to the key record; the key size (written size minus 4) is stored as an int at it.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long writeKey(Object key) throws IgniteCheckedException {
-            long keyPtr = write(4, key, keySer);
-            int keySize = writtenSize() - 4;
-
-            keySize(keyPtr, keySize);
-
-            return keyPtr;
-        }
-
-        /**
-         * @param prevMeta Previous meta; added to the stack first.
-         * @param meta Next meta; added to the stack second.
-         */
-        private void stackPush(long prevMeta, long meta) {
-            stack.add(prevMeta);
-            stack.add(meta);
-        }
-
-        /**
-         * Drops the last remembered frame from the stack.
-         */
-        private void stackPop() {
-            stack.pop(2); // One frame is the pair of entries added by stackPush().
-        }
-
-        /**
-         * @param key Key.
-         * @param val Value or {@code null} if only the key has to be added.
-         * @return Meta pointer of the found or newly created key.
-         * @throws IgniteCheckedException If failed.
-         */
-        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
-            assert key != null;
-
-            stack.clear(); // The path is collected anew on every call.
-
-            long valPtr = 0;
-
-            if (val != null) { // Write value.
-                valPtr = write(12, val, valSer);
-                int valSize = writtenSize() - 12;
-
-                nextValue(valPtr, 0);
-                valueSize(valPtr, valSize);
-            }
-
-            long keyPtr = 0;
-            long newMeta = 0;
-            int newMetaLevel = -1;
-
-            long prevMeta = heads; // The search starts from the heads meta at the current top level.
-            int level = topLevel.get();
-            long meta = level < 0 ? 0 : nextMeta(heads, level);
-
-            for (;;) {
-                if (level < 0) { // We did not find our key, trying to add new meta.
-                    if (keyPtr == 0) { // Write key and create meta only once.
-                        keyPtr = writeKey(key);
-
-                        newMetaLevel = randomLevel(rnd);
-                        newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
-                    }
-
-                    nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
-
-                    if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
-                        laceUp(key, newMeta, newMetaLevel);
-
-                        return newMeta;
-                    }
-                    else { // Add failed, need to check out what was added by another thread.
-                        meta = nextMeta(prevMeta, level = 0);
-
-                        stackPop();
-                    }
-                }
-
-                int cmpRes = cmp(key, meta);
-
-                if (cmpRes == 0) { // Key found.
-                    if (newMeta != 0)  // Deallocate if we've allocated something.
-                        localDeallocate(keyPtr);
-
-                    if (valPtr == 0) // Only key needs to be added.
-                        return meta;
-
-                    for (;;) { // Add value for the key found.
-                        long nextVal = value(meta);
-
-                        nextValue(valPtr, nextVal);
-
-                        if (casValue(meta, nextVal, valPtr))
-                            return meta;
-                    }
-                }
-
-                assert cmpRes != 0;
-
-                if (cmpRes > 0) { // Go right.
-                    prevMeta = meta;
-                    meta = nextMeta(meta, level);
-
-                    if (meta != 0) // If nothing to the right then go down.
-                        continue;
-                }
-
-                while (--level >= 0) { // Go down.
-                    stackPush(prevMeta, meta); // Remember the path.
-
-                    long nextMeta = nextMeta(prevMeta, level);
-
-                    if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
-                        meta = nextMeta;
-
-                        assert meta != 0;
-
-                        break;
-                    }
-                }
-            }
-        }
-
-        /**
-         * @param key Key.
-         * @param meta Meta pointer, must not be 0.
-         * @return Result of comparing {@code key} with the key read from {@code meta} using the sort comparator.
-         */
-        @SuppressWarnings("unchecked")
-        private int cmp(Object key, long meta) {
-            assert meta != 0;
-
-            return cmp.compare(key, keyReader.readKey(meta));
-        }
-
-        /**
-         * Adds appropriate index links between metas.
-         * @param key Key of the just added meta, used to find the position again when a CAS fails.
-         * @param newMeta Just added meta.
-         * @param newMetaLevel New level.
-         */
-        private void laceUp(Object key, long newMeta, int newMetaLevel) {
-            for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
-                long prevMeta = heads;
-                long meta = 0;
-
-                if (!stack.isEmpty()) { // Get the path back.
-                    meta = stack.remove();
-                    prevMeta = stack.remove();
-                }
-
-                for (;;) {
-                    nextMeta(newMeta, level, meta);
-
-                    if (casNextMeta(prevMeta, level, meta, newMeta))
-                        break;
-
-                    long oldMeta = meta;
-
-                    meta = nextMeta(prevMeta, level); // Reread meta.
-
-                    for (;;) {
-                        int cmpRes = cmp(key, meta);
-
-                        if (cmpRes > 0) { // Go right.
-                            prevMeta = meta;
-                            meta = nextMeta(prevMeta, level);
-
-                            if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
-                                continue;
-                        }
-
-                        assert cmpRes != 0; // Two different metas with equal keys must be impossible.
-
-                        break; // Retry cas.
-                    }
-                }
-            }
-
-            if (!stack.isEmpty())
-                return; // Our level already lower than top.
-
-            for (;;) { // Raise top level.
-                int top = topLevel.get();
-
-                if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
-                    break;
-            }
-        }
-
-        /**
-         * Key handle returned by addKey(); values are appended directly to the chain of its meta.
-         */
-        private class KeyImpl implements Key {
-            /** Meta pointer of the key, assigned in addKey(). */
-            private long meta;
-
-            /** Key object passed to and returned by the key serializer in addKey(). */
-            private Object tmpKey;
-
-            /**
-             * @return Meta pointer for the key.
-             */
-            public long address() {
-                return meta;
-            }
-
-            /**
-             * @param val Value to copy into newly allocated memory and to link as the new head value of the key.
-             */
-            @Override public void add(Value val) {
-                int size = val.size();
-
-                long valPtr = allocate(size + 12); // 12 extra bytes precede the value data.
-
-                val.copyTo(valPtr + 12);
-
-                valueSize(valPtr, size);
-
-                long nextVal;
-
-                do { // Retry until the head value of the meta is replaced with ours.
-                    nextVal = value(meta);
-
-                    nextValue(valPtr, nextVal);
-                }
-                while(!casValue(meta, nextVal, valPtr));
-            }
-        }
-    }
-
-    /**
-     * Task input which iterates over metas following the level 0 links.
-     */
-    private class Input implements GridHadoopTaskInput {
-        /** Pointer to the current meta, iteration starts from the heads meta. */
-        private long metaPtr = heads;
-
-        /** Reader for keys. */
-        private final Reader keyReader;
-
-        /** Reader for values. */
-        private final Reader valReader;
-
-        /**
-         * @param taskCtx Task context providing key and value serializations.
-         * @throws IgniteCheckedException If failed.
-         */
-        private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-            keyReader = new Reader(taskCtx.keySerialization());
-            valReader = new Reader(taskCtx.valueSerialization());
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            metaPtr = nextMeta(metaPtr, 0); // Move to the next meta on level 0.
-
-            return metaPtr != 0; // 0 means that there are no more metas.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return keyReader.readKey(metaPtr);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            return new ValueIterator(value(metaPtr), valReader);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            keyReader.close();
-            valReader.close();
-        }
-    }
-
-    /**
-     * Grouped input using grouping comparator: adjacent keys which compare as equal form a single group.
-     */
-    private class GroupedInput implements GridHadoopTaskInput {
-        /** Grouping comparator. */
-        private final Comparator<Object> grpCmp;
-
-        /** Underlying input. */
-        private final Input in;
-
-        /** Key of the current group, returned by key(). */
-        private Object prevKey;
-
-        /** The last key read from the underlying input while filling the current group. */
-        private Object nextKey;
-
-        /** Head value pointers of the metas which belong to the current group. */
-        private final GridLongList vals = new GridLongList();
-
-        /**
-         * @param grpCmp Grouping comparator.
-         * @param in Input.
-         */
-        private GroupedInput(Comparator<Object> grpCmp, Input in) {
-            this.grpCmp = grpCmp;
-            this.in = in;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean next() {
-            if (prevKey == null) { // First call.
-                if (!in.next())
-                    return false;
-
-                prevKey = in.key();
-
-                assert prevKey != null;
-
-                in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
-
-                vals.add(value(in.metaPtr));
-            }
-            else {
-                if (in.metaPtr == 0) // We reached the end of the input.
-                    return false;
-
-                vals.clear();
-
-                vals.add(value(in.metaPtr)); // The underlying input already stands on the first meta of this group.
-
-                in.keyReader.resetReusedObject(prevKey); // Switch key instances.
-
-                prevKey = nextKey;
-            }
-
-            while (in.next()) { // Fill with head value pointers with equal keys.
-                if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
-                    vals.add(value(in.metaPtr));
-                else
-                    break;
-            }
-
-            assert !vals.isEmpty();
-
-            return true;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Object key() {
-            return prevKey;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<?> values() {
-            assert !vals.isEmpty();
-
-            final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
-
-            return new Iterator<Object>() {
-                /** Index of the value chain in {@code vals} which is currently iterated. */
-                private int idx;
-
-                @Override public boolean hasNext() {
-                    if (!valIter.hasNext()) {
-                        if (++idx == vals.size())
-                            return false;
-
-                        valIter.head(vals.get(idx)); // Switch the iterator to the next value chain of the group.
-
-                        assert valIter.hasNext();
-                    }
-
-                    return true;
-                }
-
-                @Override public Object next() {
-                    return valIter.next();
-                }
-
-                @Override public void remove() {
-                    valIter.remove();
-                }
-            };
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            in.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
new file mode 100644
index 0000000..46d8bc9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Multimap for map reduce intermediate results.
+ */
+public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
+    /** Current map state, switched with CAS by visit(), rehashIfNeeded() and close(). */
+    private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING);
+
+    /** Table used for capacity() and meta(int); replaced with the new table when rehashing is finished. */
+    private volatile AtomicLongArray oldTbl;
+
+    /** Target table of rehashing; the same instance as {@code oldTbl} when no rehashing is in progress. */
+    private volatile AtomicLongArray newTbl;
+
+    /** Number of keys added by the adders which are already closed. */
+    private final AtomicInteger keys = new AtomicInteger();
+
+    /** Adders which were created and are not closed yet. */
+    private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>();
+
+    /** Number of inputs which were created and are not closed yet. */
+    private final AtomicInteger inputs = new AtomicInteger();
+
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     * @param cap Initial capacity, must be a power of two.
+     */
+    public HadoopConcurrentHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+        super(jobInfo, mem);
+
+        assert U.isPow2(cap);
+
+        newTbl = oldTbl = new AtomicLongArray(cap); // Both references point to the same table until rehashing starts.
+    }
+
+    /**
+     * @return Number of keys: the count published by closed adders plus the local counters of open adders.
+     */
+    public long keys() {
+        long res = keys.get(); // Sum in long: several int counters added together may not fit into int.
+
+        for (AdderImpl adder : adders)
+            res += adder.locKeys.get();
+
+        return res;
+    }
+
+    /**
+     * @return Current table capacity, i.e. the length of the old table.
+     */
+    @Override public int capacity() {
+        return oldTbl.length();
+    }
+
+    /**
+     * @param ctx Task context.
+     * @return New adder; {@link IllegalStateException} is thrown if there are open inputs or the map is closed.
+     */
+    @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+        if (inputs.get() != 0)
+            throw new IllegalStateException("Active inputs.");
+
+        if (state.get() == State.CLOSING)
+            throw new IllegalStateException("Closed.");
+
+        return new AdderImpl(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        assert inputs.get() == 0 : inputs.get();
+        assert adders.isEmpty() : adders.size();
+
+        state(State.READING_WRITING, State.CLOSING); // Throws IllegalStateException if the map is in any other state.
+
+        if (keys() == 0) // No keys were added, super.close() is skipped.
+            return;
+
+        super.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long meta(int idx) {
+        return oldTbl.get(idx); // Root meta pointer stored in the old table at the given index.
+    }
+
+    /**
+     * Incrementally visits all the keys and values in the map.
+     *
+     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+     * @param v Visitor.
+     * @return {@code false} If the map was not in the reading-writing state, e.g. due to rehashing.
+     */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+        if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
+            assert state.get() != State.CLOSING;
+
+            return false; // Can not visit while rehashing happens.
+        }
+
+        AtomicLongArray tbl0 = oldTbl;
+
+        for (int i = 0; i < tbl0.length(); i++) {
+            long meta = tbl0.get(i);
+
+            while (meta != 0) {
+                long valPtr = value(meta);
+
+                long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
+                if (valPtr != lastVisited) { // There are values added after the previous visit.
+                    v.onKey(key(meta), keySize(meta));
+
+                    lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
+
+                    do { // Walk the chain until the value which was the head at the previous visit.
+                        v.onValue(valPtr + 12, valueSize(valPtr));
+
+                        valPtr = nextValue(valPtr);
+                    }
+                    while (valPtr != lastVisited);
+                }
+
+                meta = collision(meta);
+            }
+        }
+
+        state(State.VISITING, State.READING_WRITING); // NOTE(review): not reached if the visitor throws, state stays VISITING - confirm.
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        inputs.incrementAndGet(); // NOTE(review): not decremented if one of the checks below throws - confirm.
+
+        if (!adders.isEmpty())
+            throw new IllegalStateException("Active adders.");
+
+        State s = state.get();
+
+        if (s == State.CLOSING)
+            throw new IllegalStateException("Closed.");
+
+        assert s != State.REHASHING;
+
+        return new Input(taskCtx) {
+            @Override public void close() throws IgniteCheckedException {
+                if (inputs.decrementAndGet() < 0) // More closes than created inputs.
+                    throw new IllegalStateException();
+
+                super.close();
+            }
+        };
+    }
+
+    /**
+     * @param fromTbl Table the caller worked with; it is rehashed only if it is 3/4 full and still the newest one.
+     */
+    private void rehashIfNeeded(AtomicLongArray fromTbl) {
+        if (fromTbl.length() >= (1 << 30)) // Lengths are powers of 2, 2^30 is the largest one which fits into int.
+            return;
+
+        long keys0 = keys();
+
+        if (keys0 < 3 * (fromTbl.length() >>> 2)) // New size has to be >= than 3/4 of capacity to rehash.
+            return;
+
+        if (fromTbl != newTbl) // Check if someone else have done the job.
+            return;
+
+        if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
+            assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash.
+
+            return;
+        }
+
+        if (fromTbl != newTbl) { // Double check.
+            state(State.REHASHING, State.READING_WRITING); // Switch back.
+
+            return;
+        }
+
+        // Calculate new table capacity.
+        int newLen = fromTbl.length();
+
+        do {
+            newLen <<= 1;
+        }
+        while (newLen < keys0 && newLen < (1 << 30)); // Stop at 2^30: one more shift would overflow int.
+
+        if (newLen < (1 << 30) && keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4 and can be doubled.
+            newLen <<= 1;
+
+        // This is our target table for rehashing.
+        AtomicLongArray toTbl = new AtomicLongArray(newLen);
+
+        // Make the new table visible before rehashing.
+        newTbl = toTbl;
+
+        // Rehash.
+        int newMask = newLen - 1;
+
+        long failedMeta = 0;
+
+        GridLongList collisions = new GridLongList(16);
+
+        for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
+            long meta = fromTbl.get(i);
+
+            assert meta != -1;
+
+            if (meta == 0) { // No entry.
+                failedMeta = 0;
+
+                if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
+                    i--; // Retry.
+
+                continue;
+            }
+
+            do { // Collect all the collisions before the last one failed to nullify or 0.
+                collisions.add(meta);
+
+                meta = collision(meta);
+            }
+            while (meta != failedMeta);
+
+            do { // Go from the last to the first to avoid 'in-flight' state for meta entries.
+                meta = collisions.remove();
+
+                int addr = keyHash(meta) & newMask;
+
+                for (;;) { // Move meta entry to the new table.
+                    long toCollision = toTbl.get(addr);
+
+                    collision(meta, toCollision);
+
+                    if (toTbl.compareAndSet(addr, toCollision, meta))
+                        break;
+                }
+            }
+            while (!collisions.isEmpty());
+
+            // Here 'meta' will be a root pointer in old table.
+            if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
+                failedMeta = meta;
+
+                i--; // Retry the same address in table because new keys were added.
+            }
+            else
+                failedMeta = 0;
+        }
+
+        // Now old and new tables will be the same again.
+        oldTbl = toTbl;
+
+        state(State.REHASHING, State.READING_WRITING);
+    }
+
+    /**
+     * Switch state, throws {@link IllegalStateException} if the current state is not the expected one.
+     *
+     * @param oldState Expected state.
+     * @param newState New state.
+     */
+    private void state(State oldState, State newState) {
+        if (!state.compareAndSet(oldState, newState))
+            throw new IllegalStateException();
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Value pointer stored at offset 16 of the meta, read with {@code readLongVolatile}.
+     */
+    @Override protected long value(long meta) {
+        return mem.readLongVolatile(meta + 16);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param oldValPtr Expected value pointer.
+     * @param newValPtr New value pointer.
+     * @return {@code true} If the value pointer at offset 16 of the meta was replaced.
+     */
+    private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+        return mem.casLong(meta + 16, oldValPtr, newValPtr);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Collision pointer stored at offset 24 of the meta, read with {@code readLongVolatile}.
+     */
+    @Override protected long collision(long meta) {
+        return mem.readLongVolatile(meta + 24);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param collision Collision pointer to store at offset 24, must differ from {@code meta}.
+     */
+    @Override protected void collision(long meta, long collision) {
+        assert meta != collision : meta; // A meta must not be linked to itself.
+
+        mem.writeLongVolatile(meta + 24, collision);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Last visited value pointer stored at offset 32 of the meta (plain read).
+     */
+    private long lastVisitedValue(long meta) {
+        return mem.readLong(meta + 32);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param valPtr Last visited value pointer to store at offset 32 of the meta (plain write).
+     */
+    private void lastVisitedValue(long meta, long valPtr) {
+        mem.writeLong(meta + 32, valPtr);
+    }
+
+    /**
+     * Adder. Must not be shared between threads.
+     */
+    private class AdderImpl extends AdderBase {
+        /** Reader which deserializes stored keys for the equality check in add(). */
+        private final Reader keyReader;
+
+        /** Number of keys added by this adder, moved to the map wide counter on close. */
+        private final AtomicInteger locKeys = new AtomicInteger();
+
+        /** Random used to decide when to check if rehashing is needed. */
+        private final Random rnd = new GridRandom();
+
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        private AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+            super(ctx);
+
+            keyReader = new Reader(keySer);
+
+            rehashIfNeeded(oldTbl); // Give the table a chance to grow before this adder starts adding.
+
+            adders.add(this); // Registered until close(); keys() sums local counters of registered adders.
+        }
+
+        /**
+         * @param in Data input to read the key from.
+         * @param reuse Reusable key or {@code null} to create a new one.
+         * @return Key with the meta pointer of the found or newly added key.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+            KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+            k.tmpKey = keySer.read(in, k.tmpKey); // The previous key object is handed back to the serializer.
+
+            k.meta = add(k.tmpKey, null); // Null value: the key is only found or added, no value is written.
+
+            return k;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            A.notNull(val, "val"); // add() treats a null value as a key-only insertion, so it is rejected here.
+
+            add(key, val);
+        }
+
+        /**
+         * @param tbl Table the new key was added to.
+         */
+        private void incrementKeys(AtomicLongArray tbl) {
+            locKeys.lazySet(locKeys.get() + 1); // Not an atomic increment: the adder must not be shared between threads.
+
+            if (rnd.nextInt(tbl.length()) < 512) // Check for rehashing only occasionally, less often for bigger tables.
+                rehashIfNeeded(tbl);
+        }
+
+        /**
+         * @param keyHash Key hash, written at offset 0.
+         * @param keySize Key size, written at offset 4.
+         * @param keyPtr Key pointer, written at offset 8.
+         * @param valPtr Value page pointer, written at offset 16.
+         * @param collisionPtr Pointer to meta with hash collision, written at offset 24.
+         * @param lastVisitedVal Last visited value pointer, written at offset 32.
+         * @return Created meta page pointer.
+         */
+        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
+            long meta = allocate(40); // 2 ints and 4 longs.
+
+            mem.writeInt(meta, keyHash);
+            mem.writeInt(meta + 4, keySize);
+            mem.writeLong(meta + 8, keyPtr);
+            mem.writeLong(meta + 16, valPtr);
+            mem.writeLong(meta + 24, collisionPtr);
+            mem.writeLong(meta + 32, lastVisitedVal);
+
+            return meta;
+        }
+
+        /**
+         * @param key Key.
+         * @param val Value or {@code null} if only the key has to be added.
+         * @return Updated or created meta page pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+            AtomicLongArray tbl = oldTbl;
+
+            int keyHash = U.hash(key.hashCode());
+
+            long newMetaPtr = 0;
+
+            long valPtr = 0;
+
+            if (val != null) { // Write the value once, before searching for the key.
+                valPtr = write(12, val, valSer);
+                int valSize = writtenSize() - 12;
+
+                valueSize(valPtr, valSize);
+            }
+
+            for (AtomicLongArray old = null;;) {
+                int addr = keyHash & (tbl.length() - 1);
+
+                long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
+
+                if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
+                    AtomicLongArray n = newTbl; // Need to read newTbl first here.
+                    AtomicLongArray o = oldTbl;
+
+                    tbl = tbl == o ? n : o; // Trying to get the oldest table but newer than ours.
+
+                    old = null;
+
+                    continue;
+                }
+
+                if (metaPtrRoot != 0) { // Not empty slot.
+                    long metaPtr = metaPtrRoot;
+
+                    do { // Scan all the collisions.
+                        if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
+                            if (newMetaPtr != 0)  // Deallocate new meta if one was allocated.
+                                localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
+
+                            if (valPtr != 0) { // Add value if it exists.
+                                long nextValPtr;
+
+                                // Values are linked to each other to a stack like structure.
+                                // Replace the last value in meta with ours and link it as next.
+                                do {
+                                    nextValPtr = value(metaPtr);
+
+                                    nextValue(valPtr, nextValPtr);
+                                }
+                                while (!casValue(metaPtr, nextValPtr, valPtr));
+                            }
+
+                            return metaPtr;
+                        }
+
+                        metaPtr = collision(metaPtr);
+                    }
+                    while (metaPtr != 0);
+
+                    // Here we did not find our key, need to check if it was moved by rehashing to the new table.
+                    if (old == null) { // If the old table already set, then we will just try to update it.
+                        AtomicLongArray n = newTbl;
+
+                        if (n != tbl) { // Rehashing happens, try to find the key in new table but preserve the old one.
+                            old = tbl;
+                            tbl = n;
+
+                            continue;
+                        }
+                    }
+                }
+
+                if (old != null) { // We just checked new table but did not find our key as well as in the old one.
+                    tbl = old; // Try to add new key to the old table.
+
+                    addr = keyHash & (tbl.length() - 1);
+
+                    old = null;
+                }
+
+                if (newMetaPtr == 0) { // Allocate new meta page.
+                    long keyPtr = write(0, key, keySer);
+                    int keySize = writtenSize();
+
+                    if (valPtr != 0)
+                        nextValue(valPtr, 0);
+
+                    newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
+                }
+                else // Update new meta with root pointer collision.
+                    collision(newMetaPtr, metaPtrRoot);
+
+                if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one.
+                    incrementKeys(tbl);
+
+                    return newMetaPtr;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            if (!adders.remove(this)) // The adder was not registered, e.g. it was already closed.
+                throw new IllegalStateException();
+
+            keys.addAndGet(locKeys.get()); // Here we have race and #keys() method can return wrong result but it is ok.
+
+            super.close();
+        }
+
+        /**
+         * Key handle returned by addKey(); values are appended directly to the chain of its meta.
+         */
+        private class KeyImpl implements Key {
+            /** Meta pointer of the key, assigned in addKey(). */
+            private long meta;
+
+            /** Key object passed to and returned by the key serializer in addKey(). */
+            private Object tmpKey;
+
+            /**
+             * @return Meta pointer for the key.
+             */
+            public long address() {
+                return meta;
+            }
+
+            /**
+             * @param val Value to copy into newly allocated memory and to link as the new head value of the key.
+             */
+            @Override public void add(Value val) {
+                int size = val.size();
+
+                long valPtr = allocate(size + 12); // 12 extra bytes precede the value data.
+
+                val.copyTo(valPtr + 12);
+
+                valueSize(valPtr, size);
+
+                long nextVal;
+
+                do { // Retry until the head value of the meta is replaced with ours.
+                    nextVal = value(meta);
+
+                    nextValue(valPtr, nextVal);
+                }
+                while(!casValue(meta, nextVal, valPtr));
+            }
+        }
+    }
+
+    /**
+     * Current map state; every transition visible here starts from or returns to {@code READING_WRITING}.
+     */
+    private enum State {
+        /** Rehashing is in progress, {@code visit()} returns {@code false} in this state. */
+        REHASHING,
+
+        /** {@code visit()} is in progress, rehashing is not started in this state. */
+        VISITING,
+
+        /** Initial state, the only one from which visiting, rehashing or closing can start. */
+        READING_WRITING,
+
+        /** Set by {@code close()}; new adders and inputs are rejected in this state. */
+        CLOSING
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
new file mode 100644
index 0000000..15b93c6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Hash multimap.
+ * <p>
+ * Keeps a power-of-two sized table of meta pointers; entries that map to the same table slot
+ * are chained through their collision pointers. The table and the keys counter are plain fields
+ * which are modified without any synchronization.
+ */
+public class HadoopHashMultimap extends HadoopHashMultimapBase {
+    /** Largest table capacity which is still a positive power of two. */
+    private static final int MAX_CAP = 1 << 30;
+
+    /** Hash table: each slot holds the head meta pointer of a collision chain or {@code 0}. */
+    private long[] tbl;
+
+    /** Number of distinct keys added so far. */
+    private int keys;
+
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     * @param cap Initial capacity, must be a power of two.
+     */
+    public HadoopHashMultimap(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+        super(jobInfo, mem);
+
+        assert U.isPow2(cap) : cap;
+
+        tbl = new long[cap];
+    }
+
+    /** {@inheritDoc} */
+    @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+        return new AdderImpl(ctx);
+    }
+
+    /**
+     * Rehash: doubles the table and relinks every meta page into the new table.
+     * <p>
+     * Does nothing when the table has already reached the maximum capacity, because doubling
+     * it further would overflow {@code int}; collision chains simply keep growing in that case.
+     */
+    private void rehash() {
+        if (tbl.length >= MAX_CAP)
+            return;
+
+        long[] newTbl = new long[tbl.length << 1];
+
+        int newMask = newTbl.length - 1;
+
+        for (long meta : tbl) {
+            while (meta != 0) {
+                // Remember the rest of the old chain before the collision pointer gets overwritten.
+                long collision = collision(meta);
+
+                int idx = keyHash(meta) & newMask;
+
+                // Push the meta page to the head of its chain in the new table.
+                collision(meta, newTbl[idx]);
+
+                newTbl[idx] = meta;
+
+                meta = collision;
+            }
+        }
+
+        tbl = newTbl;
+    }
+
+    /**
+     * @return Keys count.
+     */
+    public int keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int capacity() {
+        return tbl.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long meta(int idx) {
+        return tbl[idx];
+    }
+
+    /**
+     * Adder.
+     */
+    private class AdderImpl extends AdderBase {
+        /** Reader used to deserialize stored keys when comparing them with the key being added. */
+        private final Reader keyReader;
+
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+            super(ctx);
+
+            keyReader = new Reader(keySer);
+        }
+
+        /**
+         * Allocates a 32 byte meta page and fills it: key hash at offset 0, key size at offset 4,
+         * key pointer at offset 8, value pointer at offset 16 and collision pointer at offset 24.
+         *
+         * @param keyHash Key hash.
+         * @param keySize Key size.
+         * @param keyPtr Key pointer.
+         * @param valPtr Value page pointer.
+         * @param collisionPtr Pointer to meta with hash collision.
+         * @return Created meta page pointer.
+         */
+        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
+            long meta = allocate(32);
+
+            mem.writeInt(meta, keyHash);
+            mem.writeInt(meta + 4, keySize);
+            mem.writeLong(meta + 8, keyPtr);
+            mem.writeLong(meta + 16, valPtr);
+            mem.writeLong(meta + 24, collisionPtr);
+
+            return meta;
+        }
+
+        /**
+         * Adds the value to the list of values of the given key, creating the key if it is not
+         * in the map yet. The table is rehashed when the number of keys exceeds 3/4 of its capacity.
+         *
+         * @param key Key, must not be {@code null}.
+         * @param val Value, must not be {@code null}.
+         * @throws IgniteCheckedException If failed.
+         */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            A.notNull(val, "val");
+            A.notNull(key, "key"); // Fail with a descriptive message before anything is written.
+
+            int keyHash = U.hash(key.hashCode());
+
+            // Write value: 12 bytes are reserved in front of the serialized value.
+            long valPtr = write(12, val, valSer);
+            int valSize = writtenSize() - 12;
+
+            valueSize(valPtr, valSize);
+
+            // Find position in table.
+            int idx = keyHash & (tbl.length - 1);
+
+            long meta = tbl[idx];
+
+            // Search for our key in collisions.
+            while (meta != 0) {
+                if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
+                    // The new value becomes the head of the value list of the key.
+                    nextValue(valPtr, value(meta));
+
+                    value(meta, valPtr);
+
+                    return;
+                }
+
+                meta = collision(meta);
+            }
+
+            // Write key.
+            long keyPtr = write(0, key, keySer);
+            int keySize = writtenSize();
+
+            // First value of a new key: there is no next value.
+            nextValue(valPtr, 0);
+
+            // New meta page becomes the head of the collision chain of the slot.
+            tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
+
+            if (++keys > (tbl.length >>> 2) * 3)
+                rehash();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
new file mode 100644
index 0000000..f62a354
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+
+import java.util.*;
+
+/**
+ * Base class for hash multimaps.
+ * <p>
+ * Defines accessors for the meta page fields: key hash at offset 0, key size at offset 4,
+ * key pointer at offset 8, value pointer at offset 16 and collision pointer at offset 24.
+ */
+public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     */
+    protected HadoopHashMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+        super(jobInfo, mem);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation always throws {@link UnsupportedOperationException}.
+     */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+        throw new UnsupportedOperationException("visit");
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return new Input(taskCtx);
+    }
+
+    /**
+     * @return Hash table capacity.
+     */
+    public abstract int capacity();
+
+    /**
+     * @param idx Index in hash table.
+     * @return Meta page pointer.
+     */
+    protected abstract long meta(int idx);
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key hash.
+     */
+    protected int keyHash(long meta) {
+        return mem.readInt(meta);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key size.
+     */
+    protected int keySize(long meta) {
+        return mem.readInt(meta + 4);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key pointer.
+     */
+    protected long key(long meta) {
+        return mem.readLong(meta + 8);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Value pointer.
+     */
+    protected long value(long meta) {
+        return mem.readLong(meta + 16);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param val Value pointer.
+     */
+    protected void value(long meta, long val) {
+        mem.writeLong(meta + 16, val);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Collision pointer.
+     */
+    protected long collision(long meta) {
+        return mem.readLong(meta + 24);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param collision Collision pointer, must differ from {@code meta}.
+     */
+    protected void collision(long meta, long collision) {
+        assert meta != collision : meta; // A meta page must never be chained to itself.
+
+        mem.writeLong(meta + 24, collision);
+    }
+
+    /**
+     * Reader for key and value.
+     */
+    protected class Reader extends ReaderBase {
+        /**
+         * @param ser Serialization.
+         */
+        protected Reader(GridHadoopSerialization ser) {
+            super(ser);
+        }
+
+        /**
+         * Reads the key referenced by the given meta page. A checked failure is rethrown
+         * as unchecked {@link IgniteException}.
+         *
+         * @param meta Meta pointer.
+         * @return Key.
+         */
+        public Object readKey(long meta) {
+            assert meta > 0 : meta;
+
+            try {
+                return read(key(meta), keySize(meta));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * Task input: scans the hash table slot by slot and follows the collision chain of every slot.
+     */
+    protected class Input implements GridHadoopTaskInput {
+        /** Index of the table slot being scanned, {@code -1} before the first call to {@link #next()}. */
+        private int idx = -1;
+
+        /** Current meta pointer, {@code 0} if there is no current entry. */
+        private long metaPtr;
+
+        /** Table capacity captured at construction time. */
+        private final int cap;
+
+        /** Key reader. */
+        private final Reader keyReader;
+
+        /** Value reader. */
+        private final Reader valReader;
+
+        /**
+         * @param taskCtx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+            cap = capacity();
+
+            keyReader = new Reader(taskCtx.keySerialization());
+            valReader = new Reader(taskCtx.valueSerialization());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            // Continue with the collision chain of the current slot first.
+            if (metaPtr != 0) {
+                metaPtr = collision(metaPtr);
+
+                if (metaPtr != 0)
+                    return true;
+            }
+
+            while (++idx < cap) { // Scan table.
+                metaPtr = meta(idx);
+
+                if (metaPtr != 0)
+                    return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return keyReader.readKey(metaPtr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<?> values() {
+            return new ValueIterator(value(metaPtr), valReader);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The value reader is closed even if closing the key reader fails; if both fail,
+         * the failure of the value reader is the one that propagates.
+         */
+        @Override public void close() throws IgniteCheckedException {
+            try {
+                keyReader.close();
+            }
+            finally {
+                valReader.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
new file mode 100644
index 0000000..e1fa1f1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Multimap for hadoop intermediate results.
+ */
+@SuppressWarnings("PublicInnerClass")
+public interface HadoopMultimap extends AutoCloseable {
+    /**
+     * Incrementally visits all the keys and values in the map.
+     *
+     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+     * @param v Visitor.
+     * @return {@code false} If visiting was impossible.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
+
+    /**
+     * Creates an adder which is used to put data into this multimap.
+     *
+     * @param ctx Task context.
+     * @return Adder.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * Creates a task input over the content of this multimap.
+     *
+     * @param taskCtx Task context.
+     * @return Task input.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx)
+        throws IgniteCheckedException;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Redeclared without a {@code throws} clause, so that closing a multimap does not force
+     * callers to handle checked exceptions.
+     */
+    @Override public void close();
+
+    /**
+     * Adder: a task output which can additionally add keys directly from a data input.
+     */
+    public interface Adder extends GridHadoopTaskOutput {
+        /**
+         * Adds a key read from the given data input.
+         *
+         * @param in Data input.
+         * @param reuse Reusable key, may be {@code null}.
+         * @return Key.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
+    }
+
+    /**
+     * Key to add values to.
+     */
+    public interface Key {
+        /**
+         * Adds the value to this key.
+         *
+         * @param val Value.
+         */
+        public void add(Value val);
+    }
+
+    /**
+     * Value.
+     */
+    public interface Value {
+        /**
+         * @return Size in bytes.
+         */
+        public int size();
+
+        /**
+         * Copies this value to the given pointer.
+         *
+         * @param ptr Pointer.
+         */
+        public void copyTo(long ptr);
+    }
+
+    /**
+     * Key and values visitor.
+     */
+    public interface Visitor {
+        /**
+         * @param keyPtr Key pointer.
+         * @param keySize Key size.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
+
+        /**
+         * @param valPtr Value pointer.
+         * @param valSize Value size.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
new file mode 100644
index 0000000..4aa6e9e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
+
+/**
+ * Base class for all multimaps.
+ * <p>
+ * Provides page based allocation for adders, accessors for the 12 byte value header
+ * (next value pointer at offset 0, value size at offset 8) and readers for stored objects.
+ */
+public abstract class HadoopMultimapBase implements HadoopMultimap {
+    /** Memory. */
+    protected final GridUnsafeMemory mem;
+
+    /** Minimal size of a page allocated by an adder. */
+    protected final int pageSize;
+
+    /** Page lists handed over by closed adders, they are released when the multimap is closed. */
+    private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>();
+
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     */
+    protected HadoopMultimapBase(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+        assert jobInfo != null;
+        assert mem != null;
+
+        this.mem = mem;
+
+        // The job may override the page size, 32 * 1024 is passed as the default.
+        pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
+    }
+
+    /**
+     * Releases all the pages from the given list, leaving the list empty.
+     *
+     * @param ptrs Page pointers.
+     */
+    private void deallocate(GridLongList ptrs) {
+        // Pages are stored as (size, pointer) pairs and both arguments are taken by remove() in
+        // left-to-right order. NOTE(review): presumably remove() takes elements from the tail,
+        // yielding the pointer first and the size second - confirm against GridLongList.
+        while (!ptrs.isEmpty())
+            mem.release(ptrs.remove(), ptrs.remove());
+    }
+
+    /**
+     * @param valPtr Value page pointer.
+     * @param nextValPtr Next value page pointer.
+     */
+    protected void nextValue(long valPtr, long nextValPtr) {
+        mem.writeLong(valPtr, nextValPtr);
+    }
+
+    /**
+     * @param valPtr Value page pointer.
+     * @return Next value page pointer.
+     */
+    protected long nextValue(long valPtr) {
+        return mem.readLong(valPtr);
+    }
+
+    /**
+     * @param valPtr Value page pointer.
+     * @param size Size.
+     */
+    protected void valueSize(long valPtr, int size) {
+        mem.writeInt(valPtr + 8, size);
+    }
+
+    /**
+     * @param valPtr Value page pointer.
+     * @return Value size.
+     */
+    protected int valueSize(long valPtr) {
+        return mem.readInt(valPtr + 8);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Releases the pages of all the adders which have been closed by now.
+     */
+    @Override public void close() {
+        for (GridLongList list : allPages)
+            deallocate(list);
+    }
+
+    /**
+     * Reader for key and value.
+     */
+    protected class ReaderBase implements AutoCloseable {
+        /** Object which is passed to the serialization for reuse on the next read. */
+        private Object tmp;
+
+        /** Serialization. */
+        private final GridHadoopSerialization ser;
+
+        /** Input stream over the memory, it is re-pointed before every read. */
+        private final HadoopDataInStream in = new HadoopDataInStream(mem);
+
+        /**
+         * @param ser Serialization.
+         */
+        protected ReaderBase(GridHadoopSerialization ser) {
+            assert ser != null;
+
+            this.ser = ser;
+        }
+
+        /**
+         * Reads the value stored after the 12 byte header of the given value page. A checked
+         * failure is rethrown as unchecked {@link IgniteException}.
+         *
+         * @param valPtr Value page pointer.
+         * @return Value.
+         */
+        public Object readValue(long valPtr) {
+            assert valPtr > 0 : valPtr;
+
+            try {
+                return read(valPtr + 12, valueSize(valPtr));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * Resets temporary object to the given one.
+         *
+         * @param tmp Temporary object for reuse.
+         */
+        public void resetReusedObject(Object tmp) {
+            this.tmp = tmp;
+        }
+
+        /**
+         * Reads an object from the given memory region. The result is remembered and passed
+         * to the serialization for reuse on the next read.
+         *
+         * @param ptr Pointer.
+         * @param size Object size.
+         * @return Object.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected Object read(long ptr, long size) throws IgniteCheckedException {
+            in.buffer().set(ptr, size);
+
+            tmp = ser.read(in, tmp);
+
+            return tmp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            ser.close();
+        }
+    }
+
+    /**
+     * Base class for adders.
+     */
+    protected abstract class AdderBase implements Adder {
+        /** Key serialization. */
+        protected final GridHadoopSerialization keySer;
+
+        /** Value serialization. */
+        protected final GridHadoopSerialization valSer;
+
+        /** Output stream which allocates the next page when the current one has no room left. */
+        private final HadoopDataOutStream out;
+
+        /** Pointer to the start of the object being written. */
+        private long writeStart;
+
+        /** Size and pointer pairs list. */
+        private final GridLongList pages = new GridLongList(16);
+
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderBase(GridHadoopTaskContext ctx) throws IgniteCheckedException {
+            valSer = ctx.valueSerialization();
+            keySer = ctx.keySerialization();
+
+            out = new HadoopDataOutStream(mem) {
+                @Override public long move(long size) {
+                    long ptr = super.move(size);
+
+                    if (ptr == 0) // Was not able to move - not enough free space.
+                        ptr = allocateNextPage(size);
+
+                    assert ptr != 0;
+
+                    return ptr;
+                }
+            };
+        }
+
+        /**
+         * Allocates a new page which is large enough for the already written part of the current
+         * object plus the requested size (but not smaller than the page size), moves the written
+         * part to the new page and switches the output buffer to it.
+         *
+         * @param requestedSize Requested size.
+         * @return Next write pointer.
+         */
+        private long allocateNextPage(long requestedSize) {
+            int writtenSize = writtenSize();
+
+            long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
+            long newPagePtr = mem.allocate(newPageSize);
+
+            // Remember the page as a (size, pointer) pair to release it later.
+            pages.add(newPageSize);
+            pages.add(newPagePtr);
+
+            HadoopOffheapBuffer b = out.buffer();
+
+            b.set(newPagePtr, newPageSize);
+
+            // Keep the partially written object contiguous: copy what was written so far.
+            if (writtenSize != 0) {
+                mem.copyMemory(writeStart, newPagePtr, writtenSize);
+
+                b.move(writtenSize);
+            }
+
+            writeStart = newPagePtr;
+
+            return b.move(requestedSize);
+        }
+
+        /**
+         * Advances the write pointer of the output buffer to the next address divisible by 8
+         * if the current one is not.
+         *
+         * @return Fixed pointer.
+         */
+        private long fixAlignment() {
+            HadoopOffheapBuffer b = out.buffer();
+
+            long ptr = b.pointer();
+
+            if ((ptr & 7L) != 0) { // Address is not aligned by octet.
+                ptr = (ptr + 8L) & ~7L;
+
+                b.pointer(ptr);
+            }
+
+            return ptr;
+        }
+
+        /**
+         * Writes the object at an aligned address, optionally leaving a gap in front of it.
+         *
+         * @param off Offset: number of bytes to skip before the serialized object.
+         * @param o Object.
+         * @param ser Serialization used to write the object.
+         * @return Page pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected long write(int off, Object o, GridHadoopSerialization ser) throws IgniteCheckedException {
+            writeStart = fixAlignment();
+
+            if (off != 0)
+                out.move(off);
+
+            ser.write(out, o);
+
+            // Must be re-read here: the start moves if a new page was allocated while writing.
+            return writeStart;
+        }
+
+        /**
+         * Allocates a block of the given size at an aligned address.
+         *
+         * @param size Size.
+         * @return Pointer.
+         */
+        protected long allocate(int size) {
+            writeStart = fixAlignment();
+
+            out.move(size);
+
+            // Must be re-read here: the start moves if a new page was allocated by the move.
+            return writeStart;
+        }
+
+        /**
+         * Rewinds local allocation pointer to the given pointer if possible.
+         *
+         * @param ptr Pointer.
+         */
+        protected void localDeallocate(long ptr) {
+            HadoopOffheapBuffer b = out.buffer();
+
+            if (b.isInside(ptr))
+                b.pointer(ptr);
+            else
+                b.reset();
+        }
+
+        /**
+         * @return Written size.
+         */
+        protected int writtenSize() {
+            return (int)(out.buffer().pointer() - writeStart);
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation always throws {@link UnsupportedOperationException}.
+         */
+        @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Hands the allocated pages over to the multimap and closes both serializations. The value
+         * serialization is closed even if closing the key serialization fails; if both fail,
+         * the failure of the value serialization is the one that propagates.
+         */
+        @Override public void close() throws IgniteCheckedException {
+            allPages.add(pages);
+
+            try {
+                keySer.close();
+            }
+            finally {
+                valSer.close();
+            }
+        }
+    }
+
+    /**
+     * Iterator over values.
+     */
+    protected class ValueIterator implements Iterator<Object> {
+        /** Pointer to the value returned by the next call to {@link #next()}, {@code 0} if none. */
+        private long valPtr;
+
+        /** Value reader. */
+        private final ReaderBase valReader;
+
+        /**
+         * @param valPtr Value page pointer.
+         * @param valReader Value reader.
+         */
+        protected ValueIterator(long valPtr, ReaderBase valReader) {
+            this.valPtr = valPtr;
+            this.valReader = valReader;
+        }
+
+        /**
+         * Repositions the iterator to the given list of values.
+         *
+         * @param valPtr Head value pointer.
+         */
+        public void head(long valPtr) {
+            this.valPtr = valPtr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean hasNext() {
+            return valPtr != 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object next() {
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            Object res = valReader.readValue(valPtr);
+
+            valPtr = nextValue(valPtr);
+
+            return res;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation always throws {@link UnsupportedOperationException}.
+         */
+        @Override public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}