Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:35 UTC
[24/51] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
deleted file mode 100644
index a2c626c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/GridHadoopSkipList.java
+++ /dev/null
@@ -1,726 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Skip list.
- */
-public class GridHadoopSkipList extends GridHadoopMultimapBase {
- /** */
-    private static final int HEADS_SIZE = 24 + 33 * 8; // 24-byte offset plus 33 level links (levels 0 to 32 inclusive), 8 bytes each.
-
- /** Top level. */
- private final AtomicInteger topLevel = new AtomicInteger(-1);
-
- /** Heads for all the lists. */
- private final long heads;
-
- /** */
- private final AtomicBoolean visitGuard = new AtomicBoolean();
-
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- */
- public GridHadoopSkipList(GridHadoopJobInfo jobInfo, GridUnsafeMemory mem) {
- super(jobInfo, mem);
-
- heads = mem.allocate(HEADS_SIZE, true);
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- super.close();
-
- mem.release(heads, HEADS_SIZE);
- }
-
- /** {@inheritDoc} */
- @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
- if (!visitGuard.compareAndSet(false, true))
- return false;
-
- for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
- long valPtr = value(meta);
-
- long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
-
- if (valPtr != lastVisited) {
- long k = key(meta);
-
- v.onKey(k + 4, keySize(k));
-
-                lastVisitedValue(meta, valPtr); // Set it to the first value in the chain.
-
- do {
- v.onValue(valPtr + 12, valueSize(valPtr));
-
- valPtr = nextValue(valPtr);
- }
- while (valPtr != lastVisited);
- }
- }
-
- visitGuard.lazySet(false);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Adder startAdding(GridHadoopTaskContext ctx) throws IgniteCheckedException {
- return new AdderImpl(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- Input in = new Input(taskCtx);
-
- Comparator<Object> grpCmp = taskCtx.groupComparator();
-
- if (grpCmp != null)
- return new GroupedInput(grpCmp, in);
-
- return in;
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key pointer.
- */
- private long key(long meta) {
- return mem.readLong(meta);
- }
-
- /**
- * @param meta Meta pointer.
- * @param key Key pointer.
- */
- private void key(long meta, long key) {
- mem.writeLong(meta, key);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Value pointer.
- */
- private long value(long meta) {
- return mem.readLongVolatile(meta + 8);
- }
-
- /**
- * @param meta Meta pointer.
- * @param valPtr Value pointer.
- */
- private void value(long meta, long valPtr) {
- mem.writeLongVolatile(meta + 8, valPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @param oldValPtr Old first value pointer.
- * @param newValPtr New first value pointer.
- * @return {@code true} If operation succeeded.
- */
- private boolean casValue(long meta, long oldValPtr, long newValPtr) {
- return mem.casLong(meta + 8, oldValPtr, newValPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Last visited value pointer.
- */
- private long lastVisitedValue(long meta) {
- return mem.readLong(meta + 16);
- }
-
- /**
- * @param meta Meta pointer.
- * @param valPtr Last visited value pointer.
- */
- private void lastVisitedValue(long meta, long valPtr) {
- mem.writeLong(meta + 16, valPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @return Next meta pointer.
- */
- private long nextMeta(long meta, int level) {
- assert meta > 0 : meta;
-
- return mem.readLongVolatile(meta + 24 + 8 * level);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @param oldNext Old next meta pointer.
- * @param newNext New next meta pointer.
- * @return {@code true} If operation succeeded.
- */
- private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
- assert meta > 0 : meta;
-
- return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @param nextMeta Next meta.
- */
- private void nextMeta(long meta, int level, long nextMeta) {
- assert meta != 0;
-
- mem.writeLong(meta + 24 + 8 * level, nextMeta);
- }
-
- /**
- * @param keyPtr Key pointer.
- * @return Key size.
- */
- private int keySize(long keyPtr) {
- return mem.readInt(keyPtr);
- }
-
- /**
- * @param keyPtr Key pointer.
- * @param keySize Key size.
- */
- private void keySize(long keyPtr, int keySize) {
- mem.writeInt(keyPtr, keySize);
- }
-
- /**
- * @param rnd Random.
- * @return Next level.
- */
- public static int randomLevel(Random rnd) {
- int x = rnd.nextInt();
-
- int level = 0;
-
- while ((x & 1) != 0) { // Count sequential 1 bits.
- level++;
-
- x >>>= 1;
- }
-
- return level;
- }
-
- /**
- * Reader.
- */
- private class Reader extends ReaderBase {
- /**
- * @param ser Serialization.
- */
- protected Reader(GridHadoopSerialization ser) {
- super(ser);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key.
- */
- public Object readKey(long meta) {
- assert meta > 0 : meta;
-
- long k = key(meta);
-
- try {
- return read(k + 4, keySize(k));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-
- /**
- * Adder.
- */
- private class AdderImpl extends AdderBase {
- /** */
- private final Comparator<Object> cmp;
-
- /** */
- private final Random rnd = new GridRandom();
-
- /** */
- private final GridLongList stack = new GridLongList(16);
-
- /** */
- private final Reader keyReader;
-
- /**
- * @param ctx Task context.
- * @throws IgniteCheckedException If failed.
- */
- protected AdderImpl(GridHadoopTaskContext ctx) throws IgniteCheckedException {
- super(ctx);
-
- keyReader = new Reader(keySer);
-
- cmp = ctx.sortComparator();
- }
-
- /** {@inheritDoc} */
- @Override public void write(Object key, Object val) throws IgniteCheckedException {
- A.notNull(val, "val");
-
- add(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
- KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
- k.tmpKey = keySer.read(in, k.tmpKey);
-
- k.meta = add(k.tmpKey, null);
-
- return k;
- }
-
- /**
- * @param key Key.
- * @param val Value.
- * @param level Level.
- * @return Meta pointer.
- */
- private long createMeta(long key, long val, int level) {
- int size = 32 + 8 * level;
-
- long meta = allocate(size);
-
- key(meta, key);
- value(meta, val);
- lastVisitedValue(meta, 0L);
-
- for (int i = 32; i < size; i += 8) // Fill with 0.
- mem.writeLong(meta + i, 0L);
-
- return meta;
- }
-
- /**
- * @param key Key.
- * @return Pointer.
- * @throws IgniteCheckedException If failed.
- */
- private long writeKey(Object key) throws IgniteCheckedException {
- long keyPtr = write(4, key, keySer);
- int keySize = writtenSize() - 4;
-
- keySize(keyPtr, keySize);
-
- return keyPtr;
- }
-
- /**
- * @param prevMeta Previous meta.
- * @param meta Next meta.
- */
- private void stackPush(long prevMeta, long meta) {
- stack.add(prevMeta);
- stack.add(meta);
- }
-
- /**
- * Drops last remembered frame from the stack.
- */
- private void stackPop() {
- stack.pop(2);
- }
-
- /**
- * @param key Key.
- * @param val Value.
- * @return Meta pointer.
- * @throws IgniteCheckedException If failed.
- */
- private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
- assert key != null;
-
- stack.clear();
-
- long valPtr = 0;
-
- if (val != null) { // Write value.
- valPtr = write(12, val, valSer);
- int valSize = writtenSize() - 12;
-
- nextValue(valPtr, 0);
- valueSize(valPtr, valSize);
- }
-
- long keyPtr = 0;
- long newMeta = 0;
- int newMetaLevel = -1;
-
- long prevMeta = heads;
- int level = topLevel.get();
- long meta = level < 0 ? 0 : nextMeta(heads, level);
-
- for (;;) {
-                if (level < 0) { // We did not find our key, try to add a new meta.
- if (keyPtr == 0) { // Write key and create meta only once.
- keyPtr = writeKey(key);
-
- newMetaLevel = randomLevel(rnd);
- newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
- }
-
- nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
-
- if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
- laceUp(key, newMeta, newMetaLevel);
-
- return newMeta;
- }
-                else { // Add failed, need to check what was added by another thread.
- meta = nextMeta(prevMeta, level = 0);
-
- stackPop();
- }
- }
-
- int cmpRes = cmp(key, meta);
-
- if (cmpRes == 0) { // Key found.
- if (newMeta != 0) // Deallocate if we've allocated something.
- localDeallocate(keyPtr);
-
- if (valPtr == 0) // Only key needs to be added.
- return meta;
-
- for (;;) { // Add value for the key found.
- long nextVal = value(meta);
-
- nextValue(valPtr, nextVal);
-
- if (casValue(meta, nextVal, valPtr))
- return meta;
- }
- }
-
- assert cmpRes != 0;
-
- if (cmpRes > 0) { // Go right.
- prevMeta = meta;
- meta = nextMeta(meta, level);
-
-                    if (meta != 0) // Go right if possible, otherwise fall through and go down.
- continue;
- }
-
- while (--level >= 0) { // Go down.
- stackPush(prevMeta, meta); // Remember the path.
-
- long nextMeta = nextMeta(prevMeta, level);
-
-                    if (nextMeta != meta) { // If this level has nodes the upper level skipped, scan it; otherwise keep going down.
- meta = nextMeta;
-
- assert meta != 0;
-
- break;
- }
- }
- }
- }
-
- /**
- * @param key Key.
- * @param meta Meta pointer.
- * @return Comparison result.
- */
- @SuppressWarnings("unchecked")
- private int cmp(Object key, long meta) {
- assert meta != 0;
-
- return cmp.compare(key, keyReader.readKey(meta));
- }
-
- /**
- * Adds appropriate index links between metas.
- *
- * @param newMeta Just added meta.
- * @param newMetaLevel New level.
- */
- private void laceUp(Object key, long newMeta, int newMetaLevel) {
- for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
- long prevMeta = heads;
- long meta = 0;
-
- if (!stack.isEmpty()) { // Get the path back.
- meta = stack.remove();
- prevMeta = stack.remove();
- }
-
- for (;;) {
- nextMeta(newMeta, level, meta);
-
- if (casNextMeta(prevMeta, level, meta, newMeta))
- break;
-
- long oldMeta = meta;
-
- meta = nextMeta(prevMeta, level); // Reread meta.
-
- for (;;) {
- int cmpRes = cmp(key, meta);
-
- if (cmpRes > 0) { // Go right.
- prevMeta = meta;
- meta = nextMeta(prevMeta, level);
-
- if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
- continue;
- }
-
- assert cmpRes != 0; // Two different metas with equal keys must be impossible.
-
- break; // Retry cas.
- }
- }
- }
-
- if (!stack.isEmpty())
- return; // Our level already lower than top.
-
- for (;;) { // Raise top level.
- int top = topLevel.get();
-
- if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
- break;
- }
- }
-
- /**
- * Key.
- */
- private class KeyImpl implements Key {
- /** */
- private long meta;
-
- /** */
- private Object tmpKey;
-
- /**
- * @return Meta pointer for the key.
- */
- public long address() {
- return meta;
- }
-
- /**
- * @param val Value.
- */
- @Override public void add(Value val) {
- int size = val.size();
-
- long valPtr = allocate(size + 12);
-
- val.copyTo(valPtr + 12);
-
- valueSize(valPtr, size);
-
- long nextVal;
-
- do {
- nextVal = value(meta);
-
- nextValue(valPtr, nextVal);
- }
-                while (!casValue(meta, nextVal, valPtr));
- }
- }
- }
-
- /**
- * Task input.
- */
- private class Input implements GridHadoopTaskInput {
- /** */
- private long metaPtr = heads;
-
- /** */
- private final Reader keyReader;
-
- /** */
- private final Reader valReader;
-
- /**
- * @param taskCtx Task context.
- * @throws IgniteCheckedException If failed.
- */
- private Input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
- keyReader = new Reader(taskCtx.keySerialization());
- valReader = new Reader(taskCtx.valueSerialization());
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() {
- metaPtr = nextMeta(metaPtr, 0);
-
- return metaPtr != 0;
- }
-
- /** {@inheritDoc} */
- @Override public Object key() {
- return keyReader.readKey(metaPtr);
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<?> values() {
- return new ValueIterator(value(metaPtr), valReader);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- keyReader.close();
- valReader.close();
- }
- }
-
- /**
- * Grouped input using grouping comparator.
- */
- private class GroupedInput implements GridHadoopTaskInput {
- /** */
- private final Comparator<Object> grpCmp;
-
- /** */
- private final Input in;
-
- /** */
- private Object prevKey;
-
- /** */
- private Object nextKey;
-
- /** */
- private final GridLongList vals = new GridLongList();
-
- /**
- * @param grpCmp Grouping comparator.
- * @param in Input.
- */
- private GroupedInput(Comparator<Object> grpCmp, Input in) {
- this.grpCmp = grpCmp;
- this.in = in;
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() {
- if (prevKey == null) { // First call.
- if (!in.next())
- return false;
-
- prevKey = in.key();
-
- assert prevKey != null;
-
- in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
-
- vals.add(value(in.metaPtr));
- }
- else {
- if (in.metaPtr == 0) // We reached the end of the input.
- return false;
-
- vals.clear();
-
- vals.add(value(in.metaPtr));
-
- in.keyReader.resetReusedObject(prevKey); // Switch key instances.
-
- prevKey = nextKey;
- }
-
-            while (in.next()) { // Collect head value pointers of entries with equal keys.
- if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
- vals.add(value(in.metaPtr));
- else
- break;
- }
-
- assert !vals.isEmpty();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Object key() {
- return prevKey;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<?> values() {
- assert !vals.isEmpty();
-
- final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
-
- return new Iterator<Object>() {
- /** */
- private int idx;
-
- @Override public boolean hasNext() {
- if (!valIter.hasNext()) {
- if (++idx == vals.size())
- return false;
-
- valIter.head(vals.get(idx));
-
- assert valIter.hasNext();
- }
-
- return true;
- }
-
- @Override public Object next() {
- return valIter.next();
- }
-
- @Override public void remove() {
- valIter.remove();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- in.close();
- }
- }
-}
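
A note on randomLevel(Random) in the file above (re-added unchanged in HadoopSkipList below): it draws a node level by counting consecutive low-order 1 bits of a random int, so level L comes up with probability 2^-(L+1) and is capped at 32, which is why HEADS_SIZE reserves 33 level links. A minimal standalone sketch of the same sampling (class and names are illustrative only, not part of the patch):

import java.util.Random;

public class SkipLevelDemo {
    /** Same sampling as randomLevel() above: count trailing 1 bits. */
    static int randomLevel(Random rnd) {
        int x = rnd.nextInt();
        int level = 0;

        while ((x & 1) != 0) { // Each extra level costs one more 1 bit (probability 1/2).
            level++;
            x >>>= 1;
        }

        return level;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        int[] hist = new int[33]; // Levels 0 to 32 inclusive.

        for (int i = 0; i < 1_000_000; i++)
            hist[randomLevel(rnd)]++;

        for (int l = 0; l < 6; l++) // Expect roughly 500000, 250000, 125000, ...
            System.out.println("level " + l + ": " + hist[l]);
    }
}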
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
new file mode 100644
index 0000000..65d9268
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Multimap for map reduce intermediate results.
+ */
+public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
+ /** */
+ private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING);
+
+ /** */
+ private volatile AtomicLongArray oldTbl;
+
+ /** */
+ private volatile AtomicLongArray newTbl;
+
+ /** */
+ private final AtomicInteger keys = new AtomicInteger();
+
+ /** */
+ private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>();
+
+ /** */
+ private final AtomicInteger inputs = new AtomicInteger();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ * @param cap Initial capacity.
+ */
+ public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+ super(jobInfo, mem);
+
+ assert U.isPow2(cap);
+
+ newTbl = oldTbl = new AtomicLongArray(cap);
+ }
+
+ /**
+ * @return Number of keys.
+ */
+ public long keys() {
+ int res = keys.get();
+
+ for (AdderImpl adder : adders)
+ res += adder.locKeys.get();
+
+ return res;
+ }
+
+ /**
+ * @return Current table capacity.
+ */
+ @Override public int capacity() {
+ return oldTbl.length();
+ }
+
+ /**
+     * @param ctx Task context.
+     * @return Adder object.
+     * @throws IgniteCheckedException If failed.
+ */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ if (inputs.get() != 0)
+ throw new IllegalStateException("Active inputs.");
+
+ if (state.get() == State.CLOSING)
+ throw new IllegalStateException("Closed.");
+
+ return new AdderImpl(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ assert inputs.get() == 0 : inputs.get();
+ assert adders.isEmpty() : adders.size();
+
+ state(State.READING_WRITING, State.CLOSING);
+
+ if (keys() == 0)
+ return;
+
+ super.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long meta(int idx) {
+ return oldTbl.get(idx);
+ }
+
+ /**
+ * Incrementally visits all the keys and values in the map.
+ *
+ * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+ * @param v Visitor.
+ * @return {@code false} If visiting was impossible due to rehashing.
+ */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
+ assert state.get() != State.CLOSING;
+
+            return false; // Cannot visit while rehashing is in progress.
+ }
+
+ AtomicLongArray tbl0 = oldTbl;
+
+ for (int i = 0; i < tbl0.length(); i++) {
+ long meta = tbl0.get(i);
+
+ while (meta != 0) {
+ long valPtr = value(meta);
+
+ long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
+ if (valPtr != lastVisited) {
+ v.onKey(key(meta), keySize(meta));
+
+                    lastVisitedValue(meta, valPtr); // Set it to the first value in the chain.
+
+ do {
+ v.onValue(valPtr + 12, valueSize(valPtr));
+
+ valPtr = nextValue(valPtr);
+ }
+ while (valPtr != lastVisited);
+ }
+
+ meta = collision(meta);
+ }
+ }
+
+ state(State.VISITING, State.READING_WRITING);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ inputs.incrementAndGet();
+
+ if (!adders.isEmpty())
+ throw new IllegalStateException("Active adders.");
+
+ State s = state.get();
+
+ if (s == State.CLOSING)
+ throw new IllegalStateException("Closed.");
+
+ assert s != State.REHASHING;
+
+ return new Input(taskCtx) {
+ @Override public void close() throws IgniteCheckedException {
+ if (inputs.decrementAndGet() < 0)
+ throw new IllegalStateException();
+
+ super.close();
+ }
+ };
+ }
+
+ /**
+ * @param fromTbl Table.
+ */
+ private void rehashIfNeeded(AtomicLongArray fromTbl) {
+ if (fromTbl.length() == Integer.MAX_VALUE)
+ return;
+
+ long keys0 = keys();
+
+        if (keys0 < 3 * (fromTbl.length() >>> 2)) // Rehash only when keys fill at least 3/4 of the capacity.
+ return;
+
+        if (fromTbl != newTbl) // Check if someone else has already done the job.
+ return;
+
+ if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) {
+ assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash.
+
+ return;
+ }
+
+ if (fromTbl != newTbl) { // Double check.
+ state(State.REHASHING, State.READING_WRITING); // Switch back.
+
+ return;
+ }
+
+ // Calculate new table capacity.
+ int newLen = fromTbl.length();
+
+ do {
+ newLen <<= 1;
+ }
+ while (newLen < keys0);
+
+        if (keys0 >= 3 * (newLen >>> 2)) // Still at or above 3/4 load.
+ newLen <<= 1;
+
+ // This is our target table for rehashing.
+ AtomicLongArray toTbl = new AtomicLongArray(newLen);
+
+ // Make the new table visible before rehashing.
+ newTbl = toTbl;
+
+ // Rehash.
+ int newMask = newLen - 1;
+
+ long failedMeta = 0;
+
+ GridLongList collisions = new GridLongList(16);
+
+ for (int i = 0; i < fromTbl.length(); i++) { // Scan source table.
+ long meta = fromTbl.get(i);
+
+ assert meta != -1;
+
+ if (meta == 0) { // No entry.
+ failedMeta = 0;
+
+ if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved.
+ i--; // Retry.
+
+ continue;
+ }
+
+            do { // Collect collisions up to the chain processed on the previous attempt (failedMeta), or to 0.
+ collisions.add(meta);
+
+ meta = collision(meta);
+ }
+ while (meta != failedMeta);
+
+ do { // Go from the last to the first to avoid 'in-flight' state for meta entries.
+ meta = collisions.remove();
+
+ int addr = keyHash(meta) & newMask;
+
+ for (;;) { // Move meta entry to the new table.
+ long toCollision = toTbl.get(addr);
+
+ collision(meta, toCollision);
+
+ if (toTbl.compareAndSet(addr, toCollision, meta))
+ break;
+ }
+ }
+ while (!collisions.isEmpty());
+
+            // Here 'meta' will be the root pointer in the old table.
+ if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved.
+ failedMeta = meta;
+
+ i--; // Retry the same address in table because new keys were added.
+ }
+ else
+ failedMeta = 0;
+ }
+
+ // Now old and new tables will be the same again.
+ oldTbl = toTbl;
+
+ state(State.REHASHING, State.READING_WRITING);
+ }
+
+ /**
+ * Switch state.
+ *
+ * @param oldState Expected state.
+ * @param newState New state.
+ */
+ private void state(State oldState, State newState) {
+ if (!state.compareAndSet(oldState, newState))
+ throw new IllegalStateException();
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ @Override protected long value(long meta) {
+ return mem.readLongVolatile(meta + 16);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param oldValPtr Old value.
+ * @param newValPtr New value.
+ * @return {@code true} If succeeded.
+ */
+ private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+ return mem.casLong(meta + 16, oldValPtr, newValPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Collision pointer.
+ */
+ @Override protected long collision(long meta) {
+ return mem.readLongVolatile(meta + 24);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param collision Collision pointer.
+ */
+ @Override protected void collision(long meta, long collision) {
+ assert meta != collision : meta;
+
+ mem.writeLongVolatile(meta + 24, collision);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Last visited value pointer.
+ */
+ private long lastVisitedValue(long meta) {
+ return mem.readLong(meta + 32);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Last visited value pointer.
+ */
+ private void lastVisitedValue(long meta, long valPtr) {
+ mem.writeLong(meta + 32, valPtr);
+ }
+
+ /**
+ * Adder. Must not be shared between threads.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final AtomicInteger locKeys = new AtomicInteger();
+
+ /** */
+ private final Random rnd = new GridRandom();
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+
+ rehashIfNeeded(oldTbl);
+
+ adders.add(this);
+ }
+
+ /**
+ * @param in Data input.
+ * @param reuse Reusable key.
+ * @return Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+ k.tmpKey = keySer.read(in, k.tmpKey);
+
+ k.meta = add(k.tmpKey, null);
+
+ return k;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ add(key, val);
+ }
+
+ /**
+ * @param tbl Table.
+ */
+ private void incrementKeys(AtomicLongArray tbl) {
+ locKeys.lazySet(locKeys.get() + 1);
+
+            if (rnd.nextInt(tbl.length()) < 512) // Check with probability 512/capacity to amortize the rehash test.
+ rehashIfNeeded(tbl);
+ }
+
+ /**
+ * @param keyHash Key hash.
+ * @param keySize Key size.
+ * @param keyPtr Key pointer.
+ * @param valPtr Value page pointer.
+ * @param collisionPtr Pointer to meta with hash collision.
+ * @param lastVisitedVal Last visited value pointer.
+ * @return Created meta page pointer.
+ */
+ private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) {
+ long meta = allocate(40);
+
+ mem.writeInt(meta, keyHash);
+ mem.writeInt(meta + 4, keySize);
+ mem.writeLong(meta + 8, keyPtr);
+ mem.writeLong(meta + 16, valPtr);
+ mem.writeLong(meta + 24, collisionPtr);
+ mem.writeLong(meta + 32, lastVisitedVal);
+
+ return meta;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @return Updated or created meta page pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+ AtomicLongArray tbl = oldTbl;
+
+ int keyHash = U.hash(key.hashCode());
+
+ long newMetaPtr = 0;
+
+ long valPtr = 0;
+
+ if (val != null) {
+ valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ valueSize(valPtr, valSize);
+ }
+
+ for (AtomicLongArray old = null;;) {
+ int addr = keyHash & (tbl.length() - 1);
+
+ long metaPtrRoot = tbl.get(addr); // Read root meta pointer at this address.
+
+ if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
+ AtomicLongArray n = newTbl; // Need to read newTbl first here.
+ AtomicLongArray o = oldTbl;
+
+                    tbl = tbl == o ? n : o; // Take the oldest table that is still newer than ours.
+
+ old = null;
+
+ continue;
+ }
+
+ if (metaPtrRoot != 0) { // Not empty slot.
+ long metaPtr = metaPtrRoot;
+
+ do { // Scan all the collisions.
+ if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found key.
+ if (newMetaPtr != 0) // Deallocate new meta if one was allocated.
+                            localDeallocate(key(newMetaPtr)); // Key was allocated first, so rewind to its pointer.
+
+ if (valPtr != 0) { // Add value if it exists.
+ long nextValPtr;
+
+                            // Values are linked to each other in a stack-like structure.
+                            // Replace the current head value in meta with ours and link the old head as next.
+ do {
+ nextValPtr = value(metaPtr);
+
+ nextValue(valPtr, nextValPtr);
+ }
+ while (!casValue(metaPtr, nextValPtr, valPtr));
+ }
+
+ return metaPtr;
+ }
+
+ metaPtr = collision(metaPtr);
+ }
+ while (metaPtr != 0);
+
+ // Here we did not find our key, need to check if it was moved by rehashing to the new table.
+                if (old == null) { // If the old table is already set, we will just try to update it.
+ AtomicLongArray n = newTbl;
+
+ if (n != tbl) { // Rehashing happens, try to find the key in new table but preserve the old one.
+ old = tbl;
+ tbl = n;
+
+ continue;
+ }
+ }
+ }
+
+                if (old != null) { // We checked the new table too and did not find our key in either table.
+ tbl = old; // Try to add new key to the old table.
+
+ addr = keyHash & (tbl.length() - 1);
+
+ old = null;
+ }
+
+ if (newMetaPtr == 0) { // Allocate new meta page.
+ long keyPtr = write(0, key, keySer);
+ int keySize = writtenSize();
+
+ if (valPtr != 0)
+ nextValue(valPtr, 0);
+
+ newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
+ }
+                else // Update the new meta with the root pointer as its collision link.
+ collision(newMetaPtr, metaPtrRoot);
+
+ if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace root pointer with new one.
+ incrementKeys(tbl);
+
+ return newMetaPtr;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ if (!adders.remove(this))
+ throw new IllegalStateException();
+
+            keys.addAndGet(locKeys.get()); // There is a race here, so keys() may briefly return a stale result, which is acceptable.
+
+ super.close();
+ }
+
+ /**
+ * Key.
+ */
+ private class KeyImpl implements Key {
+ /** */
+ private long meta;
+
+ /** */
+ private Object tmpKey;
+
+ /**
+ * @return Meta pointer for the key.
+ */
+ public long address() {
+ return meta;
+ }
+
+ /**
+ * @param val Value.
+ */
+ @Override public void add(Value val) {
+ int size = val.size();
+
+ long valPtr = allocate(size + 12);
+
+ val.copyTo(valPtr + 12);
+
+ valueSize(valPtr, size);
+
+ long nextVal;
+
+ do {
+ nextVal = value(meta);
+
+ nextValue(valPtr, nextVal);
+ }
+                while (!casValue(meta, nextVal, valPtr));
+ }
+ }
+ }
+
+ /**
+ * Current map state.
+ */
+ private enum State {
+ /** */
+ REHASHING,
+
+ /** */
+ VISITING,
+
+ /** */
+ READING_WRITING,
+
+ /** */
+ CLOSING
+ }
+}
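
A note on the sizing in rehashIfNeeded() above: rehashing triggers once the key count reaches 3/4 of the capacity, and the new capacity doubles until it covers the key count, with one extra doubling if the result would still be at least three quarters full. A small sketch of just that rule (hypothetical class name, not part of the patch):

public class RehashSizingSketch {
    /** Mirrors the capacity calculation in rehashIfNeeded(). */
    static int newLength(int curLen, long keys) {
        int newLen = curLen;

        do {
            newLen <<= 1; // Double until the table covers all keys.
        }
        while (newLen < keys);

        if (keys >= 3 * (newLen >>> 2)) // Still at or above 3/4 load: double once more.
            newLen <<= 1;

        return newLen;
    }

    public static void main(String[] args) {
        System.out.println(newLength(16, 13)); // 32 (13 is below 3/4 of 32).
        System.out.println(newLength(16, 30)); // 64 (30 is at least 3/4 of 32, so doubled twice).
    }
}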
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
new file mode 100644
index 0000000..f524bdc
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+/**
+ * Hash multimap.
+ */
+public class HadoopHashMultimap extends HadoopHashMultimapBase {
+ /** */
+ private long[] tbl;
+
+ /** */
+ private int keys;
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ * @param cap Initial capacity.
+ */
+ public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+ super(jobInfo, mem);
+
+ assert U.isPow2(cap) : cap;
+
+ tbl = new long[cap];
+ }
+
+ /** {@inheritDoc} */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ return new AdderImpl(ctx);
+ }
+
+ /**
+ * Rehash.
+ */
+ private void rehash() {
+ long[] newTbl = new long[tbl.length << 1];
+
+ int newMask = newTbl.length - 1;
+
+ for (long meta : tbl) {
+ while (meta != 0) {
+ long collision = collision(meta);
+
+ int idx = keyHash(meta) & newMask;
+
+ collision(meta, newTbl[idx]);
+
+ newTbl[idx] = meta;
+
+ meta = collision;
+ }
+ }
+
+ tbl = newTbl;
+ }
+
+ /**
+ * @return Keys count.
+ */
+ public int keys() {
+ return keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int capacity() {
+ return tbl.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long meta(int idx) {
+ return tbl[idx];
+ }
+
+ /**
+ * Adder.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Reader keyReader;
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+ }
+
+ /**
+ * @param keyHash Key hash.
+ * @param keySize Key size.
+ * @param keyPtr Key pointer.
+ * @param valPtr Value page pointer.
+ * @param collisionPtr Pointer to meta with hash collision.
+ * @return Created meta page pointer.
+ */
+ private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
+ long meta = allocate(32);
+
+ mem.writeInt(meta, keyHash);
+ mem.writeInt(meta + 4, keySize);
+ mem.writeLong(meta + 8, keyPtr);
+ mem.writeLong(meta + 16, valPtr);
+ mem.writeLong(meta + 24, collisionPtr);
+
+ return meta;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ int keyHash = U.hash(key.hashCode());
+
+ // Write value.
+ long valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ valueSize(valPtr, valSize);
+
+ // Find position in table.
+ int idx = keyHash & (tbl.length - 1);
+
+ long meta = tbl[idx];
+
+ // Search for our key in collisions.
+ while (meta != 0) {
+ if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
+ nextValue(valPtr, value(meta));
+
+ value(meta, valPtr);
+
+ return;
+ }
+
+ meta = collision(meta);
+ }
+
+ // Write key.
+ long keyPtr = write(0, key, keySer);
+ int keySize = writtenSize();
+
+ nextValue(valPtr, 0);
+
+ tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
+
+ if (++keys > (tbl.length >>> 2) * 3)
+ rehash();
+ }
+ }
+}
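
HadoopHashMultimap above resolves a slot with keyHash & (length - 1) and keeps collisions as a linked chain of meta pages, which is why the capacity must be a power of two. A rough on-heap analogue of its write() and rehash() loops, using plain objects instead of offheap pointers (all names hypothetical):

import java.util.*;

public class ChainedMultimapSketch {
    /** On-heap stand-in for a meta page: key, stack of values, collision link. */
    static final class Node {
        final Object key;
        final Deque<Object> vals = new ArrayDeque<>();
        Node collision;

        Node(Object key, Node collision) { this.key = key; this.collision = collision; }
    }

    private Node[] tbl = new Node[16]; // Power of two, as the offheap table asserts.
    private int keys;

    public void write(Object key, Object val) {
        int idx = key.hashCode() & (tbl.length - 1); // Mask instead of modulo.

        for (Node n = tbl[idx]; n != null; n = n.collision) {
            if (n.key.equals(key)) { // Found key: push the value onto its chain.
                n.vals.push(val);

                return;
            }
        }

        Node n = new Node(key, tbl[idx]); // New node becomes the chain head.
        n.vals.push(val);
        tbl[idx] = n;

        if (++keys > (tbl.length >>> 2) * 3) // Same 3/4 load factor check as above.
            rehash();
    }

    private void rehash() {
        Node[] newTbl = new Node[tbl.length << 1];
        int newMask = newTbl.length - 1;

        for (Node meta : tbl) {
            while (meta != null) { // Re-link every chain node into the doubled table.
                Node collision = meta.collision;
                int idx = meta.key.hashCode() & newMask;

                meta.collision = newTbl[idx];
                newTbl[idx] = meta;

                meta = collision;
            }
        }

        tbl = newTbl;
    }
}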
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
new file mode 100644
index 0000000..16aa673
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+
+import java.util.*;
+
+/**
+ * Base class for hash multimaps.
+ */
+public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ super(jobInfo, mem);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ throw new UnsupportedOperationException("visit");
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return new Input(taskCtx);
+ }
+
+ /**
+ * @return Hash table capacity.
+ */
+ public abstract int capacity();
+
+ /**
+ * @param idx Index in hash table.
+ * @return Meta page pointer.
+ */
+ protected abstract long meta(int idx);
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key hash.
+ */
+ protected int keyHash(long meta) {
+ return mem.readInt(meta);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key size.
+ */
+ protected int keySize(long meta) {
+ return mem.readInt(meta + 4);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key pointer.
+ */
+ protected long key(long meta) {
+ return mem.readLong(meta + 8);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ protected long value(long meta) {
+ return mem.readLong(meta + 16);
+    }
+
+    /**
+ * @param meta Meta pointer.
+ * @param val Value pointer.
+ */
+ protected void value(long meta, long val) {
+ mem.writeLong(meta + 16, val);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Collision pointer.
+ */
+ protected long collision(long meta) {
+ return mem.readLong(meta + 24);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param collision Collision pointer.
+ */
+ protected void collision(long meta, long collision) {
+ assert meta != collision : meta;
+
+ mem.writeLong(meta + 24, collision);
+ }
+
+ /**
+ * Reader for key and value.
+ */
+ protected class Reader extends ReaderBase {
+ /**
+ * @param ser Serialization.
+ */
+ protected Reader(HadoopSerialization ser) {
+ super(ser);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key.
+ */
+ public Object readKey(long meta) {
+ assert meta > 0 : meta;
+
+ try {
+ return read(key(meta), keySize(meta));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * Task input.
+ */
+ protected class Input implements HadoopTaskInput {
+ /** */
+ private int idx = -1;
+
+ /** */
+ private long metaPtr;
+
+ /** */
+ private final int cap;
+
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final Reader valReader;
+
+ /**
+ * @param taskCtx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ cap = capacity();
+
+ keyReader = new Reader(taskCtx.keySerialization());
+ valReader = new Reader(taskCtx.valueSerialization());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (metaPtr != 0) {
+ metaPtr = collision(metaPtr);
+
+ if (metaPtr != 0)
+ return true;
+ }
+
+ while (++idx < cap) { // Scan table.
+ metaPtr = meta(idx);
+
+ if (metaPtr != 0)
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return keyReader.readKey(metaPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return new ValueIterator(value(metaPtr), valReader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ keyReader.close();
+ valReader.close();
+ }
+ }
+}
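
Input.next() above interleaves two cursors: it first follows the collision chain of the current slot and only then scans forward for the next non-empty slot. The same traversal in compact on-heap form (Node is a hypothetical stand-in for an offheap meta page):

public class TableCursorSketch {
    /** Hypothetical chain node standing in for a meta page. */
    static final class Node {
        Node collision;
    }

    private final Node[] tbl;
    private int idx = -1;
    private Node cur;

    public TableCursorSketch(Node[] tbl) { this.tbl = tbl; }

    public boolean next() {
        if (cur != null) { // First exhaust the collision chain of the current slot.
            cur = cur.collision;

            if (cur != null)
                return true;
        }

        while (++idx < tbl.length) { // Then scan the table for the next non-empty slot.
            cur = tbl[idx];

            if (cur != null)
                return true;
        }

        return false;
    }

    public Node current() { return cur; }
}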
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
new file mode 100644
index 0000000..5def6d3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Multimap for hadoop intermediate results.
+ */
+@SuppressWarnings("PublicInnerClass")
+public interface HadoopMultimap extends AutoCloseable {
+ /**
+ * Incrementally visits all the keys and values in the map.
+ *
+ * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+ * @param v Visitor.
+ * @return {@code false} If visiting was impossible.
+ */
+ public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
+
+ /**
+ * @param ctx Task context.
+ * @return Adder.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+ /**
+ * @param taskCtx Task context.
+ * @return Task input.
+ * @throws IgniteCheckedException If failed.
+ */
+ public HadoopTaskInput input(HadoopTaskContext taskCtx)
+ throws IgniteCheckedException;
+
+ /** {@inheritDoc} */
+ @Override public void close();
+
+ /**
+ * Adder.
+ */
+ public interface Adder extends HadoopTaskOutput {
+ /**
+ * @param in Data input.
+ * @param reuse Reusable key.
+ * @return Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
+ }
+
+ /**
+     * Key to add values to.
+ */
+ public interface Key {
+ /**
+ * @param val Value.
+ */
+ public void add(Value val);
+ }
+
+ /**
+ * Value.
+ */
+ public interface Value {
+ /**
+ * @return Size in bytes.
+ */
+ public int size();
+
+ /**
+ * @param ptr Pointer.
+ */
+ public void copyTo(long ptr);
+ }
+
+ /**
+ * Key and values visitor.
+ */
+ public interface Visitor {
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
+
+ /**
+ * @param valPtr Value pointer.
+ * @param valSize Value size.
+ */
+ public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
+ }
+}
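
The interface above implies a strict lifecycle, which the implementations assert: all adders must be closed before any input is opened, and inputs must be closed before the multimap itself. A hedged usage sketch (the task context and multimap come from the surrounding shuffle code, the key and value types must match the context's serializations, and the needed Ignite imports are assumed):

static void writeThenRead(HadoopMultimap m, HadoopTaskContext ctx) throws IgniteCheckedException {
    HadoopMultimap.Adder adder = m.startAdding(ctx);

    adder.write("k1", "v1"); // Values for one key accumulate in a chain.
    adder.write("k1", "v2");
    adder.write("k2", "v3");

    adder.close(); // Close all adders before opening an input.

    HadoopTaskInput in = m.input(ctx);

    while (in.next()) {
        Object key = in.key();

        for (java.util.Iterator<?> it = in.values(); it.hasNext(); )
            System.out.println(key + "=" + it.next());
    }

    in.close();
}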
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
new file mode 100644
index 0000000..7f332aa
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*;
+
+/**
+ * Base class for all multimaps.
+ */
+public abstract class HadoopMultimapBase implements HadoopMultimap {
+ /** */
+ protected final GridUnsafeMemory mem;
+
+ /** */
+ protected final int pageSize;
+
+ /** */
+ private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ assert jobInfo != null;
+ assert mem != null;
+
+ this.mem = mem;
+
+ pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
+ }
+
+ /**
+ * @param ptrs Page pointers.
+ */
+ private void deallocate(GridLongList ptrs) {
+ while (!ptrs.isEmpty())
+ mem.release(ptrs.remove(), ptrs.remove());
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param nextValPtr Next value page pointer.
+ */
+ protected void nextValue(long valPtr, long nextValPtr) {
+ mem.writeLong(valPtr, nextValPtr);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Next value page pointer.
+ */
+ protected long nextValue(long valPtr) {
+ return mem.readLong(valPtr);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param size Size.
+ */
+ protected void valueSize(long valPtr, int size) {
+ mem.writeInt(valPtr + 8, size);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Value size.
+ */
+ protected int valueSize(long valPtr) {
+ return mem.readInt(valPtr + 8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ for (GridLongList list : allPages)
+ deallocate(list);
+ }
+
+ /**
+ * Reader for key and value.
+ */
+ protected class ReaderBase implements AutoCloseable {
+ /** */
+ private Object tmp;
+
+ /** */
+ private final HadoopSerialization ser;
+
+ /** */
+ private final HadoopDataInStream in = new HadoopDataInStream(mem);
+
+ /**
+ * @param ser Serialization.
+ */
+ protected ReaderBase(HadoopSerialization ser) {
+ assert ser != null;
+
+ this.ser = ser;
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Value.
+ */
+ public Object readValue(long valPtr) {
+ assert valPtr > 0 : valPtr;
+
+ try {
+ return read(valPtr + 12, valueSize(valPtr));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Resets temporary object to the given one.
+ *
+ * @param tmp Temporary object for reuse.
+ */
+ public void resetReusedObject(Object tmp) {
+ this.tmp = tmp;
+ }
+
+ /**
+ * @param ptr Pointer.
+ * @param size Object size.
+ * @return Object.
+ */
+ protected Object read(long ptr, long size) throws IgniteCheckedException {
+ in.buffer().set(ptr, size);
+
+ tmp = ser.read(in, tmp);
+
+ return tmp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ ser.close();
+ }
+ }
+
+ /**
+ * Base class for adders.
+ */
+ protected abstract class AdderBase implements Adder {
+ /** */
+ protected final HadoopSerialization keySer;
+
+ /** */
+ protected final HadoopSerialization valSer;
+
+ /** */
+ private final HadoopDataOutStream out;
+
+ /** */
+ private long writeStart;
+
+        /** List of (size, pointer) pairs for all allocated pages. */
+ private final GridLongList pages = new GridLongList(16);
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException {
+ valSer = ctx.valueSerialization();
+ keySer = ctx.keySerialization();
+
+ out = new HadoopDataOutStream(mem) {
+ @Override public long move(long size) {
+ long ptr = super.move(size);
+
+ if (ptr == 0) // Was not able to move - not enough free space.
+ ptr = allocateNextPage(size);
+
+ assert ptr != 0;
+
+ return ptr;
+ }
+ };
+ }
+
+ /**
+ * @param requestedSize Requested size.
+ * @return Next write pointer.
+ */
+ private long allocateNextPage(long requestedSize) {
+ int writtenSize = writtenSize();
+
+ long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
+ long newPagePtr = mem.allocate(newPageSize);
+
+ pages.add(newPageSize);
+ pages.add(newPagePtr);
+
+ HadoopOffheapBuffer b = out.buffer();
+
+ b.set(newPagePtr, newPageSize);
+
+ if (writtenSize != 0) {
+ mem.copyMemory(writeStart, newPagePtr, writtenSize);
+
+ b.move(writtenSize);
+ }
+
+ writeStart = newPagePtr;
+
+ return b.move(requestedSize);
+ }
+
+ /**
+ * @return Fixed pointer.
+ */
+ private long fixAlignment() {
+ HadoopOffheapBuffer b = out.buffer();
+
+ long ptr = b.pointer();
+
+            if ((ptr & 7L) != 0) { // Address is not aligned on an 8-byte boundary.
+ ptr = (ptr + 8L) & ~7L;
+
+ b.pointer(ptr);
+ }
+
+ return ptr;
+ }
+
+ /**
+ * @param off Offset.
+     * @param o Object.
+     * @param ser Serialization.
+ * @return Page pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException {
+ writeStart = fixAlignment();
+
+ if (off != 0)
+ out.move(off);
+
+ ser.write(out, o);
+
+ return writeStart;
+ }
+
+ /**
+ * @param size Size.
+ * @return Pointer.
+ */
+ protected long allocate(int size) {
+ writeStart = fixAlignment();
+
+ out.move(size);
+
+ return writeStart;
+ }
+
+ /**
+ * Rewinds local allocation pointer to the given pointer if possible.
+ *
+ * @param ptr Pointer.
+ */
+ protected void localDeallocate(long ptr) {
+ HadoopOffheapBuffer b = out.buffer();
+
+ if (b.isInside(ptr))
+ b.pointer(ptr);
+ else
+ b.reset();
+ }
+
+ /**
+ * @return Written size.
+ */
+ protected int writtenSize() {
+ return (int)(out.buffer().pointer() - writeStart);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ allPages.add(pages);
+
+ keySer.close();
+ valSer.close();
+ }
+ }
+
+ /**
+ * Iterator over values.
+ */
+ protected class ValueIterator implements Iterator<Object> {
+ /** */
+ private long valPtr;
+
+ /** */
+ private final ReaderBase valReader;
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param valReader Value reader.
+ */
+ protected ValueIterator(long valPtr, ReaderBase valReader) {
+ this.valPtr = valPtr;
+ this.valReader = valReader;
+ }
+
+ /**
+ * @param valPtr Head value pointer.
+ */
+ public void head(long valPtr) {
+ this.valPtr = valPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return valPtr != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Object res = valReader.readValue(valPtr);
+
+ valPtr = nextValue(valPtr);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
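
AdderBase.fixAlignment() above rounds the write pointer up to the next 8-byte boundary before every write or allocation. Because the pointer is only adjusted when it is misaligned, (ptr + 8) & ~7 yields the same result as the more familiar (ptr + 7) & ~7. A tiny self-contained check (class name is illustrative):

public class AlignmentDemo {
    /** Same rounding as fixAlignment(): bump only misaligned pointers. */
    static long align8(long ptr) {
        if ((ptr & 7L) != 0) // Pointers already on an 8-byte boundary stay put.
            ptr = (ptr + 8L) & ~7L;

        return ptr;
    }

    public static void main(String[] args) {
        for (long p : new long[] {16, 17, 23, 24, 25})
            System.out.println(p + " -> " + align8(p)); // 16->16, 17->24, 23->24, 24->24, 25->32.
    }
}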
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
new file mode 100644
index 0000000..69aa7a7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Skip list.
+ */
+public class HadoopSkipList extends HadoopMultimapBase {
+ /** */
+    private static final int HEADS_SIZE = 24 + 33 * 8; // 24-byte offset plus 33 level links (levels 0 to 32 inclusive), 8 bytes each.
+
+ /** Top level. */
+ private final AtomicInteger topLevel = new AtomicInteger(-1);
+
+ /** Heads for all the lists. */
+ private final long heads;
+
+ /** */
+ private final AtomicBoolean visitGuard = new AtomicBoolean();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ super(jobInfo, mem);
+
+ heads = mem.allocate(HEADS_SIZE, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ super.close();
+
+ mem.release(heads, HEADS_SIZE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ if (!visitGuard.compareAndSet(false, true))
+ return false;
+
+ for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
+ long valPtr = value(meta);
+
+ long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
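+ // Values are prepended to the chain, so walking from the current head down to
+ // the previously recorded head visits exactly the values added since the last visit.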
+ if (valPtr != lastVisited) {
+ long k = key(meta);
+
+ v.onKey(k + 4, keySize(k));
+
+ lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
+
+ do {
+ v.onValue(valPtr + 12, valueSize(valPtr));
+
+ valPtr = nextValue(valPtr);
+ }
+ while (valPtr != lastVisited);
+ }
+ }
+
+ visitGuard.lazySet(false);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ return new AdderImpl(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ Input in = new Input(taskCtx);
+
+ Comparator<Object> grpCmp = taskCtx.groupComparator();
+
+ if (grpCmp != null)
+ return new GroupedInput(grpCmp, in);
+
+ return in;
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key pointer.
+ */
+ private long key(long meta) {
+ return mem.readLong(meta);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param key Key pointer.
+ */
+ private void key(long meta, long key) {
+ mem.writeLong(meta, key);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ private long value(long meta) {
+ return mem.readLongVolatile(meta + 8);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Value pointer.
+ */
+ private void value(long meta, long valPtr) {
+ mem.writeLongVolatile(meta + 8, valPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param oldValPtr Old first value pointer.
+ * @param newValPtr New first value pointer.
+ * @return {@code true} If operation succeeded.
+ */
+ private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+ return mem.casLong(meta + 8, oldValPtr, newValPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Last visited value pointer.
+ */
+ private long lastVisitedValue(long meta) {
+ return mem.readLong(meta + 16);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Last visited value pointer.
+ */
+ private void lastVisitedValue(long meta, long valPtr) {
+ mem.writeLong(meta + 16, valPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @return Next meta pointer.
+ */
+ private long nextMeta(long meta, int level) {
+ assert meta > 0 : meta;
+
+ return mem.readLongVolatile(meta + 24 + 8 * level);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @param oldNext Old next meta pointer.
+ * @param newNext New next meta pointer.
+ * @return {@code true} If operation succeeded.
+ */
+ private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
+ assert meta > 0 : meta;
+
+ return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @param nextMeta Next meta.
+ */
+ private void nextMeta(long meta, int level, long nextMeta) {
+ assert meta != 0;
+
+ mem.writeLong(meta + 24 + 8 * level, nextMeta);
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @return Key size.
+ */
+ private int keySize(long keyPtr) {
+ return mem.readInt(keyPtr);
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ private void keySize(long keyPtr, int keySize) {
+ mem.writeInt(keyPtr, keySize);
+ }
+
+ /**
+ * @param rnd Random.
+ * @return Next level.
+ */
+ public static int randomLevel(Random rnd) {
+ int x = rnd.nextInt();
+
+ int level = 0;
+
+ while ((x & 1) != 0) { // Count trailing 1 bits.
+ level++;
+
+ x >>>= 1;
+ }
+
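+ // The number of trailing 1 bits is geometrically distributed:
+ // P(level = L) = 2^-(L+1) for L < 32. The maximum, level 32, needs all 32 bits
+ // set, which matches the 33 head pointers reserved in HEADS_SIZE.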
+ return level;
+ }
+
+ /**
+ * Reader.
+ */
+ private class Reader extends ReaderBase {
+ /**
+ * @param ser Serialization.
+ */
+ protected Reader(HadoopSerialization ser) {
+ super(ser);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key.
+ */
+ public Object readKey(long meta) {
+ assert meta > 0 : meta;
+
+ long k = key(meta);
+
+ try {
+ return read(k + 4, keySize(k));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * Adder.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Comparator<Object> cmp;
+
+ /** */
+ private final Random rnd = new GridRandom();
+
+ /** */
+ private final GridLongList stack = new GridLongList(16);
+
+ /** */
+ private final Reader keyReader;
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+
+ cmp = ctx.sortComparator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ add(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+ k.tmpKey = keySer.read(in, k.tmpKey);
+
+ k.meta = add(k.tmpKey, null);
+
+ return k;
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @param level Level.
+ * @return Meta pointer.
+ */
+ private long createMeta(long key, long val, int level) {
+ int size = 32 + 8 * level;
+
+ long meta = allocate(size);
+
+ key(meta, key);
+ value(meta, val);
+ lastVisitedValue(meta, 0L);
+
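+ // Zero only levels 1 and above (offsets 32..): the level-0 next pointer at
+ // offset 24 is always assigned in add() before the meta is published.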
+ for (int i = 32; i < size; i += 8) // Fill with 0.
+ mem.writeLong(meta + i, 0L);
+
+ return meta;
+ }
+
+ /**
+ * @param key Key.
+ * @return Pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long writeKey(Object key) throws IgniteCheckedException {
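+ // Key record layout: [int size: 4][serialized key], so readers use keyPtr + 4 (see readKey()).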
+ long keyPtr = write(4, key, keySer);
+ int keySize = writtenSize() - 4;
+
+ keySize(keyPtr, keySize);
+
+ return keyPtr;
+ }
+
+ /**
+ * @param prevMeta Previous meta.
+ * @param meta Next meta.
+ */
+ private void stackPush(long prevMeta, long meta) {
+ stack.add(prevMeta);
+ stack.add(meta);
+ }
+
+ /**
+ * Drops last remembered frame from the stack.
+ */
+ private void stackPop() {
+ stack.pop(2);
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @return Meta pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+ assert key != null;
+
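+ // Lock-free skip list insert: descend from the top level, remembering (prev, next)
+ // frames on 'stack'; publish the new meta with a CAS at level 0, then let laceUp()
+ // link the upper levels. If the key is already present, CAS-prepend the value instead.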
+ stack.clear();
+
+ long valPtr = 0;
+
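+ // Value record layout: a 12-byte header (next value pointer + value size) followed
+ // by the serialized value at valPtr + 12; values for one key form a singly linked chain.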
+ if (val != null) { // Write value.
+ valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ nextValue(valPtr, 0);
+ valueSize(valPtr, valSize);
+ }
+
+ long keyPtr = 0;
+ long newMeta = 0;
+ int newMetaLevel = -1;
+
+ long prevMeta = heads;
+ int level = topLevel.get();
+ long meta = level < 0 ? 0 : nextMeta(heads, level);
+
+ for (;;) {
+ if (level < 0) { // We did not find our key, try to add a new meta.
+ if (keyPtr == 0) { // Write key and create meta only once.
+ keyPtr = writeKey(key);
+
+ newMetaLevel = randomLevel(rnd);
+ newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
+ }
+
+ nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
+
+ if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
+ laceUp(key, newMeta, newMetaLevel);
+
+ return newMeta;
+ }
+ else { // Add failed, check what was added by another thread.
+ meta = nextMeta(prevMeta, level = 0);
+
+ stackPop();
+ }
+ }
+
+ int cmpRes = cmp(key, meta);
+
+ if (cmpRes == 0) { // Key found.
+ if (newMeta != 0) // Deallocate if we've allocated something.
+ localDeallocate(keyPtr);
+
+ if (valPtr == 0) // Only key needs to be added.
+ return meta;
+
+ for (;;) { // Add value for the key found.
+ long nextVal = value(meta);
+
+ nextValue(valPtr, nextVal);
+
+ if (casValue(meta, nextVal, valPtr))
+ return meta;
+ }
+ }
+
+ assert cmpRes != 0;
+
+ if (cmpRes > 0) { // Go right.
+ prevMeta = meta;
+ meta = nextMeta(meta, level);
+
+ if (meta != 0) // If nothing to the right then go down.
+ continue;
+ }
+
+ while (--level >= 0) { // Go down.
+ stackPush(prevMeta, meta); // Remember the path.
+
+ long nextMeta = nextMeta(prevMeta, level);
+
+ if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
+ meta = nextMeta;
+
+ assert meta != 0;
+
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param meta Meta pointer.
+ * @return Comparison result.
+ */
+ @SuppressWarnings("unchecked")
+ private int cmp(Object key, long meta) {
+ assert meta != 0;
+
+ return cmp.compare(key, keyReader.readKey(meta));
+ }
+
+ /**
+ * Adds appropriate index links between metas.
+ *
+ * @param key Key.
+ * @param newMeta Just added meta.
+ * @param newMetaLevel New level.
+ */
+ private void laceUp(Object key, long newMeta, int newMetaLevel) {
+ for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
+ long prevMeta = heads;
+ long meta = 0;
+
+ if (!stack.isEmpty()) { // Get the path back.
+ meta = stack.remove();
+ prevMeta = stack.remove();
+ }
+
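+ // Link this level: point newMeta at meta and CAS prevMeta's link. On CAS failure,
+ // re-read the link and walk right to the new insertion point before retrying.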
+ for (;;) {
+ nextMeta(newMeta, level, meta);
+
+ if (casNextMeta(prevMeta, level, meta, newMeta))
+ break;
+
+ long oldMeta = meta;
+
+ meta = nextMeta(prevMeta, level); // Reread meta.
+
+ for (;;) {
+ int cmpRes = cmp(key, meta);
+
+ if (cmpRes > 0) { // Go right.
+ prevMeta = meta;
+ meta = nextMeta(prevMeta, level);
+
+ if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
+ continue;
+ }
+
+ assert cmpRes != 0; // Two different metas with equal keys must be impossible.
+
+ break; // Retry cas.
+ }
+ }
+ }
+
+ if (!stack.isEmpty())
+ return; // Our level is already lower than the top one.
+
+ for (;;) { // Raise top level.
+ int top = topLevel.get();
+
+ if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
+ break;
+ }
+ }
+
+ /**
+ * Key.
+ */
+ private class KeyImpl implements Key {
+ /** */
+ private long meta;
+
+ /** */
+ private Object tmpKey;
+
+ /**
+ * @return Meta pointer for the key.
+ */
+ public long address() {
+ return meta;
+ }
+
+ /**
+ * @param val Value.
+ */
+ @Override public void add(Value val) {
+ int size = val.size();
+
+ long valPtr = allocate(size + 12);
+
+ val.copyTo(valPtr + 12);
+
+ valueSize(valPtr, size);
+
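+ // Lock-free prepend: keep re-linking the new value to the current chain head until the CAS wins.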
+ long nextVal;
+
+ do {
+ nextVal = value(meta);
+
+ nextValue(valPtr, nextVal);
+ }
+ while (!casValue(meta, nextVal, valPtr));
+ }
+ }
+ }
+
+ /**
+ * Task input.
+ */
+ private class Input implements HadoopTaskInput {
+ /** */
+ private long metaPtr = heads;
+
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final Reader valReader;
+
+ /**
+ * @param taskCtx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ keyReader = new Reader(taskCtx.keySerialization());
+ valReader = new Reader(taskCtx.valueSerialization());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ metaPtr = nextMeta(metaPtr, 0);
+
+ return metaPtr != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return keyReader.readKey(metaPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return new ValueIterator(value(metaPtr), valReader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ keyReader.close();
+ valReader.close();
+ }
+ }
+
+ /**
+ * Grouped input using grouping comparator.
+ */
+ private class GroupedInput implements HadoopTaskInput {
+ /** */
+ private final Comparator<Object> grpCmp;
+
+ /** */
+ private final Input in;
+
+ /** */
+ private Object prevKey;
+
+ /** */
+ private Object nextKey;
+
+ /** */
+ private final GridLongList vals = new GridLongList();
+
+ /**
+ * @param grpCmp Grouping comparator.
+ * @param in Input.
+ */
+ private GroupedInput(Comparator<Object> grpCmp, Input in) {
+ this.grpCmp = grpCmp;
+ this.in = in;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (prevKey == null) { // First call.
+ if (!in.next())
+ return false;
+
+ prevKey = in.key();
+
+ assert prevKey != null;
+
+ in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
+
+ vals.add(value(in.metaPtr));
+ }
+ else {
+ if (in.metaPtr == 0) // We reached the end of the input.
+ return false;
+
+ vals.clear();
+
+ vals.add(value(in.metaPtr));
+
+ in.keyReader.resetReusedObject(prevKey); // Switch key instances.
+
+ prevKey = nextKey;
+ }
+
+ while (in.next()) { // Collect head value pointers for all rows with equal keys.
+ if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
+ vals.add(value(in.metaPtr));
+ else
+ break;
+ }
+
+ assert !vals.isEmpty();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return prevKey;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ assert !vals.isEmpty();
+
+ final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
+
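+ // Chain the per-row value lists: exhaust the current chain, then re-point the
+ // iterator at the next head pointer collected in 'vals'.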
+ return new Iterator<Object>() {
+ /** */
+ private int idx;
+
+ @Override public boolean hasNext() {
+ if (!valIter.hasNext()) {
+ if (++idx == vals.size())
+ return false;
+
+ valIter.head(vals.get(idx));
+
+ assert valIter.hasNext();
+ }
+
+ return true;
+ }
+
+ @Override public Object next() {
+ return valIter.next();
+ }
+
+ @Override public void remove() {
+ valIter.remove();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ in.close();
+ }
+ }
+}