You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/19 10:50:43 UTC
[13/51] [partial] ignite git commit: IGNITE-3916: Created separate
module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
deleted file mode 100644
index 39b7c51..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
-import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
-import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_OFFHEAP_PAGE_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-
-/**
- * Base class for all multimaps.
- */
-public abstract class HadoopMultimapBase implements HadoopMultimap {
- /** */
- protected final GridUnsafeMemory mem;
-
- /** */
- protected final int pageSize;
-
- /** */
- private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
-
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- */
- protected HadoopMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
- assert jobInfo != null;
- assert mem != null;
-
- this.mem = mem;
-
- pageSize = get(jobInfo, SHUFFLE_OFFHEAP_PAGE_SIZE, 32 * 1024);
- }
-
- /**
- * @param page Page.
- */
- private void deallocate(Page page) {
- assert page != null;
-
- mem.release(page.ptr, page.size);
- }
-
- /**
- * @param valPtr Value page pointer.
- * @param nextValPtr Next value page pointer.
- */
- protected void nextValue(long valPtr, long nextValPtr) {
- mem.writeLong(valPtr, nextValPtr);
- }
-
- /**
- * @param valPtr Value page pointer.
- * @return Next value page pointer.
- */
- protected long nextValue(long valPtr) {
- return mem.readLong(valPtr);
- }
-
- /**
- * @param valPtr Value page pointer.
- * @param size Size.
- */
- protected void valueSize(long valPtr, int size) {
- mem.writeInt(valPtr + 8, size);
- }
-
- /**
- * @param valPtr Value page pointer.
- * @return Value size.
- */
- protected int valueSize(long valPtr) {
- return mem.readInt(valPtr + 8);
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- for (Page page : allPages)
- deallocate(page);
- }
-
- /**
- * Reader for key and value.
- */
- protected class ReaderBase implements AutoCloseable {
- /** */
- private Object tmp;
-
- /** */
- private final HadoopSerialization ser;
-
- /** */
- private final HadoopDataInStream in = new HadoopDataInStream(mem);
-
- /**
- * @param ser Serialization.
- */
- protected ReaderBase(HadoopSerialization ser) {
- assert ser != null;
-
- this.ser = ser;
- }
-
- /**
- * @param valPtr Value page pointer.
- * @return Value.
- */
- public Object readValue(long valPtr) {
- assert valPtr > 0 : valPtr;
-
- try {
- return read(valPtr + 12, valueSize(valPtr));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /**
- * Resets temporary object to the given one.
- *
- * @param tmp Temporary object for reuse.
- */
- public void resetReusedObject(Object tmp) {
- this.tmp = tmp;
- }
-
- /**
- * @param ptr Pointer.
- * @param size Object size.
- * @return Object.
- */
- protected Object read(long ptr, long size) throws IgniteCheckedException {
- in.buffer().set(ptr, size);
-
- tmp = ser.read(in, tmp);
-
- return tmp;
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- ser.close();
- }
- }
-
- /**
- * Base class for adders.
- */
- protected abstract class AdderBase implements Adder {
- /** */
- protected final HadoopSerialization keySer;
-
- /** */
- protected final HadoopSerialization valSer;
-
- /** */
- private final HadoopDataOutStream out;
-
- /** */
- private long writeStart;
-
- /** Current page. */
- private Page curPage;
-
- /**
- * @param ctx Task context.
- * @throws IgniteCheckedException If failed.
- */
- protected AdderBase(HadoopTaskContext ctx) throws IgniteCheckedException {
- valSer = ctx.valueSerialization();
- keySer = ctx.keySerialization();
-
- out = new HadoopDataOutStream(mem) {
- @Override public long move(long size) {
- long ptr = super.move(size);
-
- if (ptr == 0) // Was not able to move - not enough free space.
- ptr = allocateNextPage(size);
-
- assert ptr != 0;
-
- return ptr;
- }
- };
- }
-
- /**
- * @param requestedSize Requested size.
- * @return Next write pointer.
- */
- private long allocateNextPage(long requestedSize) {
- int writtenSize = writtenSize();
-
- long newPageSize = nextPageSize(writtenSize + requestedSize);
- long newPagePtr = mem.allocate(newPageSize);
-
- HadoopOffheapBuffer b = out.buffer();
-
- b.set(newPagePtr, newPageSize);
-
- if (writtenSize != 0) {
- mem.copyMemory(writeStart, newPagePtr, writtenSize);
-
- b.move(writtenSize);
- }
-
- writeStart = newPagePtr;
-
- // At this point old page is not needed, so we release it.
- Page oldPage = curPage;
-
- curPage = new Page(newPagePtr, newPageSize);
-
- if (oldPage != null)
- allPages.add(oldPage);
-
- return b.move(requestedSize);
- }
-
- /**
- * Get next page size.
- *
- * @param required Required amount of data.
- * @return Next page size.
- */
- private long nextPageSize(long required) {
- long pages = (required / pageSize) + 1;
-
- long pagesPow2 = nextPowerOfTwo(pages);
-
- return pagesPow2 * pageSize;
- }
-
- /**
- * Get next power of two which greater or equal to the given number. Naive implementation.
- *
- * @param val Number
- * @return Nearest pow2.
- */
- private long nextPowerOfTwo(long val) {
- long res = 1;
-
- while (res < val)
- res = res << 1;
-
- if (res < 0)
- throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
-
- return res;
- }
-
- /**
- * @return Fixed pointer.
- */
- private long fixAlignment() {
- HadoopOffheapBuffer b = out.buffer();
-
- long ptr = b.pointer();
-
- if ((ptr & 7L) != 0) { // Address is not aligned by octet.
- ptr = (ptr + 8L) & ~7L;
-
- b.pointer(ptr);
- }
-
- return ptr;
- }
-
- /**
- * @param off Offset.
- * @param o Object.
- * @return Page pointer.
- * @throws IgniteCheckedException If failed.
- */
- protected long write(int off, Object o, HadoopSerialization ser) throws IgniteCheckedException {
- writeStart = fixAlignment();
-
- if (off != 0)
- out.move(off);
-
- ser.write(out, o);
-
- return writeStart;
- }
-
- /**
- * @param size Size.
- * @return Pointer.
- */
- protected long allocate(int size) {
- writeStart = fixAlignment();
-
- out.move(size);
-
- return writeStart;
- }
-
- /**
- * Rewinds local allocation pointer to the given pointer if possible.
- *
- * @param ptr Pointer.
- */
- protected void localDeallocate(long ptr) {
- HadoopOffheapBuffer b = out.buffer();
-
- if (b.isInside(ptr))
- b.pointer(ptr);
- else
- b.reset();
- }
-
- /**
- * @return Written size.
- */
- protected int writtenSize() {
- return (int)(out.buffer().pointer() - writeStart);
- }
-
- /** {@inheritDoc} */
- @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- if (curPage != null)
- allPages.add(curPage);
-
- keySer.close();
- valSer.close();
- }
- }
-
- /**
- * Iterator over values.
- */
- protected class ValueIterator implements Iterator<Object> {
- /** */
- private long valPtr;
-
- /** */
- private final ReaderBase valReader;
-
- /**
- * @param valPtr Value page pointer.
- * @param valReader Value reader.
- */
- protected ValueIterator(long valPtr, ReaderBase valReader) {
- this.valPtr = valPtr;
- this.valReader = valReader;
- }
-
- /**
- * @param valPtr Head value pointer.
- */
- public void head(long valPtr) {
- this.valPtr = valPtr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return valPtr != 0;
- }
-
- /** {@inheritDoc} */
- @Override public Object next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Object res = valReader.readValue(valPtr);
-
- valPtr = nextValue(valPtr);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
- /**
- * Page.
- */
- private static class Page {
- /** Pointer. */
- private final long ptr;
-
- /** Size. */
- private final long size;
-
- /**
- * Constructor.
- *
- * @param ptr Pointer.
- * @param size Size.
- */
- public Page(long ptr, long size) {
- this.ptr = ptr;
- this.size = size;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
deleted file mode 100644
index 7db88bc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
+++ /dev/null
@@ -1,733 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
-
-import java.io.DataInput;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Skip list.
- */
-public class HadoopSkipList extends HadoopMultimapBase {
- /** */
- private static final int HEADS_SIZE = 24 + 33 * 8; // Offset + max level is from 0 to 32 inclusive.
-
- /** Top level. */
- private final AtomicInteger topLevel = new AtomicInteger(-1);
-
- /** Heads for all the lists. */
- private final long heads;
-
- /** */
- private final AtomicBoolean visitGuard = new AtomicBoolean();
-
- /**
- * @param jobInfo Job info.
- * @param mem Memory.
- */
- public HadoopSkipList(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
- super(jobInfo, mem);
-
- heads = mem.allocate(HEADS_SIZE, true);
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- super.close();
-
- mem.release(heads, HEADS_SIZE);
- }
-
- /** {@inheritDoc} */
- @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
- if (!visitGuard.compareAndSet(false, true))
- return false;
-
- for (long meta = nextMeta(heads, 0); meta != 0L; meta = nextMeta(meta, 0)) {
- long valPtr = value(meta);
-
- long lastVisited = ignoreLastVisited ? 0 : lastVisitedValue(meta);
-
- if (valPtr != lastVisited) {
- long k = key(meta);
-
- v.onKey(k + 4, keySize(k));
-
- lastVisitedValue(meta, valPtr); // Set it to the first value in chain.
-
- do {
- v.onValue(valPtr + 12, valueSize(valPtr));
-
- valPtr = nextValue(valPtr);
- }
- while (valPtr != lastVisited);
- }
- }
-
- visitGuard.lazySet(false);
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
- return new AdderImpl(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- Input in = new Input(taskCtx);
-
- Comparator<Object> grpCmp = taskCtx.groupComparator();
-
- if (grpCmp != null)
- return new GroupedInput(grpCmp, in);
-
- return in;
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key pointer.
- */
- private long key(long meta) {
- return mem.readLong(meta);
- }
-
- /**
- * @param meta Meta pointer.
- * @param key Key pointer.
- */
- private void key(long meta, long key) {
- mem.writeLong(meta, key);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Value pointer.
- */
- private long value(long meta) {
- return mem.readLongVolatile(meta + 8);
- }
-
- /**
- * @param meta Meta pointer.
- * @param valPtr Value pointer.
- */
- private void value(long meta, long valPtr) {
- mem.writeLongVolatile(meta + 8, valPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @param oldValPtr Old first value pointer.
- * @param newValPtr New first value pointer.
- * @return {@code true} If operation succeeded.
- */
- private boolean casValue(long meta, long oldValPtr, long newValPtr) {
- return mem.casLong(meta + 8, oldValPtr, newValPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Last visited value pointer.
- */
- private long lastVisitedValue(long meta) {
- return mem.readLong(meta + 16);
- }
-
- /**
- * @param meta Meta pointer.
- * @param valPtr Last visited value pointer.
- */
- private void lastVisitedValue(long meta, long valPtr) {
- mem.writeLong(meta + 16, valPtr);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @return Next meta pointer.
- */
- private long nextMeta(long meta, int level) {
- assert meta > 0 : meta;
-
- return mem.readLongVolatile(meta + 24 + 8 * level);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @param oldNext Old next meta pointer.
- * @param newNext New next meta pointer.
- * @return {@code true} If operation succeeded.
- */
- private boolean casNextMeta(long meta, int level, long oldNext, long newNext) {
- assert meta > 0 : meta;
-
- return mem.casLong(meta + 24 + 8 * level, oldNext, newNext);
- }
-
- /**
- * @param meta Meta pointer.
- * @param level Level.
- * @param nextMeta Next meta.
- */
- private void nextMeta(long meta, int level, long nextMeta) {
- assert meta != 0;
-
- mem.writeLong(meta + 24 + 8 * level, nextMeta);
- }
-
- /**
- * @param keyPtr Key pointer.
- * @return Key size.
- */
- private int keySize(long keyPtr) {
- return mem.readInt(keyPtr);
- }
-
- /**
- * @param keyPtr Key pointer.
- * @param keySize Key size.
- */
- private void keySize(long keyPtr, int keySize) {
- mem.writeInt(keyPtr, keySize);
- }
-
- /**
- * @param rnd Random.
- * @return Next level.
- */
- public static int randomLevel(Random rnd) {
- int x = rnd.nextInt();
-
- int level = 0;
-
- while ((x & 1) != 0) { // Count sequential 1 bits.
- level++;
-
- x >>>= 1;
- }
-
- return level;
- }
-
- /**
- * Reader.
- */
- private class Reader extends ReaderBase {
- /**
- * @param ser Serialization.
- */
- protected Reader(HadoopSerialization ser) {
- super(ser);
- }
-
- /**
- * @param meta Meta pointer.
- * @return Key.
- */
- public Object readKey(long meta) {
- assert meta > 0 : meta;
-
- long k = key(meta);
-
- try {
- return read(k + 4, keySize(k));
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-
- /**
- * Adder.
- */
- private class AdderImpl extends AdderBase {
- /** */
- private final Comparator<Object> cmp;
-
- /** */
- private final Random rnd = new GridRandom();
-
- /** */
- private final GridLongList stack = new GridLongList(16);
-
- /** */
- private final Reader keyReader;
-
- /**
- * @param ctx Task context.
- * @throws IgniteCheckedException If failed.
- */
- protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
- super(ctx);
-
- keyReader = new Reader(keySer);
-
- cmp = ctx.sortComparator();
- }
-
- /** {@inheritDoc} */
- @Override public void write(Object key, Object val) throws IgniteCheckedException {
- A.notNull(val, "val");
-
- add(key, val);
- }
-
- /** {@inheritDoc} */
- @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException {
- KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse;
-
- k.tmpKey = keySer.read(in, k.tmpKey);
-
- k.meta = add(k.tmpKey, null);
-
- return k;
- }
-
- /**
- * @param key Key.
- * @param val Value.
- * @param level Level.
- * @return Meta pointer.
- */
- private long createMeta(long key, long val, int level) {
- int size = 32 + 8 * level;
-
- long meta = allocate(size);
-
- key(meta, key);
- value(meta, val);
- lastVisitedValue(meta, 0L);
-
- for (int i = 32; i < size; i += 8) // Fill with 0.
- mem.writeLong(meta + i, 0L);
-
- return meta;
- }
-
- /**
- * @param key Key.
- * @return Pointer.
- * @throws IgniteCheckedException If failed.
- */
- private long writeKey(Object key) throws IgniteCheckedException {
- long keyPtr = write(4, key, keySer);
- int keySize = writtenSize() - 4;
-
- keySize(keyPtr, keySize);
-
- return keyPtr;
- }
-
- /**
- * @param prevMeta Previous meta.
- * @param meta Next meta.
- */
- private void stackPush(long prevMeta, long meta) {
- stack.add(prevMeta);
- stack.add(meta);
- }
-
- /**
- * Drops last remembered frame from the stack.
- */
- private void stackPop() {
- stack.pop(2);
- }
-
- /**
- * @param key Key.
- * @param val Value.
- * @return Meta pointer.
- * @throws IgniteCheckedException If failed.
- */
- private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
- assert key != null;
-
- stack.clear();
-
- long valPtr = 0;
-
- if (val != null) { // Write value.
- valPtr = write(12, val, valSer);
- int valSize = writtenSize() - 12;
-
- nextValue(valPtr, 0);
- valueSize(valPtr, valSize);
- }
-
- long keyPtr = 0;
- long newMeta = 0;
- int newMetaLevel = -1;
-
- long prevMeta = heads;
- int level = topLevel.get();
- long meta = level < 0 ? 0 : nextMeta(heads, level);
-
- for (;;) {
- if (level < 0) { // We did not find our key, trying to add new meta.
- if (keyPtr == 0) { // Write key and create meta only once.
- keyPtr = writeKey(key);
-
- newMetaLevel = randomLevel(rnd);
- newMeta = createMeta(keyPtr, valPtr, newMetaLevel);
- }
-
- nextMeta(newMeta, 0, meta); // Set next to new meta before publishing.
-
- if (casNextMeta(prevMeta, 0, meta, newMeta)) { // New key was added successfully.
- laceUp(key, newMeta, newMetaLevel);
-
- return newMeta;
- }
- else { // Add failed, need to check out what was added by another thread.
- meta = nextMeta(prevMeta, level = 0);
-
- stackPop();
- }
- }
-
- int cmpRes = cmp(key, meta);
-
- if (cmpRes == 0) { // Key found.
- if (newMeta != 0) // Deallocate if we've allocated something.
- localDeallocate(keyPtr);
-
- if (valPtr == 0) // Only key needs to be added.
- return meta;
-
- for (;;) { // Add value for the key found.
- long nextVal = value(meta);
-
- nextValue(valPtr, nextVal);
-
- if (casValue(meta, nextVal, valPtr))
- return meta;
- }
- }
-
- assert cmpRes != 0;
-
- if (cmpRes > 0) { // Go right.
- prevMeta = meta;
- meta = nextMeta(meta, level);
-
- if (meta != 0) // If nothing to the right then go down.
- continue;
- }
-
- while (--level >= 0) { // Go down.
- stackPush(prevMeta, meta); // Remember the path.
-
- long nextMeta = nextMeta(prevMeta, level);
-
- if (nextMeta != meta) { // If the meta is the same as on upper level go deeper.
- meta = nextMeta;
-
- assert meta != 0;
-
- break;
- }
- }
- }
- }
-
- /**
- * @param key Key.
- * @param meta Meta pointer.
- * @return Comparison result.
- */
- @SuppressWarnings("unchecked")
- private int cmp(Object key, long meta) {
- assert meta != 0;
-
- return cmp.compare(key, keyReader.readKey(meta));
- }
-
- /**
- * Adds appropriate index links between metas.
- *
- * @param newMeta Just added meta.
- * @param newMetaLevel New level.
- */
- private void laceUp(Object key, long newMeta, int newMetaLevel) {
- for (int level = 1; level <= newMetaLevel; level++) { // Go from the bottom up.
- long prevMeta = heads;
- long meta = 0;
-
- if (!stack.isEmpty()) { // Get the path back.
- meta = stack.remove();
- prevMeta = stack.remove();
- }
-
- for (;;) {
- nextMeta(newMeta, level, meta);
-
- if (casNextMeta(prevMeta, level, meta, newMeta))
- break;
-
- long oldMeta = meta;
-
- meta = nextMeta(prevMeta, level); // Reread meta.
-
- for (;;) {
- int cmpRes = cmp(key, meta);
-
- if (cmpRes > 0) { // Go right.
- prevMeta = meta;
- meta = nextMeta(prevMeta, level);
-
- if (meta != oldMeta) // Old meta already known to be greater than ours or is 0.
- continue;
- }
-
- assert cmpRes != 0; // Two different metas with equal keys must be impossible.
-
- break; // Retry cas.
- }
- }
- }
-
- if (!stack.isEmpty())
- return; // Our level already lower than top.
-
- for (;;) { // Raise top level.
- int top = topLevel.get();
-
- if (newMetaLevel <= top || topLevel.compareAndSet(top, newMetaLevel))
- break;
- }
- }
-
- /**
- * Key.
- */
- private class KeyImpl implements Key {
- /** */
- private long meta;
-
- /** */
- private Object tmpKey;
-
- /**
- * @return Meta pointer for the key.
- */
- public long address() {
- return meta;
- }
-
- /**
- * @param val Value.
- */
- @Override public void add(Value val) {
- int size = val.size();
-
- long valPtr = allocate(size + 12);
-
- val.copyTo(valPtr + 12);
-
- valueSize(valPtr, size);
-
- long nextVal;
-
- do {
- nextVal = value(meta);
-
- nextValue(valPtr, nextVal);
- }
- while(!casValue(meta, nextVal, valPtr));
- }
- }
- }
-
- /**
- * Task input.
- */
- private class Input implements HadoopTaskInput {
- /** */
- private long metaPtr = heads;
-
- /** */
- private final Reader keyReader;
-
- /** */
- private final Reader valReader;
-
- /**
- * @param taskCtx Task context.
- * @throws IgniteCheckedException If failed.
- */
- private Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- keyReader = new Reader(taskCtx.keySerialization());
- valReader = new Reader(taskCtx.valueSerialization());
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() {
- metaPtr = nextMeta(metaPtr, 0);
-
- return metaPtr != 0;
- }
-
- /** {@inheritDoc} */
- @Override public Object key() {
- return keyReader.readKey(metaPtr);
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<?> values() {
- return new ValueIterator(value(metaPtr), valReader);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- keyReader.close();
- valReader.close();
- }
- }
-
- /**
- * Grouped input using grouping comparator.
- */
- private class GroupedInput implements HadoopTaskInput {
- /** */
- private final Comparator<Object> grpCmp;
-
- /** */
- private final Input in;
-
- /** */
- private Object prevKey;
-
- /** */
- private Object nextKey;
-
- /** */
- private final GridLongList vals = new GridLongList();
-
- /**
- * @param grpCmp Grouping comparator.
- * @param in Input.
- */
- private GroupedInput(Comparator<Object> grpCmp, Input in) {
- this.grpCmp = grpCmp;
- this.in = in;
- }
-
- /** {@inheritDoc} */
- @Override public boolean next() {
- if (prevKey == null) { // First call.
- if (!in.next())
- return false;
-
- prevKey = in.key();
-
- assert prevKey != null;
-
- in.keyReader.resetReusedObject(null); // We need 2 instances of key object for comparison.
-
- vals.add(value(in.metaPtr));
- }
- else {
- if (in.metaPtr == 0) // We reached the end of the input.
- return false;
-
- vals.clear();
-
- vals.add(value(in.metaPtr));
-
- in.keyReader.resetReusedObject(prevKey); // Switch key instances.
-
- prevKey = nextKey;
- }
-
- while (in.next()) { // Fill with head value pointers with equal keys.
- if (grpCmp.compare(prevKey, nextKey = in.key()) == 0)
- vals.add(value(in.metaPtr));
- else
- break;
- }
-
- assert !vals.isEmpty();
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public Object key() {
- return prevKey;
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<?> values() {
- assert !vals.isEmpty();
-
- final ValueIterator valIter = new ValueIterator(vals.get(0), in.valReader);
-
- return new Iterator<Object>() {
- /** */
- private int idx;
-
- @Override public boolean hasNext() {
- if (!valIter.hasNext()) {
- if (++idx == vals.size())
- return false;
-
- valIter.head(vals.get(idx));
-
- assert valIter.hasNext();
- }
-
- return true;
- }
-
- @Override public Object next() {
- return valIter.next();
- }
-
- @Override public void remove() {
- valIter.remove();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteCheckedException {
- in.close();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
deleted file mode 100644
index 3b5fa15..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Data input stream.
- */
-public class HadoopDataInStream extends InputStream implements DataInput {
- /** */
- private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
-
- /** */
- private final GridUnsafeMemory mem;
-
- /**
- * @param mem Memory.
- */
- public HadoopDataInStream(GridUnsafeMemory mem) {
- assert mem != null;
-
- this.mem = mem;
- }
-
- /**
- * @return Buffer.
- */
- public HadoopOffheapBuffer buffer() {
- return buf;
- }
-
- /**
- * @param size Size.
- * @return Old pointer.
- */
- protected long move(long size) throws IOException {
- long ptr = buf.move(size);
-
- assert ptr != 0;
-
- return ptr;
- }
-
- /** {@inheritDoc} */
- @Override public int read() throws IOException {
- return readUnsignedByte();
- }
-
- /** {@inheritDoc} */
- @Override public int read(byte[] b, int off, int len) throws IOException {
- readFully(b, off, len);
-
- return len;
- }
-
- /** {@inheritDoc} */
- @Override public long skip(long n) throws IOException {
- move(n);
-
- return n;
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(byte[] b) throws IOException {
- readFully(b, 0, b.length);
- }
-
- /** {@inheritDoc} */
- @Override public void readFully(byte[] b, int off, int len) throws IOException {
- mem.readBytes(move(len), b, off, len);
- }
-
- /** {@inheritDoc} */
- @Override public int skipBytes(int n) throws IOException {
- move(n);
-
- return n;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readBoolean() throws IOException {
- byte res = readByte();
-
- if (res == 1)
- return true;
-
- assert res == 0 : res;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public byte readByte() throws IOException {
- return mem.readByte(move(1));
- }
-
- /** {@inheritDoc} */
- @Override public int readUnsignedByte() throws IOException {
- return readByte() & 0xff;
- }
-
- /** {@inheritDoc} */
- @Override public short readShort() throws IOException {
- return mem.readShort(move(2));
- }
-
- /** {@inheritDoc} */
- @Override public int readUnsignedShort() throws IOException {
- return readShort() & 0xffff;
- }
-
- /** {@inheritDoc} */
- @Override public char readChar() throws IOException {
- return (char)readShort();
- }
-
- /** {@inheritDoc} */
- @Override public int readInt() throws IOException {
- return mem.readInt(move(4));
- }
-
- /** {@inheritDoc} */
- @Override public long readLong() throws IOException {
- return mem.readLong(move(8));
- }
-
- /** {@inheritDoc} */
- @Override public float readFloat() throws IOException {
- return mem.readFloat(move(4));
- }
-
- /** {@inheritDoc} */
- @Override public double readDouble() throws IOException {
- return mem.readDouble(move(8));
- }
-
- /** {@inheritDoc} */
- @Override public String readLine() throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public String readUTF() throws IOException {
- byte[] bytes = new byte[readInt()];
-
- if (bytes.length != 0)
- readFully(bytes);
-
- return new String(bytes, StandardCharsets.UTF_8);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
deleted file mode 100644
index f7b1a73..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-import java.io.DataOutput;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-
-/**
- * Data output stream.
- */
-public class HadoopDataOutStream extends OutputStream implements DataOutput {
- /** */
- private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
-
- /** */
- private final GridUnsafeMemory mem;
-
- /**
- * @param mem Memory.
- */
- public HadoopDataOutStream(GridUnsafeMemory mem) {
- this.mem = mem;
- }
-
- /**
- * @return Buffer.
- */
- public HadoopOffheapBuffer buffer() {
- return buf;
- }
-
- /**
- * @param size Size.
- * @return Old pointer or {@code 0} if move was impossible.
- */
- public long move(long size) {
- return buf.move(size);
- }
-
- /** {@inheritDoc} */
- @Override public void write(int b) {
- writeByte(b);
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] b) {
- write(b, 0, b.length);
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] b, int off, int len) {
- GridUnsafe.copyMemory(b, GridUnsafe.BYTE_ARR_OFF + off, null, move(len), len);
- }
-
- /** {@inheritDoc} */
- @Override public void writeBoolean(boolean v) {
- writeByte(v ? 1 : 0);
- }
-
- /** {@inheritDoc} */
- @Override public void writeByte(int v) {
- mem.writeByte(move(1), (byte)v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeShort(int v) {
- mem.writeShort(move(2), (short)v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeChar(int v) {
- writeShort(v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeInt(int v) {
- mem.writeInt(move(4), v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeLong(long v) {
- mem.writeLong(move(8), v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeFloat(float v) {
- mem.writeFloat(move(4), v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeDouble(double v) {
- mem.writeDouble(move(8), v);
- }
-
- /** {@inheritDoc} */
- @Override public void writeBytes(String s) {
- writeUTF(s);
- }
-
- /** {@inheritDoc} */
- @Override public void writeChars(String s) {
- writeUTF(s);
- }
-
- /** {@inheritDoc} */
- @Override public void writeUTF(String s) {
- byte[] b = s.getBytes(StandardCharsets.UTF_8);
-
- writeInt(b.length);
- write(b);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
deleted file mode 100644
index acc9be6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-/**
- * Offheap buffer.
- */
-public class HadoopOffheapBuffer {
- /** Buffer begin address. */
- private long bufPtr;
-
- /** The first address we do not own. */
- private long bufEnd;
-
- /** Current read or write pointer. */
- private long posPtr;
-
- /**
- * @param bufPtr Pointer to buffer begin.
- * @param bufSize Size of the buffer.
- */
- public HadoopOffheapBuffer(long bufPtr, long bufSize) {
- set(bufPtr, bufSize);
- }
-
- /**
- * @param bufPtr Pointer to buffer begin.
- * @param bufSize Size of the buffer.
- */
- public void set(long bufPtr, long bufSize) {
- this.bufPtr = bufPtr;
-
- posPtr = bufPtr;
- bufEnd = bufPtr + bufSize;
- }
-
- /**
- * @return Pointer to internal buffer begin.
- */
- public long begin() {
- return bufPtr;
- }
-
- /**
- * @return Buffer capacity.
- */
- public long capacity() {
- return bufEnd - bufPtr;
- }
-
- /**
- * @return Remaining capacity.
- */
- public long remaining() {
- return bufEnd - posPtr;
- }
-
- /**
- * @return Absolute pointer to the current position inside of the buffer.
- */
- public long pointer() {
- return posPtr;
- }
-
- /**
- * @param ptr Absolute pointer to the current position inside of the buffer.
- */
- public void pointer(long ptr) {
- assert ptr >= bufPtr : bufPtr + " <= " + ptr;
- assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
-
- posPtr = ptr;
- }
-
- /**
- * @param size Size move on.
- * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
- */
- public long move(long size) {
- assert size > 0 : size;
-
- long oldPos = posPtr;
- long newPos = oldPos + size;
-
- if (newPos > bufEnd)
- return 0;
-
- posPtr = newPos;
-
- return oldPos;
- }
-
- /**
- * @param ptr Pointer.
- * @return {@code true} If the given pointer is inside of this buffer.
- */
- public boolean isInside(long ptr) {
- return ptr >= bufPtr && ptr <= bufEnd;
- }
-
- /**
- * Resets position to the beginning of buffer.
- */
- public void reset() {
- posPtr = bufPtr;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
deleted file mode 100644
index 5ede18e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-
-/**
- * Task executor.
- */
-public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
- /** Job tracker. */
- private HadoopJobTracker jobTracker;
-
- /** */
- private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
-
- /** Executor service to run tasks. */
- private HadoopExecutorService exec;
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- jobTracker = ctx.jobTracker();
-
- exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
- ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (exec != null) {
- exec.shutdown(3000);
-
- if (cancel) {
- for (HadoopJobId jobId : jobs.keySet())
- cancelTasks(jobId);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) {
- if (exec != null && !exec.shutdown(30000))
- U.warn(log, "Failed to finish running tasks in 30 sec.");
- }
-
- /** {@inheritDoc} */
- @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
- if (log.isDebugEnabled())
- log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
- ", tasksCnt=" + tasks.size() + ']');
-
- Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
-
- if (executedTasks == null) {
- executedTasks = new GridConcurrentHashSet<>();
-
- Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
-
- assert extractedCol == null;
- }
-
- final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
-
- for (final HadoopTaskInfo info : tasks) {
- assert info != null;
-
- HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
- ctx.localNodeId()) {
- @Override protected void onTaskFinished(HadoopTaskStatus status) {
- if (log.isDebugEnabled())
- log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
- "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
-
- finalExecutedTasks.remove(this);
-
- jobTracker.onTaskFinished(info, status);
- }
-
- @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().input(taskCtx);
- }
-
- @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- return ctx.shuffle().output(taskCtx);
- }
- };
-
- executedTasks.add(task);
-
- exec.submit(task);
- }
- }
-
- /**
- * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- @Override public void cancelTasks(HadoopJobId jobId) {
- Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
-
- if (executedTasks != null) {
- for (HadoopRunnableTask task : executedTasks)
- task.cancel();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
- if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
- Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
-
- assert executedTasks == null || executedTasks.isEmpty();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
deleted file mode 100644
index 993ecc9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.internal.util.worker.GridWorkerListener;
-import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jsr166.ConcurrentHashMap8;
-
-import static java.util.Collections.newSetFromMap;
-
-/**
- * Executor service without thread pooling.
- */
-public class HadoopExecutorService {
- /** */
- private final LinkedBlockingQueue<Callable<?>> queue;
-
- /** */
- private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
-
- /** */
- private final AtomicInteger active = new AtomicInteger();
-
- /** */
- private final int maxTasks;
-
- /** */
- private final String gridName;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private volatile boolean shutdown;
-
- /** */
- private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
- @Override public void onStopped(GridWorker w) {
- workers.remove(w);
-
- if (shutdown) {
- active.decrementAndGet();
-
- return;
- }
-
- Callable<?> task = queue.poll();
-
- if (task != null)
- startThread(task);
- else {
- active.decrementAndGet();
-
- if (!queue.isEmpty())
- startFromQueue();
- }
- }
- };
-
- /**
- * @param log Logger.
- * @param gridName Grid name.
- * @param maxTasks Max number of tasks.
- * @param maxQueue Max queue length.
- */
- public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
- assert maxTasks > 0 : maxTasks;
- assert maxQueue > 0 : maxQueue;
-
- this.maxTasks = maxTasks;
- this.queue = new LinkedBlockingQueue<>(maxQueue);
- this.gridName = gridName;
- this.log = log.getLogger(HadoopExecutorService.class);
- }
-
- /**
- * @return Number of active workers.
- */
- public int active() {
- return workers.size();
- }
-
- /**
- * Submit task.
- *
- * @param task Task.
- */
- public void submit(Callable<?> task) {
- while (queue.isEmpty()) {
- int active0 = active.get();
-
- if (active0 == maxTasks)
- break;
-
- if (active.compareAndSet(active0, active0 + 1)) {
- startThread(task);
-
- return; // Started in new thread bypassing queue.
- }
- }
-
- try {
- while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
- if (shutdown)
- return; // Rejected due to shutdown.
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- return;
- }
-
- startFromQueue();
- }
-
- /**
- * Attempts to start task from queue.
- */
- private void startFromQueue() {
- do {
- int active0 = active.get();
-
- if (active0 == maxTasks)
- break;
-
- if (active.compareAndSet(active0, active0 + 1)) {
- Callable<?> task = queue.poll();
-
- if (task == null) {
- int res = active.decrementAndGet();
-
- assert res >= 0 : res;
-
- break;
- }
-
- startThread(task);
- }
- }
- while (!queue.isEmpty());
- }
-
- /**
- * @param task Task.
- */
- private void startThread(final Callable<?> task) {
- String workerName;
-
- if (task instanceof HadoopRunnableTask) {
- final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
-
- workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
- }
- else
- workerName = task.toString();
-
- GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
- @Override protected void body() {
- try {
- task.call();
- }
- catch (Exception e) {
- log.error("Failed to execute task: " + task, e);
- }
- }
- };
-
- workers.add(w);
-
- if (shutdown)
- w.cancel();
-
- new IgniteThread(w).start();
- }
-
- /**
- * Shuts down this executor service.
- *
- * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
- * @return {@code true} If all tasks completed.
- */
- public boolean shutdown(long awaitTimeMillis) {
- shutdown = true;
-
- for (GridWorker w : workers)
- w.cancel();
-
- while (awaitTimeMillis > 0 && !workers.isEmpty()) {
- try {
- Thread.sleep(100);
-
- awaitTimeMillis -= 100;
- }
- catch (InterruptedException e) {
- break;
- }
- }
-
- return workers.isEmpty();
- }
-
- /**
- * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
- */
- public boolean isShutdown() {
- return shutdown;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
deleted file mode 100644
index a57efe6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
-
-/**
- * Runnable task.
- */
-public abstract class HadoopRunnableTask implements Callable<Void> {
- /** */
- private final GridUnsafeMemory mem;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private final HadoopJob job;
-
- /** Task to run. */
- private final HadoopTaskInfo info;
-
- /** Submit time. */
- private final long submitTs = U.currentTimeMillis();
-
- /** Execution start timestamp. */
- private long execStartTs;
-
- /** Execution end timestamp. */
- private long execEndTs;
-
- /** */
- private HadoopMultimap combinerInput;
-
- /** */
- private volatile HadoopTaskContext ctx;
-
- /** Set if task is to cancelling. */
- private volatile boolean cancelled;
-
- /** Node id. */
- private UUID nodeId;
-
- /**
- * @param log Log.
- * @param job Job.
- * @param mem Memory.
- * @param info Task info.
- * @param nodeId Node id.
- */
- protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
- UUID nodeId) {
- this.nodeId = nodeId;
- this.log = log.getLogger(HadoopRunnableTask.class);
- this.job = job;
- this.mem = mem;
- this.info = info;
- }
-
- /**
- * @return Wait time.
- */
- public long waitTime() {
- return execStartTs - submitTs;
- }
-
- /**
- * @return Execution time.
- */
- public long executionTime() {
- return execEndTs - execStartTs;
- }
-
- /** {@inheritDoc} */
- @Override public Void call() throws IgniteCheckedException {
- ctx = job.getTaskContext(info);
-
- return ctx.runAsJobOwner(new Callable<Void>() {
- @Override public Void call() throws Exception {
- call0();
-
- return null;
- }
- });
- }
-
- /**
- * Implements actual task running.
- * @throws IgniteCheckedException
- */
- void call0() throws IgniteCheckedException {
- execStartTs = U.currentTimeMillis();
-
- Throwable err = null;
-
- HadoopTaskState state = HadoopTaskState.COMPLETED;
-
- HadoopPerformanceCounter perfCntr = null;
-
- try {
- perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
-
- perfCntr.onTaskSubmit(info, submitTs);
- perfCntr.onTaskPrepare(info, execStartTs);
-
- ctx.prepareTaskEnvironment();
-
- runTask(perfCntr);
-
- if (info.type() == MAP && job.info().hasCombiner()) {
- ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
-
- try {
- runTask(perfCntr);
- }
- finally {
- ctx.taskInfo(info);
- }
- }
- }
- catch (HadoopTaskCancelledException ignored) {
- state = HadoopTaskState.CANCELED;
- }
- catch (Throwable e) {
- state = HadoopTaskState.FAILED;
- err = e;
-
- U.error(log, "Task execution failed.", e);
-
- if (e instanceof Error)
- throw e;
- }
- finally {
- execEndTs = U.currentTimeMillis();
-
- if (perfCntr != null)
- perfCntr.onTaskFinish(info, execEndTs);
-
- onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
-
- if (combinerInput != null)
- combinerInput.close();
-
- if (ctx != null)
- ctx.cleanupTaskEnvironment();
- }
- }
-
- /**
- * @param perfCntr Performance counter.
- * @throws IgniteCheckedException If failed.
- */
- private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
- if (cancelled)
- throw new HadoopTaskCancelledException("Task cancelled.");
-
- try (HadoopTaskOutput out = createOutputInternal(ctx);
- HadoopTaskInput in = createInputInternal(ctx)) {
-
- ctx.input(in);
- ctx.output(out);
-
- perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
-
- ctx.run();
- }
- }
-
- /**
- * Cancel the executed task.
- */
- public void cancel() {
- cancelled = true;
-
- if (ctx != null)
- ctx.cancel();
- }
-
- /**
- * @param status Task status.
- */
- protected abstract void onTaskFinished(HadoopTaskStatus status);
-
- /**
- * @param ctx Task context.
- * @return Task input.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("unchecked")
- private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
- switch (ctx.taskInfo().type()) {
- case SETUP:
- case MAP:
- case COMMIT:
- case ABORT:
- return null;
-
- case COMBINE:
- assert combinerInput != null;
-
- return combinerInput.input(ctx);
-
- default:
- return createInput(ctx);
- }
- }
-
- /**
- * @param ctx Task context.
- * @return Input.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
- /**
- * @param ctx Task info.
- * @return Output.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
- /**
- * @param ctx Task info.
- * @return Task output.
- * @throws IgniteCheckedException If failed.
- */
- private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
- switch (ctx.taskInfo().type()) {
- case SETUP:
- case REDUCE:
- case COMMIT:
- case ABORT:
- return null;
-
- case MAP:
- if (job.info().hasCombiner()) {
- assert combinerInput == null;
-
- combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
- new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
- new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
-
- return combinerInput.startAdding(ctx);
- }
-
- default:
- return createOutput(ctx);
- }
- }
-
- /**
- * @return Task info.
- */
- public HadoopTaskInfo taskInfo() {
- return info;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
deleted file mode 100644
index f13c76a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-
-/**
- * Common superclass for task executor.
- */
-public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
- /**
- * Runs tasks.
- *
- * @param job Job.
- * @param tasks Tasks.
- * @throws IgniteCheckedException If failed.
- */
- public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
-
- /**
- * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
- * for this job ID.
- * <p>
- * It is guaranteed that this method will not be called concurrently with
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
- * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
- *
- * @param jobId Job ID to cancel.
- */
- public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
-
- /**
- * On job state change callback;
- *
- * @param meta Job metadata.
- */
- public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
deleted file mode 100644
index b22d291..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-/**
-* State of the task.
-*/
-public enum HadoopTaskState {
- /** Running task. */
- RUNNING,
-
- /** Completed task. */
- COMPLETED,
-
- /** Failed task. */
- FAILED,
-
- /** Canceled task. */
- CANCELED,
-
- /** Process crashed. */
- CRASHED
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
deleted file mode 100644
index fa09ff7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Task status.
- */
-public class HadoopTaskStatus implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private HadoopTaskState state;
-
- /** */
- private Throwable failCause;
-
- /** */
- private HadoopCounters cntrs;
-
- /**
- * Default constructor required by {@link Externalizable}.
- */
- public HadoopTaskStatus() {
- // No-op.
- }
-
- /**
- * Creates new instance.
- *
- * @param state Task state.
- * @param failCause Failure cause (if any).
- */
- public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
- this(state, failCause, null);
- }
-
- /**
- * Creates new instance.
- *
- * @param state Task state.
- * @param failCause Failure cause (if any).
- * @param cntrs Task counters.
- */
- public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause,
- @Nullable HadoopCounters cntrs) {
- assert state != null;
-
- this.state = state;
- this.failCause = failCause;
- this.cntrs = cntrs;
- }
-
- /**
- * @return State.
- */
- public HadoopTaskState state() {
- return state;
- }
-
- /**
- * @return Fail cause.
- */
- @Nullable public Throwable failCause() {
- return failCause;
- }
-
- /**
- * @return Counters.
- */
- @Nullable public HadoopCounters counters() {
- return cntrs;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(HadoopTaskStatus.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(state);
- out.writeObject(failCause);
- out.writeObject(cntrs);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- state = (HadoopTaskState)in.readObject();
- failCause = (Throwable)in.readObject();
- cntrs = (HadoopCounters)in.readObject();
- }
-}
\ No newline at end of file