You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/19 10:51:12 UTC
[42/51] [partial] ignite git commit: IGNITE-3916: Created separate
module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
new file mode 100644
index 0000000..39b7c51
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_OFFHEAP_PAGE_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+
+/**
+ * Base class for all multimaps.
+ */
+public abstract class HadoopMultimapBase implements HadoopMultimap {
+ /** */
+ protected final GridUnsafeMemory mem;
+
+ /** */
+ protected final int pageSize;
+
+ /** */
+ private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ assert jobInfo != null;
+ assert mem != null;
+
+ this.mem = mem;
+
+ pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
+ }
+
+ /**
+ * @param page Page.
+ */
+ private void deallocate(Page page) {
+ assert page != null;
+
+ mem.release(page.ptr, page.size);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param nextValPtr Next value page pointer.
+ */
+ protected void nextValue(long valPtr, long nextValPtr) {
+ mem.writeLong(valPtr, nextValPtr);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Next value page pointer.
+ */
+ protected long nextValue(long valPtr) {
+ return mem.readLong(valPtr);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param size Size.
+ */
+ protected void valueSize(long valPtr, int size) {
+ mem.writeInt(valPtr + 8, size);
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Value size.
+ */
+ protected int valueSize(long valPtr) {
+ return mem.readInt(valPtr + 8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // Release every page that was registered in allPages.
+ Iterator<Page> it = allPages.iterator();
+
+ while (it.hasNext())
+ deallocate(it.next());
+ }
+
+ /**
+ * Reader for key and value.
+ */
+ protected class ReaderBase implements AutoCloseable {
+ /** */
+ private Object tmp;
+
+ /** */
+ private final HadoopSerialization ser;
+
+ /** */
+ private final HadoopDataInStream in = new HadoopDataInStream(mem);
+
+ /**
+ * @param ser Serialization.
+ */
+ protected ReaderBase(HadoopSerialization ser) {
+ assert ser != null;
+
+ this.ser = ser;
+ }
+
+ /**
+ * @param valPtr Value page pointer.
+ * @return Value.
+ */
+ public Object readValue(long valPtr) {
+ assert valPtr > 0 : valPtr;
+
+ try {
+ return read(valPtr + 12, valueSize(valPtr));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * Resets temporary object to the given one.
+ *
+ * @param tmp Temporary object for reuse.
+ */
+ public void resetReusedObject(Object tmp) {
+ this.tmp = tmp;
+ }
+
+ /**
+ * @param ptr Pointer.
+ * @param size Object size.
+ * @return Object.
+ */
+ protected Object read(long ptr, long size) throws IgniteCheckedException {
+ in.buffer().set(ptr, size);
+
+ tmp = ser.read(in, tmp);
+
+ return tmp;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ ser.close();
+ }
+ }
+
+ /**
+ * Base class for adders.
+ */
+ protected abstract class AdderBase implements Adder {
+ /** */
+ protected final HadoopSerialization keySer;
+
+ /** */
+ protected final HadoopSerialization valSer;
+
+ /** */
+ private final HadoopDataOutStream out;
+
+ /** */
+ private long writeStart;
+
+ /** Current page. */
+ private Page curPage;
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException {
+ valSer = ctx.valueSerialization();
+ keySer = ctx.keySerialization();
+
+ out = new HadoopDataOutStream(mem) {
+ @Override public long move(long size) {
+ long ptr = super.move(size);
+
+ if (ptr == 0) // Was not able to move - not enough free space.
+ ptr = allocateNextPage(size);
+
+ assert ptr != 0;
+
+ return ptr;
+ }
+ };
+ }
+
+ /**
+ * Allocates a new page large enough for the bytes of the current record written so far plus
+ * {@code requestedSize}, copies those already written bytes into it and makes it the current page.
+ *
+ * @param requestedSize Requested size.
+ * @return Next write pointer.
+ */
+ private long allocateNextPage(long requestedSize) {
+ int writtenSize = writtenSize(); // Bytes written since writeStart, they must move to the new page.
+
+ long newPageSize = nextPageSize(writtenSize + requestedSize);
+ long newPagePtr = mem.allocate(newPageSize);
+
+ HadoopOffheapBuffer b = out.buffer();
+
+ b.set(newPagePtr, newPageSize);
+
+ if (writtenSize != 0) {
+ mem.copyMemory(writeStart, newPagePtr, writtenSize);
+
+ b.move(writtenSize);
+ }
+
+ writeStart = newPagePtr;
+
+ // The old page is not released here: it is only queued in allPages, and the memory is
+ // released when the enclosing multimap is closed.
+ Page oldPage = curPage;
+
+ curPage = new Page(newPagePtr, newPageSize);
+
+ if (oldPage != null)
+ allPages.add(oldPage);
+
+ return b.move(requestedSize);
+ }
+
+ /**
+ * Computes the size of the next page to allocate.
+ *
+ * @param required Number of bytes that must fit into the page.
+ * @return Next page size: {@code pageSize} multiplied by a power-of-two page count.
+ */
+ private long nextPageSize(long required) {
+ // One page more than the whole pages covered by 'required', rounded up to a power of two.
+ long pageCnt = nextPowerOfTwo(required / pageSize + 1);
+
+ return pageCnt * pageSize;
+ }
+
+ /**
+ * Gets the smallest power of two that is greater than or equal to the given number.
+ * Naive implementation.
+ *
+ * @param val Number.
+ * @return Nearest power of two not less than {@code val}; {@code 1} for values below {@code 1}.
+ * @throws IllegalArgumentException If no positive {@code long} power of two can hold {@code val}.
+ */
+ private long nextPowerOfTwo(long val) {
+ // 2^62 is the largest positive power of two in a long. Reject bigger values up front: shifting
+ // past it wraps to Long.MIN_VALUE and then to 0, so the loop below would never terminate.
+ if (val > (1L << 62))
+ throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
+
+ long res = 1;
+
+ while (res < val)
+ res <<= 1;
+
+ return res;
+ }
+
+ /**
+ * Moves the write pointer forward to the next 8-byte boundary if it is not on one already.
+ *
+ * @return Fixed pointer.
+ */
+ private long fixAlignment() {
+ HadoopOffheapBuffer outBuf = out.buffer();
+
+ long cur = outBuf.pointer();
+
+ if ((cur & 7L) == 0) // Already aligned, nothing to fix.
+ return cur;
+
+ long aligned = (cur + 8L) & ~7L;
+
+ outBuf.pointer(aligned);
+
+ return aligned;
+ }
+
+ /**
+ * Serializes the object into the output buffer, starting at an 8-byte aligned address.
+ *
+ * @param off Offset: number of bytes to reserve in front of the serialized object.
+ * @param o Object.
+ * @param ser Serialization used to write the object.
+ * @return Page pointer (start of the reserved bytes).
+ * @throws IgniteCheckedException If failed.
+ */
+ protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException {
+ // Must be set before any move: page reallocation uses writeStart to find the bytes to copy.
+ writeStart = fixAlignment();
+
+ if (off != 0)
+ out.move(off);
+
+ ser.write(out, o);
+
+ // May differ from the value assigned above if a new page was allocated while writing.
+ return writeStart;
+ }
+
+ /**
+ * @param size Size.
+ * @return Pointer.
+ */
+ protected long allocate(int size) {
+ writeStart = fixAlignment();
+
+ out.move(size);
+
+ return writeStart;
+ }
+
+ /**
+ * Rewinds local allocation pointer to the given pointer if possible.
+ *
+ * @param ptr Pointer.
+ */
+ protected void localDeallocate(long ptr) {
+ HadoopOffheapBuffer b = out.buffer();
+
+ if (b.isInside(ptr))
+ b.pointer(ptr);
+ else
+ b.reset();
+ }
+
+ /**
+ * @return Written size.
+ */
+ protected int writtenSize() {
+ return (int)(out.buffer().pointer() - writeStart);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ if (curPage != null)
+ allPages.add(curPage);
+
+ keySer.close();
+ valSer.close();
+ }
+ }
+
+ /**
+ * Iterator over values.
+ */
+ protected class ValueIterator implements Iterator<Object> {
+ /** */
+ private long valPtr;
+
+ /** */
+ private final ReaderBase valReader;
+
+ /**
+ * @param valPtr Value page pointer.
+ * @param valReader Value reader.
+ */
+ protected ValueIterator(long valPtr, ReaderBase valReader) {
+ this.valPtr = valPtr;
+ this.valReader = valReader;
+ }
+
+ /**
+ * @param valPtr Head value pointer.
+ */
+ public void head(long valPtr) {
+ this.valPtr = valPtr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return valPtr != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Object res = valReader.readValue(valPtr);
+
+ valPtr = nextValue(valPtr);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Page.
+ */
+ private static class Page {
+ /** Pointer. */
+ private final long ptr;
+
+ /** Size. */
+ private final long size;
+
+ /**
+ * Constructor.
+ *
+ * @param ptr Pointer.
+ * @param size Size.
+ */
+ public Page(long ptr, long size) {
+ this.ptr = ptr;
+ this.size = size;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
new file mode 100644
index 0000000..7db88bc
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Skip list.
+ */
+public class HadoopSkipList extends HadoopMultimapBase {
+ /** */
+ private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive.
+
+ /** Top level. */
+ private final AtomicInteger topLevel = new AtomicInteger(-1);
+
+ /** Heads for all the lists. */
+ private final long heads;
+
+ /** */
+ private final AtomicBoolean visitGuard = new AtomicBoolean();
+
+ /**
+ * @param jobInfo Job info.
+ * @param mem Memory.
+ */
+ public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+ super(jobInfo, mem);
+
+ heads = mem.allocate(HEADS_SIZE, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ super.close();
+
+ mem.release(heads, HEADS_SIZE);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+ // Only one visit may run at a time; a concurrent caller gets false right away.
+ if (!visitGuard.compareAndSet(false, true))
+ return false;
+
+ try {
+ // Walk all metas through the level 0 links.
+ for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
+ long valPtr = value(meta);
+
+ long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
+
+ if (valPtr != lastVisited) {
+ long k = key(meta);
+
+ v.onKey(k + 4, keySize(k)); // Key bytes follow the 4-byte size.
+
+ lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
+
+ do {
+ v.onValue(valPtr + 12, valueSize(valPtr)); // Value bytes follow the 12-byte header.
+
+ valPtr = nextValue(valPtr);
+ }
+ while (valPtr != lastVisited);
+ }
+ }
+ }
+ finally {
+ // Release the guard even if the visitor throws, otherwise every later visit would return false.
+ visitGuard.lazySet(false);
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+ return new AdderImpl(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ Input in = new Input(taskCtx);
+
+ Comparator<Object> grpCmp = taskCtx.groupComparator();
+
+ if (grpCmp != null)
+ return new GroupedInput(grpCmp, in);
+
+ return in;
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key pointer.
+ */
+ private long key(long meta) {
+ return mem.readLong(meta);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param key Key pointer.
+ */
+ private void key(long meta, long key) {
+ mem.writeLong(meta, key);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Value pointer.
+ */
+ private long value(long meta) {
+ return mem.readLongVolatile(meta + 8);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Value pointer.
+ */
+ private void value(long meta, long valPtr) {
+ mem.writeLongVolatile(meta + 8, valPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param oldValPtr Old first value pointer.
+ * @param newValPtr New first value pointer.
+ * @return {@code true} If operation succeeded.
+ */
+ private boolean casValue(long meta, long oldValPtr, long newValPtr) {
+ return mem.casLong(meta + 8, oldValPtr, newValPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Last visited value pointer.
+ */
+ private long lastVisitedValue(long meta) {
+ return mem.readLong(meta + 16);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param valPtr Last visited value pointer.
+ */
+ private void lastVisitedValue(long meta, long valPtr) {
+ mem.writeLong(meta + 16, valPtr);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @return Next meta pointer.
+ */
+ private long nextMeta(long meta, int level) {
+ assert meta > 0 : meta;
+
+ return mem.readLongVolatile(meta + 24 + 8 * level);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @param oldNext Old next meta pointer.
+ * @param newNext New next meta pointer.
+ * @return {@code true} If operation succeeded.
+ */
+ private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
+ assert meta > 0 : meta;
+
+ return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @param level Level.
+ * @param nextMeta Next meta.
+ */
+ private void nextMeta(long meta, int level, long nextMeta) {
+ assert meta != 0;
+
+ mem.writeLong(meta + 24 + 8 * level, nextMeta);
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @return Key size.
+ */
+ private int keySize(long keyPtr) {
+ return mem.readInt(keyPtr);
+ }
+
+ /**
+ * @param keyPtr Key pointer.
+ * @param keySize Key size.
+ */
+ private void keySize(long keyPtr, int keySize) {
+ mem.writeInt(keyPtr, keySize);
+ }
+
+ /**
+ * Picks a level from the next random {@code int}: the number of consecutive set bits
+ * counted from the lowest bit.
+ *
+ * @param rnd Random.
+ * @return Next level, from 0 to 32 inclusive.
+ */
+ public static int randomLevel(Random rnd) {
+ // Trailing ones of x are trailing zeros of ~x; for x == -1 the result is 32.
+ return Integer.numberOfTrailingZeros(~rnd.nextInt());
+ }
+
+ /**
+ * Reader.
+ */
+ private class Reader extends ReaderBase {
+ /**
+ * @param ser Serialization.
+ */
+ protected Reader(HadoopSerialization ser) {
+ super(ser);
+ }
+
+ /**
+ * @param meta Meta pointer.
+ * @return Key.
+ */
+ public Object readKey(long meta) {
+ assert meta > 0 : meta;
+
+ long k = key(meta);
+
+ try {
+ return read(k + 4, keySize(k));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+
+ /**
+ * Adder.
+ */
+ private class AdderImpl extends AdderBase {
+ /** */
+ private final Comparator<Object> cmp;
+
+ /** */
+ private final Random rnd = new GridRandom();
+
+ /** */
+ private final GridLongList stack = new GridLongList(16);
+
+ /** */
+ private final Reader keyReader;
+
+ /**
+ * @param ctx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+ super(ctx);
+
+ keyReader = new Reader(keySer);
+
+ cmp = ctx.sortComparator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) throws IgniteCheckedException {
+ A.notNull(val, "val");
+
+ add(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
+ KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
+
+ k.tmpKey = keySer.read(in, k.tmpKey);
+
+ k.meta = add(k.tmpKey, null);
+
+ return k;
+ }
+
+ /**
+ * Allocates and initializes a meta block. Layout: key pointer at offset 0, head value pointer
+ * at offset 8, last visited value pointer at offset 16, then one 8-byte next-meta pointer
+ * per level starting at offset 24.
+ *
+ * @param key Key pointer.
+ * @param val Head value pointer.
+ * @param level Level.
+ * @return Meta pointer.
+ */
+ private long createMeta(long key, long val, int level) {
+ int size = 32 + 8 * level; // 24 bytes of header plus next pointers for levels 0..level.
+
+ long meta = allocate(size);
+
+ key(meta, key);
+ value(meta, val);
+ lastVisitedValue(meta, 0L);
+
+ // The level 0 pointer (offset 24) is not written here; add() sets it before publishing the meta.
+ for (int i = 32; i < size; i += 8) // Fill with 0.
+ mem.writeLong(meta + i, 0L);
+
+ return meta;
+ }
+
+ /**
+ * Writes the key as a 4-byte size followed by the serialized key.
+ *
+ * @param key Key.
+ * @return Pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long writeKey(Object key) throws IgniteCheckedException {
+ long keyPtr = write(4, key, keySer); // Reserve 4 bytes in front of the key for its size.
+ int keySize = writtenSize() - 4; // Read right after write(): it measures bytes since the write start.
+
+ keySize(keyPtr, keySize);
+
+ return keyPtr;
+ }
+
+ /**
+ * @param prevMeta Previous meta.
+ * @param meta Next meta.
+ */
+ private void stackPush(long prevMeta, long meta) {
+ stack.add(prevMeta);
+ stack.add(meta);
+ }
+
+ /**
+ * Drops last remembered frame from the stack.
+ */
+ private void stackPop() {
+ stack.pop(2);
+ }
+
+ /**
+ * Adds the key to the skip list if it is not there yet and, when a value is given, puts
+ * the value at the head of the value chain of that key.
+ *
+ * @param key Key.
+ * @param val Value, or {@code null} if only the key has to be added.
+ * @return Meta pointer.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+ assert key != null;
+
+ stack.clear();
+
+ long valPtr = 0;
+
+ if (val != null) { // Write value.
+ // 12-byte header in front of the value: next value pointer (8 bytes) and value size (4 bytes).
+ valPtr = write(12, val, valSer);
+ int valSize = writtenSize() - 12;
+
+ nextValue(valPtr, 0);
+ valueSize(valPtr, valSize);
+ }
+
+ long keyPtr = 0;
+ long newMeta = 0;
+ int newMetaLevel = -1;
+
+ // Start the search from the heads at the current top level.
+ long prevMeta = heads;
+ int level = topLevel.get();
+ long meta = level < 0 ? 0 : nextMeta(heads, level);
+
+ for (;;) {
+ if (level < 0) { // We did not find our key, trying to add new meta.
+ if (keyPtr == 0) { // Write key and create meta only once.
+ keyPtr = writeKey(key);
+
+ newMetaLevel = randomLevel(rnd);
+ newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
+ }
+
+ nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
+
+ if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
+ laceUp(key, newMeta, newMetaLevel);
+
+ return newMeta;
+ }
+ else { // Add failed, need to check out what was added by another thread.
+ meta = nextMeta(prevMeta, level = 0);
+
+ // NOTE(review): drops the most recently remembered frame; confirm GridLongList.pop
+ // tolerates an empty stack (nothing is pushed when the search starts below level 1).
+ stackPop();
+ }
+ }
+
+ int cmpRes = cmp(key, meta);
+
+ if (cmpRes == 0) { // Key found.
+ if (newMeta != 0) // Deallocate if we've allocated something.
+ localDeallocate(keyPtr);
+
+ if (valPtr == 0) // Only key needs to be added.
+ return meta;
+
+ for (;;) { // Add value for the key found.
+ long nextVal = value(meta);
+
+ // Link our value to the current head first, then try to become the new head.
+ nextValue(valPtr, nextVal);
+
+ if (casValue(meta, nextVal, valPtr))
+ return meta;
+ }
+ }
+
+ assert cmpRes != 0;
+
+ if (cmpRes > 0) { // Go right.
+ prevMeta = meta;
+ meta = nextMeta(meta, level);
+
+ if (meta != 0) // If nothing to the right then go down.
+ continue;
+ }
+
+ while (--level >= 0) { // Go down.
+ stackPush(prevMeta, meta); // Remember the path.
+
+ long nextMeta = nextMeta(prevMeta, level);
+
+ if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
+ meta = nextMeta;
+
+ assert meta != 0;
+
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param meta Meta pointer.
+ * @return Comparison result.
+ */
+ @SuppressWarnings("unchecked")
+ private int cmp(Object key, long meta) {
+ assert meta != 0;
+
+ return cmp.compare(key, keyReader.readKey(meta));
+ }
+
+ /**
+ * Adds appropriate index links between metas: links the new meta into the lists
+ * of levels 1..newMetaLevel and raises the top level if needed.
+ *
+ * @param key Key of the new meta, used to find the insertion point again after a failed CAS.
+ * @param newMeta Just added meta.
+ * @param newMetaLevel New level.
+ */
+ private void laceUp(Object key, long newMeta, int newMetaLevel) {
+ for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
+ // Without a remembered frame start from the heads with nothing to the right.
+ long prevMeta = heads;
+ long meta = 0;
+
+ if (!stack.isEmpty()) { // Get the path back.
+ // Frames were pushed as (prevMeta, meta), so they are taken back in reverse order.
+ meta = stack.remove();
+ prevMeta = stack.remove();
+ }
+
+ for (;;) {
+ nextMeta(newMeta, level, meta); // Set our next link before publishing on this level.
+
+ if (casNextMeta(prevMeta, level, meta, newMeta))
+ break;
+
+ long oldMeta = meta;
+
+ meta = nextMeta(prevMeta, level); // Reread meta.
+
+ for (;;) {
+ int cmpRes = cmp(key, meta);
+
+ if (cmpRes > 0) { // Go right.
+ prevMeta = meta;
+ meta = nextMeta(prevMeta, level);
+
+ if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
+ continue;
+ }
+
+ assert cmpRes != 0; // Two different metas with equal keys must be impossible.
+
+ break; // Retry cas.
+ }
+ }
+ }
+
+ if (!stack.isEmpty())
+ return; // Our level already lower than top.
+
+ for (;;) { // Raise top level.
+ int top = topLevel.get();
+
+ // Stop if the top is already high enough or we managed to raise it; otherwise retry.
+ if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
+ break;
+ }
+ }
+
+ /**
+ * Key.
+ */
+ private class KeyImpl implements Key {
+ /** */
+ private long meta;
+
+ /** */
+ private Object tmpKey;
+
+ /**
+ * @return Meta pointer for the key.
+ */
+ public long address() {
+ return meta;
+ }
+
+ /**
+ * Copies the value into newly allocated space and puts it at the head of the value chain of this key.
+ *
+ * @param val Value.
+ */
+ @Override public void add(Value val) {
+ int size = val.size();
+
+ long valPtr = allocate(size + 12); // 12-byte header (next pointer and size) plus the value bytes.
+
+ val.copyTo(valPtr + 12);
+
+ valueSize(valPtr, size);
+
+ long nextVal;
+
+ // Link the new value to the current head and retry until the CAS makes it the new head.
+ do {
+ nextVal = value(meta);
+
+ nextValue(valPtr, nextVal);
+ }
+ while(!casValue(meta, nextVal, valPtr));
+ }
+ }
+ }
+
+ /**
+ * Task input.
+ */
+ private class Input implements HadoopTaskInput {
+ /** */
+ private long metaPtr = heads;
+
+ /** */
+ private final Reader keyReader;
+
+ /** */
+ private final Reader valReader;
+
+ /**
+ * @param taskCtx Task context.
+ * @throws IgniteCheckedException If failed.
+ */
+ private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ keyReader = new Reader(taskCtx.keySerialization());
+ valReader = new Reader(taskCtx.valueSerialization());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ metaPtr = nextMeta(metaPtr, 0);
+
+ return metaPtr != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return keyReader.readKey(metaPtr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return new ValueIterator(value(metaPtr), valReader);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ keyReader.close();
+ valReader.close();
+ }
+ }
+
+ /**
+ * Grouped input using grouping comparator.
+ */
+ private class GroupedInput implements HadoopTaskInput {
+ /** */
+ private final Comparator<Object> grpCmp;
+
+ /** */
+ private final Input in;
+
+ /** */
+ private Object prevKey;
+
+ /** */
+ private Object nextKey;
+
+ /** */
+ private final GridLongList vals = new GridLongList();
+
+ /**
+ * @param grpCmp Grouping comparator.
+ * @param in Input.
+ */
+ private GroupedInput(Comparator<Object> grpCmp, Input in) {
+ this.grpCmp = grpCmp;
+ this.in = in;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Collects into {@code vals} the head value pointers of all consecutive keys that the grouping
+ * comparator treats as equal to the first key of the group.
+ */
+ @Override public boolean next() {
+ if (prevKey == null) { // First call.
+ if (!in.next())
+ return false;
+
+ prevKey = in.key();
+
+ assert prevKey != null;
+
+ in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
+
+ vals.add(value(in.metaPtr));
+ }
+ else {
+ if (in.metaPtr == 0) // We reached the end of the input.
+ return false;
+
+ vals.clear();
+
+ // The input already stands on the first key of this group: the previous call stopped on it.
+ vals.add(value(in.metaPtr));
+
+ in.keyReader.resetReusedObject(prevKey); // Switch key instances.
+
+ prevKey = nextKey;
+ }
+
+ while (in.next()) { // Fill with head value pointers with equal keys.
+ if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
+ vals.add(value(in.metaPtr));
+ else
+ break; // nextKey now holds the first key of the following group.
+ }
+
+ assert !vals.isEmpty();
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return prevKey;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The returned iterator goes through the value chains of all keys of the current group one after another.
+ */
+ @Override public Iterator<?> values() {
+ assert !vals.isEmpty();
+
+ final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
+
+ return new Iterator<Object>() {
+ /** Index of the value chain currently being iterated. */
+ private int idx;
+
+ @Override public boolean hasNext() {
+ if (!valIter.hasNext()) {
+ // Do not advance past the last chain, so repeated calls at the end keep returning false.
+ if (idx + 1 >= vals.size())
+ return false;
+
+ valIter.head(vals.get(++idx));
+
+ assert valIter.hasNext();
+ }
+
+ return true;
+ }
+
+ @Override public Object next() {
+ // Switch to the next chain if the current one is exhausted: next() must work
+ // even when the caller did not call hasNext() first.
+ if (!hasNext())
+ throw new java.util.NoSuchElementException();
+
+ return valIter.next();
+ }
+
+ @Override public void remove() {
+ valIter.remove();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteCheckedException {
+ in.close();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
new file mode 100644
index 0000000..3b5fa15
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+
+/**
+ * Data input stream.
+ */
+public class HadoopDataInStream extends InputStream implements DataInput {
+ /** */
+ private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+
+ /** */
+ private final GridUnsafeMemory mem;
+
+ /**
+ * @param mem Memory.
+ */
+ public HadoopDataInStream(GridUnsafeMemory mem) {
+ assert mem != null;
+
+ this.mem = mem;
+ }
+
+ /**
+ * @return Buffer.
+ */
+ public HadoopOffheapBuffer buffer() {
+ return buf;
+ }
+
+ /**
+ * Advances the read position of the underlying buffer.
+ *
+ * @param size Size.
+ * @return Old pointer.
+ * @throws IOException If the buffer cannot be moved by {@code size} bytes.
+ */
+ protected long move(long size) throws IOException {
+ long ptr = buf.move(size);
+
+ // A zero pointer means the move was impossible. Fail explicitly instead of relying on an assertion,
+ // so that callers never read from address 0 when assertions are disabled.
+ if (ptr == 0)
+ throw new java.io.EOFException("Not enough data in the buffer [requested=" + size + ']');
+
+ return ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ return readUnsignedByte();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] b, int off, int len) throws IOException {
+ readFully(b, off, len);
+
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long skip(long n) throws IOException {
+ move(n);
+
+ return n;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(byte[] b) throws IOException {
+ readFully(b, 0, b.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFully(byte[] b, int off, int len) throws IOException {
+ mem.readBytes(move(len), b, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int skipBytes(int n) throws IOException {
+ move(n);
+
+ return n;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() throws IOException {
+ byte val = readByte();
+
+ assert val == 0 || val == 1 : val; // Only 0 and 1 are expected here.
+
+ return val == 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() throws IOException {
+ return mem.readByte(move(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readUnsignedByte() throws IOException {
+ return readByte() & 0xff;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() throws IOException {
+ return mem.readShort(move(2));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readUnsignedShort() throws IOException {
+ return readShort() & 0xffff;
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() throws IOException {
+ return (char)readShort();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() throws IOException {
+ return mem.readInt(move(4));
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() throws IOException {
+ return mem.readLong(move(8));
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() throws IOException {
+ return mem.readFloat(move(4));
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() throws IOException {
+ return mem.readDouble(move(8));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String readLine() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation reads an {@code int} byte count followed by that many UTF-8 encoded bytes.
+ */
+ @Override public String readUTF() throws IOException {
+ int len = readInt();
+
+ byte[] bytes = new byte[len];
+
+ if (len != 0)
+ readFully(bytes);
+
+ return new String(bytes, StandardCharsets.UTF_8);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
new file mode 100644
index 0000000..f7b1a73
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import java.io.DataOutput;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+
+/**
+ * Data output stream that writes primitives and byte arrays to raw memory addresses handed out by an
+ * internal {@link HadoopOffheapBuffer}.
+ * <p>
+ * Strings are always encoded as a 4-byte length followed by UTF-8 bytes (see {@link #writeUTF(String)});
+ * this is not the modified UTF-8 format described by {@link DataOutput}.
+ */
+public class HadoopDataOutStream extends OutputStream implements DataOutput {
+    /** Buffer tracking the current write window; created empty at address {@code 0}. */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+
+    /** Memory accessor used for primitive writes. */
+    private final GridUnsafeMemory mem;
+
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataOutStream(GridUnsafeMemory mem) {
+        this.mem = mem;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+
+    /**
+     * Reserves {@code size} bytes in the buffer.
+     * <p>
+     * NOTE(review): the write methods of this class use the result as a target address without checking
+     * for {@code 0}; presumably callers size the buffer up front or subclasses override this method to
+     * provide more space - confirm.
+     *
+     * @param size Size.
+     * @return Old pointer or {@code 0} if move was impossible.
+     */
+    public long move(long size) {
+        return buf.move(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int b) {
+        writeByte(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Copies {@code len} bytes of {@code b}, starting at {@code off}, to the current position.
+     * A zero-length write is a no-op.
+     *
+     * @param b Source array.
+     * @param off Offset of the first byte to copy.
+     * @param len Number of bytes to copy.
+     */
+    @Override public void write(byte[] b, int off, int len) {
+        // HadoopOffheapBuffer.move() asserts a strictly positive size, so an empty array (for example
+        // the payload of writeUTF("")) must not reach it.
+        if (len == 0)
+            return;
+
+        GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, move(len), len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean v) {
+        writeByte(v ? 1 : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(int v) {
+        mem.writeByte(move(1), (byte)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(int v) {
+        mem.writeShort(move(2), (short)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(int v) {
+        writeShort(v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int v) {
+        mem.writeInt(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long v) {
+        mem.writeLong(move(8), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float v) {
+        mem.writeFloat(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double v) {
+        mem.writeDouble(move(8), v);
+    }
+
+    /**
+     * Writes the string in the same format as {@link #writeUTF(String)}.
+     *
+     * @param s String to write.
+     */
+    @Override public void writeBytes(String s) {
+        writeUTF(s);
+    }
+
+    /**
+     * Writes the string in the same format as {@link #writeUTF(String)}.
+     *
+     * @param s String to write.
+     */
+    @Override public void writeChars(String s) {
+        writeUTF(s);
+    }
+
+    /**
+     * Writes the string as a 4-byte length followed by its UTF-8 encoded bytes.
+     *
+     * @param s String to write.
+     */
+    @Override public void writeUTF(String s) {
+        byte[] b = s.getBytes(StandardCharsets.UTF_8);
+
+        writeInt(b.length);
+        write(b);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
new file mode 100644
index 0000000..acc9be6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+/**
+ * Offheap buffer: a window of raw addresses starting at {@code bufPtr} and ending right before
+ * {@code bufEnd}, plus a current position inside that window.
+ * <p>
+ * The class only performs pointer arithmetic; it does not read, write, allocate or free memory itself.
+ */
+public class HadoopOffheapBuffer {
+    /** Buffer begin address. */
+    private long bufPtr;
+
+    /** The first address we do not own. */
+    private long bufEnd;
+
+    /** Current read or write pointer. */
+    private long posPtr;
+
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public HadoopOffheapBuffer(long bufPtr, long bufSize) {
+        set(bufPtr, bufSize);
+    }
+
+    /**
+     * Re-targets this buffer to a new window and moves the position to its beginning.
+     *
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public void set(long bufPtr, long bufSize) {
+        this.bufPtr = bufPtr;
+
+        posPtr = bufPtr;
+        bufEnd = bufPtr + bufSize;
+    }
+
+    /**
+     * @return Pointer to internal buffer begin.
+     */
+    public long begin() {
+        return bufPtr;
+    }
+
+    /**
+     * @return Buffer capacity.
+     */
+    public long capacity() {
+        return bufEnd - bufPtr;
+    }
+
+    /**
+     * @return Remaining capacity.
+     */
+    public long remaining() {
+        return bufEnd - posPtr;
+    }
+
+    /**
+     * @return Absolute pointer to the current position inside of the buffer.
+     */
+    public long pointer() {
+        return posPtr;
+    }
+
+    /**
+     * Sets the current position. The pointer may be equal to the end of the buffer, which leaves
+     * no remaining capacity.
+     *
+     * @param ptr Absolute pointer to the current position inside of the buffer.
+     */
+    public void pointer(long ptr) {
+        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
+        // Report the offending pointer against the upper bound it violated.
+        assert ptr <= bufEnd : ptr + " <= " + bufEnd;
+
+        posPtr = ptr;
+    }
+
+    /**
+     * Advances the position by {@code size} bytes if they fit; otherwise leaves the position untouched.
+     *
+     * @param size Size move on, must be positive.
+     * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
+     */
+    public long move(long size) {
+        assert size > 0 : size;
+
+        long oldPos = posPtr;
+        long newPos = oldPos + size;
+
+        if (newPos > bufEnd)
+            return 0;
+
+        posPtr = newPos;
+
+        return oldPos;
+    }
+
+    /**
+     * Checks the pointer against the buffer bounds. Both bounds are inclusive, so the end address
+     * (a valid position with nothing remaining) is also reported as inside.
+     *
+     * @param ptr Pointer.
+     * @return {@code true} If the given pointer is inside of this buffer.
+     */
+    public boolean isInside(long ptr) {
+        return ptr >= bufPtr && ptr <= bufEnd;
+    }
+
+    /**
+     * Resets position to the beginning of buffer.
+     */
+    public void reset() {
+        posPtr = bufPtr;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..5ede18e
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+
+/**
+ * Task executor.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+ /** Job tracker. */
+ private HadoopJobTracker jobTracker;
+
+ /** */
+ private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+
+ /** Executor service to run tasks. */
+ private HadoopExecutorService exec;
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
+
+ jobTracker = ctx.jobTracker();
+
+ exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
+ ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ if (exec != null) {
+ exec.shutdown(3000);
+
+ if (cancel) {
+ for (HadoopJobId jobId : jobs.keySet())
+ cancelTasks(jobId);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ if (exec != null && !exec.shutdown(30000))
+ U.warn(log, "Failed to finish running tasks in 30 sec.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+ ", tasksCnt=" + tasks.size() + ']');
+
+ Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
+
+ if (executedTasks == null) {
+ executedTasks = new GridConcurrentHashSet<>();
+
+ Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
+
+ assert extractedCol == null;
+ }
+
+ final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
+
+ for (final HadoopTaskInfo info : tasks) {
+ assert info != null;
+
+ HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+ ctx.localNodeId()) {
+ @Override protected void onTaskFinished(HadoopTaskStatus status) {
+ if (log.isDebugEnabled())
+ log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+ "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+ finalExecutedTasks.remove(this);
+
+ jobTracker.onTaskFinished(info, status);
+ }
+
+ @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return ctx.shuffle().input(taskCtx);
+ }
+
+ @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+ return ctx.shuffle().output(taskCtx);
+ }
+ };
+
+ executedTasks.add(task);
+
+ exec.submit(task);
+ }
+ }
+
+ /**
+ * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+ * for this job ID.
+ * <p>
+ * It is guaranteed that this method will not be called concurrently with
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
+ *
+ * @param jobId Job ID to cancel.
+ */
+ @Override public void cancelTasks(HadoopJobId jobId) {
+ Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
+
+ if (executedTasks != null) {
+ for (HadoopRunnableTask task : executedTasks)
+ task.cancel();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
+ if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+ Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+
+ assert executedTasks == null || executedTasks.isEmpty();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
new file mode 100644
index 0000000..993ecc9
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.util.worker.GridWorkerListener;
+import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
+import org.apache.ignite.thread.IgniteThread;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.Collections.newSetFromMap;
+
+/**
+ * Executor service without thread pooling: every started task runs in its own newly created
+ * worker thread. At most {@code maxTasks} execution slots are claimed at a time; further tasks wait
+ * in a bounded queue.
+ */
+public class HadoopExecutorService {
+    /** Tasks waiting for a free execution slot. */
+    private final LinkedBlockingQueue<Callable<?>> queue;
+
+    /** Workers that have been started and have not yet reported that they stopped. */
+    private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
+
+    /** Number of currently claimed execution slots. */
+    private final AtomicInteger active = new AtomicInteger();
+
+    /** Maximum number of execution slots. */
+    private final int maxTasks;
+
+    /** Grid name passed to created workers. */
+    private final String gridName;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Set once {@link #shutdown(long)} has been called. */
+    private volatile boolean shutdown;
+
+    /** Listener that reuses or releases the execution slot of a stopped worker. */
+    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
+        @Override public void onStopped(GridWorker w) {
+            workers.remove(w);
+
+            if (shutdown) {
+                active.decrementAndGet();
+
+                return;
+            }
+
+            // Hand the slot of the stopped worker over to the next queued task, if any.
+            Callable<?> task = queue.poll();
+
+            if (task != null)
+                startThread(task);
+            else {
+                active.decrementAndGet();
+
+                // A task may have been queued after the poll above but before the slot was released.
+                if (!queue.isEmpty())
+                    startFromQueue();
+            }
+        }
+    };
+
+    /**
+     * @param log Logger.
+     * @param gridName Grid name.
+     * @param maxTasks Max number of tasks.
+     * @param maxQueue Max queue length.
+     */
+    public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
+        assert maxTasks > 0 : maxTasks;
+        assert maxQueue > 0 : maxQueue;
+
+        this.maxTasks = maxTasks;
+        this.queue = new LinkedBlockingQueue<>(maxQueue);
+        this.gridName = gridName;
+        this.log = log.getLogger(HadoopExecutorService.class);
+    }
+
+    /**
+     * @return Number of active workers.
+     */
+    public int active() {
+        return workers.size();
+    }
+
+    /**
+     * Submit task. The task is started right away if the queue is empty and a slot is free; otherwise
+     * it is queued, blocking while the queue is full. The task is silently dropped if the executor is
+     * shut down while the queue is full or if the calling thread is interrupted while waiting.
+     *
+     * @param task Task.
+     */
+    public void submit(Callable<?> task) {
+        while (queue.isEmpty()) {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                startThread(task);
+
+                return; // Started in new thread bypassing queue.
+            }
+        }
+
+        try {
+            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
+                if (shutdown)
+                    return; // Rejected due to shutdown.
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return;
+        }
+
+        startFromQueue();
+    }
+
+    /**
+     * Attempts to start task from queue.
+     */
+    private void startFromQueue() {
+        do {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                Callable<?> task = queue.poll();
+
+                // Somebody else took the task: give the claimed slot back.
+                if (task == null) {
+                    int res = active.decrementAndGet();
+
+                    assert res >= 0 : res;
+
+                    break;
+                }
+
+                startThread(task);
+            }
+        }
+        while (!queue.isEmpty());
+    }
+
+    /**
+     * Starts the task in a new worker thread. The caller must already hold an execution slot.
+     *
+     * @param task Task.
+     */
+    private void startThread(final Callable<?> task) {
+        String workerName;
+
+        if (task instanceof HadoopRunnableTask) {
+            final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
+
+            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
+        }
+        else
+            workerName = task.toString();
+
+        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
+            @Override protected void body() {
+                try {
+                    task.call();
+                }
+                catch (Exception e) {
+                    log.error("Failed to execute task: " + task, e);
+                }
+            }
+        };
+
+        workers.add(w);
+
+        // Shutdown may have started after the slot was claimed: cancel the worker before it is started.
+        if (shutdown)
+            w.cancel();
+
+        new IgniteThread(w).start();
+    }
+
+    /**
+     * Shuts down this executor service: cancels all known workers and then waits, polling every
+     * 100 ms, until they are gone or the timeout elapses. If the calling thread is interrupted while
+     * waiting, the wait ends early and the thread's interrupt status is restored.
+     *
+     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
+     * @return {@code true} If all tasks completed.
+     */
+    public boolean shutdown(long awaitTimeMillis) {
+        shutdown = true;
+
+        for (GridWorker w : workers)
+            w.cancel();
+
+        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
+            try {
+                Thread.sleep(100);
+
+                awaitTimeMillis -= 100;
+            }
+            catch (InterruptedException ignored) {
+                // Stop waiting, but keep the interruption visible to the caller (as submit() does).
+                Thread.currentThread().interrupt();
+
+                break;
+            }
+        }
+
+        return workers.isEmpty();
+    }
+
+    /**
+     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
+     */
+    public boolean isShutdown() {
+        return shutdown;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
new file mode 100644
index 0000000..a57efe6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
+
+/**
+ * Runnable task: a {@link Callable} that executes the single Hadoop task described by a
+ * {@link HadoopTaskInfo} and reports the outcome through {@link #onTaskFinished(HadoopTaskStatus)}.
+ * <p>
+ * For a {@code MAP} task of a job that has a combiner, the map output is collected into a multimap
+ * and a {@code COMBINE} step is run over it right after the map step, within the same call.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+ /** Memory handed to the combiner input multimap. */
+ private final GridUnsafeMemory mem;
+
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Job this task belongs to. */
+ private final HadoopJob job;
+
+ /** Task to run. */
+ private final HadoopTaskInfo info;
+
+ /** Submit time. */
+ private final long submitTs = U.currentTimeMillis();
+
+ /** Execution start timestamp. */
+ private long execStartTs;
+
+ /** Execution end timestamp. */
+ private long execEndTs;
+
+ /** Multimap that receives the map output and feeds the combine step; created only for jobs with a combiner. */
+ private HadoopMultimap combinerInput;
+
+ /** Task context; assigned at the beginning of {@link #call()}. */
+ private volatile HadoopTaskContext ctx;
+
+ /** Set when the task has been cancelled. */
+ private volatile boolean cancelled;
+
+ /** Node id. */
+ private UUID nodeId;
+
+ /**
+ * @param log Log.
+ * @param job Job.
+ * @param mem Memory.
+ * @param info Task info.
+ * @param nodeId Node id.
+ */
+ protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
+ UUID nodeId) {
+ this.nodeId = nodeId;
+ this.log = log.getLogger(HadoopRunnableTask.class);
+ this.job = job;
+ this.mem = mem;
+ this.info = info;
+ }
+
+ /**
+ * @return Wait time: difference between execution start and submit timestamps.
+ */
+ public long waitTime() {
+ return execStartTs - submitTs;
+ }
+
+ /**
+ * @return Execution time: difference between execution end and execution start timestamps.
+ */
+ public long executionTime() {
+ return execEndTs - execStartTs;
+ }
+
+ /**
+ * Obtains the task context from the job and runs {@link #call0()} through
+ * {@code runAsJobOwner} of that context.
+ *
+ * @return Whatever {@code runAsJobOwner} returns for a callable that yields {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Override public Void call() throws IgniteCheckedException {
+ ctx = job.getTaskContext(info);
+
+ return ctx.runAsJobOwner(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ call0();
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Implements actual task running: prepares the task environment, runs the task (plus the combine
+ * step when applicable), and always reports the final status via
+ * {@link #onTaskFinished(HadoopTaskStatus)}.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ void call0() throws IgniteCheckedException {
+ execStartTs = U.currentTimeMillis();
+
+ Throwable err = null;
+
+ // Assume success; the catch blocks below downgrade the state.
+ HadoopTaskState state = HadoopTaskState.COMPLETED;
+
+ HadoopPerformanceCounter perfCntr = null;
+
+ try {
+ perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
+
+ perfCntr.onTaskSubmit(info, submitTs);
+ perfCntr.onTaskPrepare(info, execStartTs);
+
+ ctx.prepareTaskEnvironment();
+
+ runTask(perfCntr);
+
+ // Run the combiner in the same call: temporarily switch the context to a COMBINE task info
+ // and restore the original task info afterwards.
+ if (info.type() == MAP && job.info().hasCombiner()) {
+ ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+
+ try {
+ runTask(perfCntr);
+ }
+ finally {
+ ctx.taskInfo(info);
+ }
+ }
+ }
+ catch (HadoopTaskCancelledException ignored) {
+ state = HadoopTaskState.CANCELED;
+ }
+ catch (Throwable e) {
+ state = HadoopTaskState.FAILED;
+ err = e;
+
+ U.error(log, "Task execution failed.", e);
+
+ // Errors are reported in the status and also propagated to the caller; other failures
+ // are only reported in the status.
+ if (e instanceof Error)
+ throw e;
+ }
+ finally {
+ execEndTs = U.currentTimeMillis();
+
+ if (perfCntr != null)
+ perfCntr.onTaskFinish(info, execEndTs);
+
+ // NOTE(review): the status is reported before the cleanup below; if onTaskFinished() throws,
+ // the combiner input is not closed and the task environment is not cleaned up - confirm intended.
+ onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
+
+ if (combinerInput != null)
+ combinerInput.close();
+
+ if (ctx != null)
+ ctx.cleanupTaskEnvironment();
+ }
+ }
+
+ /**
+ * Runs the task currently set in the context with freshly created input and output, both of which
+ * are closed when the run ends.
+ *
+ * @param perfCntr Performance counter.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
+ if (cancelled)
+ throw new HadoopTaskCancelledException("Task cancelled.");
+
+ // Either resource may be null for task types that have no input or output.
+ try (HadoopTaskOutput out = createOutputInternal(ctx);
+ HadoopTaskInput in = createInputInternal(ctx)) {
+
+ ctx.input(in);
+ ctx.output(out);
+
+ perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
+
+ ctx.run();
+ }
+ }
+
+ /**
+ * Cancel the executed task: marks it as cancelled and, if the task context already exists,
+ * cancels the context as well.
+ */
+ public void cancel() {
+ cancelled = true;
+
+ if (ctx != null)
+ ctx.cancel();
+ }
+
+ /**
+ * Callback invoked once per {@link #call0()} run with the final task status.
+ *
+ * @param status Task status.
+ */
+ protected abstract void onTaskFinished(HadoopTaskStatus status);
+
+ /**
+ * Selects the input for the task type currently set in the context: none for setup, map, commit
+ * and abort tasks, the combiner multimap for combine tasks, {@link #createInput(HadoopTaskContext)}
+ * otherwise.
+ *
+ * @param ctx Task context.
+ * @return Task input, possibly {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+ switch (ctx.taskInfo().type()) {
+ case SETUP:
+ case MAP:
+ case COMMIT:
+ case ABORT:
+ return null;
+
+ case COMBINE:
+ assert combinerInput != null;
+
+ return combinerInput.input(ctx);
+
+ default:
+ return createInput(ctx);
+ }
+ }
+
+ /**
+ * @param ctx Task context.
+ * @return Input.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+ /**
+ * @param ctx Task context.
+ * @return Output.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+ /**
+ * Selects the output for the task type currently set in the context: none for setup, reduce,
+ * commit and abort tasks, a newly created combiner multimap for map tasks of jobs with a combiner,
+ * {@link #createOutput(HadoopTaskContext)} otherwise.
+ *
+ * @param ctx Task context.
+ * @return Task output, possibly {@code null}.
+ * @throws IgniteCheckedException If failed.
+ */
+ private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+ switch (ctx.taskInfo().type()) {
+ case SETUP:
+ case REDUCE:
+ case COMMIT:
+ case ABORT:
+ return null;
+
+ case MAP:
+ if (job.info().hasCombiner()) {
+ assert combinerInput == null;
+
+ // Hash multimap when combiner sorting is switched off, skip list otherwise.
+ combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
+ new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
+ new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
+
+ return combinerInput.startAdding(ctx);
+ }
+
+ // No combiner: intentionally falls through to the default output.
+ default:
+ return createOutput(ctx);
+ }
+ }
+
+ /**
+ * @return Task info.
+ */
+ public HadoopTaskInfo taskInfo() {
+ return info;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..f13c76a
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
+
+/**
+ * Common superclass for task executors: declares how tasks of a job are started and cancelled and
+ * how the executor is notified about job state changes.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+ /**
+ * Runs tasks.
+ *
+ * @param job Job.
+ * @param tasks Tasks.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
+
+ /**
+ * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+ * for this job ID.
+ * <p>
+ * It is guaranteed that this method will not be called concurrently with
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+ * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
+ *
+ * @param jobId Job ID to cancel.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
+
+ /**
+ * Callback invoked when the state of a job changes.
+ *
+ * @param meta Job metadata.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
new file mode 100644
index 0000000..b22d291
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+/**
+ * State of a task, as carried by {@link HadoopTaskStatus}.
+ */
+public enum HadoopTaskState {
+ /** Task is running. */
+ RUNNING,
+
+ /** Task has completed. */
+ COMPLETED,
+
+ /** Task has failed. */
+ FAILED,
+
+ /** Task was canceled. */
+ CANCELED,
+
+ /** Process crashed. */
+ CRASHED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
new file mode 100644
index 0000000..fa09ff7
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Status of a task: its state plus an optional failure cause and optional counters.
+ */
+public class HadoopTaskStatus implements Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Task state. */
+ private HadoopTaskState state;
+
+ /** Failure cause (if any). */
+ private Throwable failCause;
+
+ /** Task counters (if any). */
+ private HadoopCounters cntrs;
+
+ /**
+ * Empty constructor needed for {@link Externalizable} support.
+ */
+ public HadoopTaskStatus() {
+ // No-op.
+ }
+
+ /**
+ * Creates a status without counters.
+ *
+ * @param state Task state.
+ * @param failCause Failure cause (if any).
+ */
+ public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
+ this(state, failCause, null);
+ }
+
+ /**
+ * Creates a status with the given state, failure cause and counters.
+ *
+ * @param state Task state, must not be {@code null}.
+ * @param failCause Failure cause (if any).
+ * @param cntrs Task counters (if any).
+ */
+ public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause, @Nullable HadoopCounters cntrs) {
+ // Only the state is mandatory.
+ assert state != null;
+
+ this.cntrs = cntrs;
+ this.failCause = failCause;
+ this.state = state;
+ }
+
+ /**
+ * @return Task state.
+ */
+ public HadoopTaskState state() {
+ return this.state;
+ }
+
+ /**
+ * @return Failure cause, or {@code null} if none was set.
+ */
+ @Nullable public Throwable failCause() {
+ return this.failCause;
+ }
+
+ /**
+ * @return Task counters, or {@code null} if none were set.
+ */
+ @Nullable public HadoopCounters counters() {
+ return this.cntrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopTaskStatus.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(this.state);
+ out.writeObject(this.failCause);
+ out.writeObject(this.cntrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ this.state = (HadoopTaskState)in.readObject();
+ this.failCause = (Throwable)in.readObject();
+ this.cntrs = (HadoopCounters)in.readObject();
+ }
+}
\ No newline at end of file