You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:07 UTC
[75/92] [abbrv] ignite git commit: Preparing big move.
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
new file mode 100644
index 0000000..acc9be6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+/**
+ * Offheap buffer.
+ */
+public class HadoopOffheapBuffer {
+ /** Buffer begin address. */
+ private long bufPtr;
+
+ /** The first address we do not own. */
+ private long bufEnd;
+
+ /** Current read or write pointer. */
+ private long posPtr;
+
+ /**
+ * @param bufPtr Pointer to buffer begin.
+ * @param bufSize Size of the buffer.
+ */
+ public HadoopOffheapBuffer(long bufPtr, long bufSize) {
+ set(bufPtr, bufSize);
+ }
+
+ /**
+ * @param bufPtr Pointer to buffer begin.
+ * @param bufSize Size of the buffer.
+ */
+ public void set(long bufPtr, long bufSize) {
+ this.bufPtr = bufPtr;
+
+ posPtr = bufPtr;
+ bufEnd = bufPtr + bufSize;
+ }
+
+ /**
+ * @return Pointer to internal buffer begin.
+ */
+ public long begin() {
+ return bufPtr;
+ }
+
+ /**
+ * @return Buffer capacity.
+ */
+ public long capacity() {
+ return bufEnd - bufPtr;
+ }
+
+ /**
+ * @return Remaining capacity.
+ */
+ public long remaining() {
+ return bufEnd - posPtr;
+ }
+
+ /**
+ * @return Absolute pointer to the current position inside of the buffer.
+ */
+ public long pointer() {
+ return posPtr;
+ }
+
+ /**
+ * @param ptr Absolute pointer to the current position inside of the buffer.
+ */
+ public void pointer(long ptr) {
+ assert ptr >= bufPtr : bufPtr + " <= " + ptr;
+ assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
+
+ posPtr = ptr;
+ }
+
+ /**
+ * @param size Size move on.
+ * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
+ */
+ public long move(long size) {
+ assert size > 0 : size;
+
+ long oldPos = posPtr;
+ long newPos = oldPos + size;
+
+ if (newPos > bufEnd)
+ return 0;
+
+ posPtr = newPos;
+
+ return oldPos;
+ }
+
+ /**
+ * @param ptr Pointer.
+ * @return {@code true} If the given pointer is inside of this buffer.
+ */
+ public boolean isInside(long ptr) {
+ return ptr >= bufPtr && ptr <= bufEnd;
+ }
+
+ /**
+ * Resets position to the beginning of buffer.
+ */
+ public void reset() {
+ posPtr = bufPtr;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
index f156669..d2c75b4 100644
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTaskExecutionSelfTest.java
@@ -50,6 +50,7 @@ import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
deleted file mode 100644
index 9d1fd4f..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import java.util.Comparator;
-import java.util.concurrent.Callable;
-import org.apache.commons.collections.comparators.ComparableComparator;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Abstract class for maps test.
- */
-public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
- /**
- * Test task context.
- */
- protected static class TaskContext extends HadoopTaskContext {
- /**
- */
- protected TaskContext() {
- super(null, null);
- }
-
- /** {@inheritDoc} */
- @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopCounters counters() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
- assert false;
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
- return new HadoopWritableSerialization(IntWritable.class);
- }
-
- /** {@inheritDoc} */
- @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
- return new HadoopWritableSerialization(IntWritable.class);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Comparator<Object> sortComparator() {
- return ComparableComparator.getInstance();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public Comparator<Object> groupComparator() {
- return ComparableComparator.getInstance();
- }
-
- /** {@inheritDoc} */
- @Override public void run() throws IgniteCheckedException {
- assert false;
- }
-
- /** {@inheritDoc} */
- @Override public void cancel() {
- assert false;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
- assert false;
- }
-
- /** {@inheritDoc} */
- @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
- assert false;
- }
-
- /** {@inheritDoc} */
- @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
- try {
- return c.call();
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
- }
- }
-
- /**
- * Test job info.
- */
- protected static class JobInfo implements HadoopJobInfo {
- /** {@inheritDoc} */
- @Nullable @Override public String property(String name) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasCombiner() {
- assert false;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasReducer() {
- assert false;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
- @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
- assert false;
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public int reducers() {
- assert false;
-
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String jobName() {
- assert false;
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String user() {
- assert false;
-
- return null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
deleted file mode 100644
index f5a2a3f..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopConcurrentHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.io.GridDataInput;
-import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.X;
-
-/**
- *
- */
-public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
- /** */
- public void testMapSimple() throws Exception {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-// mem.listen(new GridOffHeapEventListener() {
-// @Override public void onEvent(GridOffHeapEvent evt) {
-// if (evt == GridOffHeapEvent.ALLOCATE)
-// U.dumpStack();
-// }
-// });
-
- Random rnd = new Random();
-
- int mapSize = 16 << rnd.nextInt(3);
-
- HadoopJobInfo job = new JobInfo();
-
- HadoopTaskContext taskCtx = new TaskContext();
-
- HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
-
- HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
-
- Multimap<Integer, Integer> mm = ArrayListMultimap.create();
- Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
- for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
- int key = rnd.nextInt(mapSize);
- int val = rnd.nextInt();
-
- a.write(new IntWritable(key), new IntWritable(val));
- mm.put(key, val);
-
- X.println("k: " + key + " v: " + val);
-
- a.close();
-
- check(m, mm, vis, taskCtx);
-
- a = m.startAdding(taskCtx);
- }
-
-// a.add(new IntWritable(10), new IntWritable(2));
-// mm.put(10, 2);
-// check(m, mm);
-
- a.close();
-
- X.println("Alloc: " + mem.allocatedSize());
-
- m.close();
-
- assertEquals(0, mem.allocatedSize());
- }
-
- private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
- final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
- final HadoopTaskInput in = m.input(taskCtx);
-
- Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
- int keys = 0;
-
- while (in.next()) {
- keys++;
-
- IntWritable k = (IntWritable)in.key();
-
- assertNotNull(k);
-
- Deque<Integer> vs = new LinkedList<>();
-
- Iterator<?> it = in.values();
-
- while (it.hasNext())
- vs.addFirst(((IntWritable) it.next()).get());
-
- Collection<Integer> exp = mmm.get(k.get());
-
- assertEquals(exp, vs);
- }
-
- assertEquals(mmm.size(), keys);
-
- assertEquals(m.keys(), keys);
-
- X.println("keys: " + keys + " cap: " + m.capacity());
-
- // Check visitor.
-
- final byte[] buf = new byte[4];
-
- final GridDataInput dataInput = new GridUnsafeDataInput();
-
- m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
- /** */
- IntWritable key = new IntWritable();
-
- /** */
- IntWritable val = new IntWritable();
-
- @Override public void onKey(long keyPtr, int keySize) {
- read(keyPtr, keySize, key);
- }
-
- @Override public void onValue(long valPtr, int valSize) {
- read(valPtr, valSize, val);
-
- vis.put(key.get(), val.get());
- }
-
- private void read(long ptr, int size, Writable w) {
- assert size == 4 : size;
-
- GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
-
- dataInput.bytes(buf, size);
-
- try {
- w.readFields(dataInput);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
-
-// X.println("vis: " + vis);
-
- assertEquals(mm, vis);
-
- in.close();
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testMultiThreaded() throws Exception {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- X.println("___ Started");
-
- Random rnd = new GridRandom();
-
- for (int i = 0; i < 20; i++) {
- HadoopJobInfo job = new JobInfo();
-
- final HadoopTaskContext taskCtx = new TaskContext();
-
- final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
-
- final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
- X.println("___ MT");
-
- multithreaded(new Callable<Object>() {
- @Override public Object call() throws Exception {
- X.println("___ TH in");
-
- Random rnd = new GridRandom();
-
- IntWritable key = new IntWritable();
- IntWritable val = new IntWritable();
-
- HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
- for (int i = 0; i < 50000; i++) {
- int k = rnd.nextInt(32000);
- int v = rnd.nextInt();
-
- key.set(k);
- val.set(v);
-
- a.write(key, val);
-
- Collection<Integer> list = mm.get(k);
-
- if (list == null) {
- list = new ConcurrentLinkedQueue<>();
-
- Collection<Integer> old = mm.putIfAbsent(k, list);
-
- if (old != null)
- list = old;
- }
-
- list.add(v);
- }
-
- a.close();
-
- X.println("___ TH out");
-
- return null;
- }
- }, 3 + rnd.nextInt(27));
-
- X.println("___ Check: " + m.capacity());
-
- assertEquals(mm.size(), m.keys());
-
- assertTrue(m.capacity() > 32000);
-
- HadoopTaskInput in = m.input(taskCtx);
-
- while (in.next()) {
- IntWritable key = (IntWritable) in.key();
-
- Iterator<?> valsIter = in.values();
-
- Collection<Integer> vals = mm.remove(key.get());
-
- assertNotNull(vals);
-
- while (valsIter.hasNext()) {
- IntWritable val = (IntWritable) valsIter.next();
-
- assertTrue(vals.remove(val.get()));
- }
-
- assertTrue(vals.isEmpty());
- }
-
- in.close();
- m.close();
-
- assertEquals(0, mem.allocatedSize());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
deleted file mode 100644
index c38b021..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.X;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-
-/**
- *
- */
-public class HadoopHashMapSelfTest extends HadoopAbstractMapTest {
- /**
- * Test simple map.
- *
- * @throws Exception If failed.
- */
- public void testMapSimple() throws Exception {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- Random rnd = new Random();
-
- int mapSize = 16 << rnd.nextInt(3);
-
- HadoopTaskContext taskCtx = new TaskContext();
-
- final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize);
-
- HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
- Multimap<Integer, Integer> mm = ArrayListMultimap.create();
-
- for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
- int key = rnd.nextInt(mapSize);
- int val = rnd.nextInt();
-
- a.write(new IntWritable(key), new IntWritable(val));
- mm.put(key, val);
-
- X.println("k: " + key + " v: " + val);
-
- a.close();
-
- check(m, mm, taskCtx);
-
- a = m.startAdding(taskCtx);
- }
-
-// a.add(new IntWritable(10), new IntWritable(2));
-// mm.put(10, 2);
-// check(m, mm);
-
- a.close();
-
- X.println("Alloc: " + mem.allocatedSize());
-
- m.close();
-
- assertEquals(0, mem.allocatedSize());
- }
-
- private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, HadoopTaskContext taskCtx) throws Exception {
- final HadoopTaskInput in = m.input(taskCtx);
-
- Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
- int keys = 0;
-
- while (in.next()) {
- keys++;
-
- IntWritable k = (IntWritable)in.key();
-
- assertNotNull(k);
-
- ArrayList<Integer> vs = new ArrayList<>();
-
- Iterator<?> it = in.values();
-
- while (it.hasNext())
- vs.add(((IntWritable) it.next()).get());
-
- Collection<Integer> exp = mmm.get(k.get());
-
- assertEquals(sorted(exp), sorted(vs));
- }
-
- X.println("keys: " + keys + " cap: " + m.capacity());
-
- assertEquals(mmm.size(), keys);
-
- assertEquals(m.keys(), keys);
-
- in.close();
- }
-
- private GridLongList sorted(Collection<Integer> col) {
- GridLongList lst = new GridLongList(col.size());
-
- for (Integer i : col)
- lst.add(i);
-
- return lst.sort();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
deleted file mode 100644
index 11430ca..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.GridRandom;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.io.GridDataInput;
-import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.X;
-
-import static java.lang.Math.abs;
-import static java.lang.Math.ceil;
-import static java.lang.Math.max;
-
-/**
- * Skip list tests.
- */
-public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
- /**
- *
- */
- public void testLevel() {
- Random rnd = new GridRandom();
-
- int[] levelsCnts = new int[32];
-
- int all = 10000;
-
- for (int i = 0; i < all; i++) {
- int level = HadoopSkipList.randomLevel(rnd);
-
- levelsCnts[level]++;
- }
-
- X.println("Distribution: " + Arrays.toString(levelsCnts));
-
- for (int level = 0; level < levelsCnts.length; level++) {
- int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
-
- double precission = 0.72 / Math.max(32 >>> level, 1);
-
- int sigma = max((int)ceil(precission * exp), 5);
-
- X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
- " sigma: " + sigma);
-
- assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
- }
- }
-
- public void testMapSimple() throws Exception {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-// mem.listen(new GridOffHeapEventListener() {
-// @Override public void onEvent(GridOffHeapEvent evt) {
-// if (evt == GridOffHeapEvent.ALLOCATE)
-// U.dumpStack();
-// }
-// });
-
- Random rnd = new Random();
-
- int mapSize = 16 << rnd.nextInt(6);
-
- HadoopJobInfo job = new JobInfo();
-
- HadoopTaskContext taskCtx = new TaskContext();
-
- HadoopMultimap m = new HadoopSkipList(job, mem);
-
- HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
- Multimap<Integer, Integer> mm = ArrayListMultimap.create();
- Multimap<Integer, Integer> vis = ArrayListMultimap.create();
-
- for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
- int key = rnd.nextInt(mapSize);
- int val = rnd.nextInt();
-
- a.write(new IntWritable(key), new IntWritable(val));
- mm.put(key, val);
-
- X.println("k: " + key + " v: " + val);
-
- a.close();
-
- check(m, mm, vis, taskCtx);
-
- a = m.startAdding(taskCtx);
- }
-
-// a.add(new IntWritable(10), new IntWritable(2));
-// mm.put(10, 2);
-// check(m, mm);
-
- a.close();
-
- X.println("Alloc: " + mem.allocatedSize());
-
- m.close();
-
- assertEquals(0, mem.allocatedSize());
- }
-
- private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx)
- throws Exception {
- final HadoopTaskInput in = m.input(taskCtx);
-
- Map<Integer, Collection<Integer>> mmm = mm.asMap();
-
- int keys = 0;
-
- int prevKey = Integer.MIN_VALUE;
-
- while (in.next()) {
- keys++;
-
- IntWritable k = (IntWritable)in.key();
-
- assertNotNull(k);
-
- assertTrue(k.get() > prevKey);
-
- prevKey = k.get();
-
- Deque<Integer> vs = new LinkedList<>();
-
- Iterator<?> it = in.values();
-
- while (it.hasNext())
- vs.addFirst(((IntWritable) it.next()).get());
-
- Collection<Integer> exp = mmm.get(k.get());
-
- assertEquals(exp, vs);
- }
-
- assertEquals(mmm.size(), keys);
-
-//! assertEquals(m.keys(), keys);
-
- // Check visitor.
-
- final byte[] buf = new byte[4];
-
- final GridDataInput dataInput = new GridUnsafeDataInput();
-
- m.visit(false, new HadoopMultimap.Visitor() {
- /** */
- IntWritable key = new IntWritable();
-
- /** */
- IntWritable val = new IntWritable();
-
- @Override public void onKey(long keyPtr, int keySize) {
- read(keyPtr, keySize, key);
- }
-
- @Override public void onValue(long valPtr, int valSize) {
- read(valPtr, valSize, val);
-
- vis.put(key.get(), val.get());
- }
-
- private void read(long ptr, int size, Writable w) {
- assert size == 4 : size;
-
- GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
-
- dataInput.bytes(buf, size);
-
- try {
- w.readFields(dataInput);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- });
-
-// X.println("vis: " + vis);
-
- assertEquals(mm, vis);
-
- in.close();
- }
-
- /**
- * @throws Exception if failed.
- */
- public void testMultiThreaded() throws Exception {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- X.println("___ Started");
-
- Random rnd = new GridRandom();
-
- for (int i = 0; i < 20; i++) {
- HadoopJobInfo job = new JobInfo();
-
- final HadoopTaskContext taskCtx = new TaskContext();
-
- final HadoopMultimap m = new HadoopSkipList(job, mem);
-
- final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
-
- X.println("___ MT");
-
- multithreaded(new Callable<Object>() {
- @Override public Object call() throws Exception {
- X.println("___ TH in");
-
- Random rnd = new GridRandom();
-
- IntWritable key = new IntWritable();
- IntWritable val = new IntWritable();
-
- HadoopMultimap.Adder a = m.startAdding(taskCtx);
-
- for (int i = 0; i < 50000; i++) {
- int k = rnd.nextInt(32000);
- int v = rnd.nextInt();
-
- key.set(k);
- val.set(v);
-
- a.write(key, val);
-
- Collection<Integer> list = mm.get(k);
-
- if (list == null) {
- list = new ConcurrentLinkedQueue<>();
-
- Collection<Integer> old = mm.putIfAbsent(k, list);
-
- if (old != null)
- list = old;
- }
-
- list.add(v);
- }
-
- a.close();
-
- X.println("___ TH out");
-
- return null;
- }
- }, 3 + rnd.nextInt(27));
-
- HadoopTaskInput in = m.input(taskCtx);
-
- int prevKey = Integer.MIN_VALUE;
-
- while (in.next()) {
- IntWritable key = (IntWritable)in.key();
-
- assertTrue(key.get() > prevKey);
-
- prevKey = key.get();
-
- Iterator<?> valsIter = in.values();
-
- Collection<Integer> vals = mm.remove(key.get());
-
- assertNotNull(vals);
-
- while (valsIter.hasNext()) {
- IntWritable val = (IntWritable) valsIter.next();
-
- assertTrue(vals.remove(val.get()));
- }
-
- assertTrue(vals.isEmpty());
- }
-
- in.close();
- m.close();
-
- assertEquals(0, mem.allocatedSize());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
deleted file mode 100644
index 2b6b713..0000000
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataInStream;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataOutStream;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-
-/**
- *
- */
-public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
-
- public void testStreams() throws IOException {
- GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
- HadoopDataOutStream out = new HadoopDataOutStream(mem);
-
- int size = 4 * 1024;
-
- final long ptr = mem.allocate(size);
-
- out.buffer().set(ptr, size);
-
- out.writeBoolean(false);
- out.writeBoolean(true);
- out.writeBoolean(false);
- out.write(17);
- out.write(121);
- out.write(0xfafa);
- out.writeByte(17);
- out.writeByte(121);
- out.writeByte(0xfafa);
- out.writeChar('z');
- out.writeChar('o');
- out.writeChar('r');
- out.writeShort(100);
- out.writeShort(Short.MIN_VALUE);
- out.writeShort(Short.MAX_VALUE);
- out.writeShort(65535);
- out.writeShort(65536); // 0
- out.writeInt(Integer.MAX_VALUE);
- out.writeInt(Integer.MIN_VALUE);
- out.writeInt(-1);
- out.writeInt(0);
- out.writeInt(1);
- out.writeFloat(0.33f);
- out.writeFloat(0.5f);
- out.writeFloat(-0.7f);
- out.writeFloat(Float.MAX_VALUE);
- out.writeFloat(Float.MIN_VALUE);
- out.writeFloat(Float.MIN_NORMAL);
- out.writeFloat(Float.POSITIVE_INFINITY);
- out.writeFloat(Float.NEGATIVE_INFINITY);
- out.writeFloat(Float.NaN);
- out.writeDouble(-12312312.3333333336666779);
- out.writeDouble(123123.234);
- out.writeDouble(Double.MAX_VALUE);
- out.writeDouble(Double.MIN_VALUE);
- out.writeDouble(Double.MIN_NORMAL);
- out.writeDouble(Double.NEGATIVE_INFINITY);
- out.writeDouble(Double.POSITIVE_INFINITY);
- out.writeDouble(Double.NaN);
- out.writeLong(Long.MAX_VALUE);
- out.writeLong(Long.MIN_VALUE);
- out.writeLong(0);
- out.writeLong(-1L);
- out.write(new byte[]{1,2,3});
- out.write(new byte[]{0,1,2,3}, 1, 2);
- out.writeUTF("mom washes rum");
-
- HadoopDataInStream in = new HadoopDataInStream(mem);
-
- in.buffer().set(ptr, out.buffer().pointer());
-
- assertEquals(false, in.readBoolean());
- assertEquals(true, in.readBoolean());
- assertEquals(false, in.readBoolean());
- assertEquals(17, in.read());
- assertEquals(121, in.read());
- assertEquals(0xfa, in.read());
- assertEquals(17, in.readByte());
- assertEquals(121, in.readByte());
- assertEquals((byte)0xfa, in.readByte());
- assertEquals('z', in.readChar());
- assertEquals('o', in.readChar());
- assertEquals('r', in.readChar());
- assertEquals(100, in.readShort());
- assertEquals(Short.MIN_VALUE, in.readShort());
- assertEquals(Short.MAX_VALUE, in.readShort());
- assertEquals(-1, in.readShort());
- assertEquals(0, in.readShort());
- assertEquals(Integer.MAX_VALUE, in.readInt());
- assertEquals(Integer.MIN_VALUE, in.readInt());
- assertEquals(-1, in.readInt());
- assertEquals(0, in.readInt());
- assertEquals(1, in.readInt());
- assertEquals(0.33f, in.readFloat());
- assertEquals(0.5f, in.readFloat());
- assertEquals(-0.7f, in.readFloat());
- assertEquals(Float.MAX_VALUE, in.readFloat());
- assertEquals(Float.MIN_VALUE, in.readFloat());
- assertEquals(Float.MIN_NORMAL, in.readFloat());
- assertEquals(Float.POSITIVE_INFINITY, in.readFloat());
- assertEquals(Float.NEGATIVE_INFINITY, in.readFloat());
- assertEquals(Float.NaN, in.readFloat());
- assertEquals(-12312312.3333333336666779, in.readDouble());
- assertEquals(123123.234, in.readDouble());
- assertEquals(Double.MAX_VALUE, in.readDouble());
- assertEquals(Double.MIN_VALUE, in.readDouble());
- assertEquals(Double.MIN_NORMAL, in.readDouble());
- assertEquals(Double.NEGATIVE_INFINITY, in.readDouble());
- assertEquals(Double.POSITIVE_INFINITY, in.readDouble());
- assertEquals(Double.NaN, in.readDouble());
- assertEquals(Long.MAX_VALUE, in.readLong());
- assertEquals(Long.MIN_VALUE, in.readLong());
- assertEquals(0, in.readLong());
- assertEquals(-1, in.readLong());
-
- byte[] b = new byte[3];
-
- in.read(b);
-
- assertTrue(Arrays.equals(new byte[]{1,2,3}, b));
-
- b = new byte[4];
-
- in.read(b, 1, 2);
-
- assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b));
-
- assertEquals("mom washes rum", in.readUTF());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
new file mode 100644
index 0000000..07a3ecc
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.util.Comparator;
+import java.util.concurrent.Callable;
+import org.apache.commons.collections.comparators.ComparableComparator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract class for maps test.
+ */
+public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
+ /**
+ * Test task context.
+ */
+ protected static class TaskContext extends HadoopTaskContext {
+ /**
+ */
+ protected TaskContext() {
+ super(null, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopCounters counters() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+ return new HadoopWritableSerialization(IntWritable.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+ return new HadoopWritableSerialization(IntWritable.class);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Comparator<Object> sortComparator() {
+ return ComparableComparator.getInstance();
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Comparator<Object> groupComparator() {
+ return ComparableComparator.getInstance();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() throws IgniteCheckedException {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel() {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+ try {
+ return c.call();
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+ }
+
+ /**
+ * Test job info.
+ */
+ protected static class JobInfo implements HadoopJobInfo {
+ /** {@inheritDoc} */
+ @Nullable @Override public String property(String name) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasCombiner() {
+ assert false;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasReducer() {
+ assert false;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+ @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ assert false;
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String jobName() {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ assert false;
+
+ return null;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
new file mode 100644
index 0000000..a37d74b
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.io.GridDataInput;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+/**
+ *
+ */
+public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
+ /** */
+ public void testMapSimple() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+// mem.listen(new GridOffHeapEventListener() {
+// @Override public void onEvent(GridOffHeapEvent evt) {
+// if (evt == GridOffHeapEvent.ALLOCATE)
+// U.dumpStack();
+// }
+// });
+
+ Random rnd = new Random();
+
+ int mapSize = 16 << rnd.nextInt(3);
+
+ HadoopJobInfo job = new JobInfo();
+
+ HadoopTaskContext taskCtx = new TaskContext();
+
+ HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
+
+ HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+ Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+ Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+ for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+ int key = rnd.nextInt(mapSize);
+ int val = rnd.nextInt();
+
+ a.write(new IntWritable(key), new IntWritable(val));
+ mm.put(key, val);
+
+ X.println("k: " + key + " v: " + val);
+
+ a.close();
+
+ check(m, mm, vis, taskCtx);
+
+ a = m.startAdding(taskCtx);
+ }
+
+// a.add(new IntWritable(10), new IntWritable(2));
+// mm.put(10, 2);
+// check(m, mm);
+
+ a.close();
+
+ X.println("Alloc: " + mem.allocatedSize());
+
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+
+ private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
+ final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+ final HadoopTaskInput in = m.input(taskCtx);
+
+ Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+ int keys = 0;
+
+ while (in.next()) {
+ keys++;
+
+ IntWritable k = (IntWritable)in.key();
+
+ assertNotNull(k);
+
+ Deque<Integer> vs = new LinkedList<>();
+
+ Iterator<?> it = in.values();
+
+ while (it.hasNext())
+ vs.addFirst(((IntWritable) it.next()).get());
+
+ Collection<Integer> exp = mmm.get(k.get());
+
+ assertEquals(exp, vs);
+ }
+
+ assertEquals(mmm.size(), keys);
+
+ assertEquals(m.keys(), keys);
+
+ X.println("keys: " + keys + " cap: " + m.capacity());
+
+ // Check visitor.
+
+ final byte[] buf = new byte[4];
+
+ final GridDataInput dataInput = new GridUnsafeDataInput();
+
+ m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+ /** */
+ IntWritable key = new IntWritable();
+
+ /** */
+ IntWritable val = new IntWritable();
+
+ @Override public void onKey(long keyPtr, int keySize) {
+ read(keyPtr, keySize, key);
+ }
+
+ @Override public void onValue(long valPtr, int valSize) {
+ read(valPtr, valSize, val);
+
+ vis.put(key.get(), val.get());
+ }
+
+ private void read(long ptr, int size, Writable w) {
+ assert size == 4 : size;
+
+ GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
+
+ dataInput.bytes(buf, size);
+
+ try {
+ w.readFields(dataInput);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+// X.println("vis: " + vis);
+
+ assertEquals(mm, vis);
+
+ in.close();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testMultiThreaded() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ X.println("___ Started");
+
+ Random rnd = new GridRandom();
+
+ for (int i = 0; i < 20; i++) {
+ HadoopJobInfo job = new JobInfo();
+
+ final HadoopTaskContext taskCtx = new TaskContext();
+
+ final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
+
+ final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+ X.println("___ MT");
+
+ multithreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ X.println("___ TH in");
+
+ Random rnd = new GridRandom();
+
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ for (int i = 0; i < 50000; i++) {
+ int k = rnd.nextInt(32000);
+ int v = rnd.nextInt();
+
+ key.set(k);
+ val.set(v);
+
+ a.write(key, val);
+
+ Collection<Integer> list = mm.get(k);
+
+ if (list == null) {
+ list = new ConcurrentLinkedQueue<>();
+
+ Collection<Integer> old = mm.putIfAbsent(k, list);
+
+ if (old != null)
+ list = old;
+ }
+
+ list.add(v);
+ }
+
+ a.close();
+
+ X.println("___ TH out");
+
+ return null;
+ }
+ }, 3 + rnd.nextInt(27));
+
+ X.println("___ Check: " + m.capacity());
+
+ assertEquals(mm.size(), m.keys());
+
+ assertTrue(m.capacity() > 32000);
+
+ HadoopTaskInput in = m.input(taskCtx);
+
+ while (in.next()) {
+ IntWritable key = (IntWritable) in.key();
+
+ Iterator<?> valsIter = in.values();
+
+ Collection<Integer> vals = mm.remove(key.get());
+
+ assertNotNull(vals);
+
+ while (valsIter.hasNext()) {
+ IntWritable val = (IntWritable) valsIter.next();
+
+ assertTrue(vals.remove(val.get()));
+ }
+
+ assertTrue(vals.isEmpty());
+ }
+
+ in.close();
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
new file mode 100644
index 0000000..04585ec
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ *
+ */
+public class HadoopHashMapSelfTest extends HadoopAbstractMapTest {
+ /**
+ * Test simple map.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMapSimple() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ Random rnd = new Random();
+
+ int mapSize = 16 << rnd.nextInt(3);
+
+ HadoopTaskContext taskCtx = new TaskContext();
+
+ final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize);
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+
+ for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+ int key = rnd.nextInt(mapSize);
+ int val = rnd.nextInt();
+
+ a.write(new IntWritable(key), new IntWritable(val));
+ mm.put(key, val);
+
+ X.println("k: " + key + " v: " + val);
+
+ a.close();
+
+ check(m, mm, taskCtx);
+
+ a = m.startAdding(taskCtx);
+ }
+
+// a.add(new IntWritable(10), new IntWritable(2));
+// mm.put(10, 2);
+// check(m, mm);
+
+ a.close();
+
+ X.println("Alloc: " + mem.allocatedSize());
+
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+
+ private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, HadoopTaskContext taskCtx) throws Exception {
+ final HadoopTaskInput in = m.input(taskCtx);
+
+ Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+ int keys = 0;
+
+ while (in.next()) {
+ keys++;
+
+ IntWritable k = (IntWritable)in.key();
+
+ assertNotNull(k);
+
+ ArrayList<Integer> vs = new ArrayList<>();
+
+ Iterator<?> it = in.values();
+
+ while (it.hasNext())
+ vs.add(((IntWritable) it.next()).get());
+
+ Collection<Integer> exp = mmm.get(k.get());
+
+ assertEquals(sorted(exp), sorted(vs));
+ }
+
+ X.println("keys: " + keys + " cap: " + m.capacity());
+
+ assertEquals(mmm.size(), keys);
+
+ assertEquals(m.keys(), keys);
+
+ in.close();
+ }
+
+ private GridLongList sorted(Collection<Integer> col) {
+ GridLongList lst = new GridLongList(col.size());
+
+ for (Integer i : col)
+ lst.add(i);
+
+ return lst.sort();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
new file mode 100644
index 0000000..15d3b13
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.io.GridDataInput;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static java.lang.Math.abs;
+import static java.lang.Math.ceil;
+import static java.lang.Math.max;
+
+/**
+ * Skip list tests.
+ */
+public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
+ /**
+ *
+ */
+ public void testLevel() {
+ Random rnd = new GridRandom();
+
+ int[] levelsCnts = new int[32];
+
+ int all = 10000;
+
+ for (int i = 0; i < all; i++) {
+ int level = HadoopSkipList.randomLevel(rnd);
+
+ levelsCnts[level]++;
+ }
+
+ X.println("Distribution: " + Arrays.toString(levelsCnts));
+
+ for (int level = 0; level < levelsCnts.length; level++) {
+ int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
+
+ double precission = 0.72 / Math.max(32 >>> level, 1);
+
+ int sigma = max((int)ceil(precission * exp), 5);
+
+ X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precission +
+ " sigma: " + sigma);
+
+ assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
+ }
+ }
+
+ public void testMapSimple() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+// mem.listen(new GridOffHeapEventListener() {
+// @Override public void onEvent(GridOffHeapEvent evt) {
+// if (evt == GridOffHeapEvent.ALLOCATE)
+// U.dumpStack();
+// }
+// });
+
+ Random rnd = new Random();
+
+ int mapSize = 16 << rnd.nextInt(6);
+
+ HadoopJobInfo job = new JobInfo();
+
+ HadoopTaskContext taskCtx = new TaskContext();
+
+ HadoopMultimap m = new HadoopSkipList(job, mem);
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+ Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+ for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+ int key = rnd.nextInt(mapSize);
+ int val = rnd.nextInt();
+
+ a.write(new IntWritable(key), new IntWritable(val));
+ mm.put(key, val);
+
+ X.println("k: " + key + " v: " + val);
+
+ a.close();
+
+ check(m, mm, vis, taskCtx);
+
+ a = m.startAdding(taskCtx);
+ }
+
+// a.add(new IntWritable(10), new IntWritable(2));
+// mm.put(10, 2);
+// check(m, mm);
+
+ a.close();
+
+ X.println("Alloc: " + mem.allocatedSize());
+
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+
+ private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx)
+ throws Exception {
+ final HadoopTaskInput in = m.input(taskCtx);
+
+ Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+ int keys = 0;
+
+ int prevKey = Integer.MIN_VALUE;
+
+ while (in.next()) {
+ keys++;
+
+ IntWritable k = (IntWritable)in.key();
+
+ assertNotNull(k);
+
+ assertTrue(k.get() > prevKey);
+
+ prevKey = k.get();
+
+ Deque<Integer> vs = new LinkedList<>();
+
+ Iterator<?> it = in.values();
+
+ while (it.hasNext())
+ vs.addFirst(((IntWritable) it.next()).get());
+
+ Collection<Integer> exp = mmm.get(k.get());
+
+ assertEquals(exp, vs);
+ }
+
+ assertEquals(mmm.size(), keys);
+
+//! assertEquals(m.keys(), keys);
+
+ // Check visitor.
+
+ final byte[] buf = new byte[4];
+
+ final GridDataInput dataInput = new GridUnsafeDataInput();
+
+ m.visit(false, new HadoopMultimap.Visitor() {
+ /** */
+ IntWritable key = new IntWritable();
+
+ /** */
+ IntWritable val = new IntWritable();
+
+ @Override public void onKey(long keyPtr, int keySize) {
+ read(keyPtr, keySize, key);
+ }
+
+ @Override public void onValue(long valPtr, int valSize) {
+ read(valPtr, valSize, val);
+
+ vis.put(key.get(), val.get());
+ }
+
+ private void read(long ptr, int size, Writable w) {
+ assert size == 4 : size;
+
+ GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
+
+ dataInput.bytes(buf, size);
+
+ try {
+ w.readFields(dataInput);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+// X.println("vis: " + vis);
+
+ assertEquals(mm, vis);
+
+ in.close();
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testMultiThreaded() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ X.println("___ Started");
+
+ Random rnd = new GridRandom();
+
+ for (int i = 0; i < 20; i++) {
+ HadoopJobInfo job = new JobInfo();
+
+ final HadoopTaskContext taskCtx = new TaskContext();
+
+ final HadoopMultimap m = new HadoopSkipList(job, mem);
+
+ final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+ X.println("___ MT");
+
+ multithreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ X.println("___ TH in");
+
+ Random rnd = new GridRandom();
+
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ for (int i = 0; i < 50000; i++) {
+ int k = rnd.nextInt(32000);
+ int v = rnd.nextInt();
+
+ key.set(k);
+ val.set(v);
+
+ a.write(key, val);
+
+ Collection<Integer> list = mm.get(k);
+
+ if (list == null) {
+ list = new ConcurrentLinkedQueue<>();
+
+ Collection<Integer> old = mm.putIfAbsent(k, list);
+
+ if (old != null)
+ list = old;
+ }
+
+ list.add(v);
+ }
+
+ a.close();
+
+ X.println("___ TH out");
+
+ return null;
+ }
+ }, 3 + rnd.nextInt(27));
+
+ HadoopTaskInput in = m.input(taskCtx);
+
+ int prevKey = Integer.MIN_VALUE;
+
+ while (in.next()) {
+ IntWritable key = (IntWritable)in.key();
+
+ assertTrue(key.get() > prevKey);
+
+ prevKey = key.get();
+
+ Iterator<?> valsIter = in.values();
+
+ Collection<Integer> vals = mm.remove(key.get());
+
+ assertNotNull(vals);
+
+ while (valsIter.hasNext()) {
+ IntWritable val = (IntWritable) valsIter.next();
+
+ assertTrue(vals.remove(val.get()));
+ }
+
+ assertTrue(vals.isEmpty());
+ }
+
+ in.close();
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java
new file mode 100644
index 0000000..3fea0ae
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
+
+ public void testStreams() throws IOException {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ HadoopDataOutStream out = new HadoopDataOutStream(mem);
+
+ int size = 4 * 1024;
+
+ final long ptr = mem.allocate(size);
+
+ out.buffer().set(ptr, size);
+
+ out.writeBoolean(false);
+ out.writeBoolean(true);
+ out.writeBoolean(false);
+ out.write(17);
+ out.write(121);
+ out.write(0xfafa);
+ out.writeByte(17);
+ out.writeByte(121);
+ out.writeByte(0xfafa);
+ out.writeChar('z');
+ out.writeChar('o');
+ out.writeChar('r');
+ out.writeShort(100);
+ out.writeShort(Short.MIN_VALUE);
+ out.writeShort(Short.MAX_VALUE);
+ out.writeShort(65535);
+ out.writeShort(65536); // 0
+ out.writeInt(Integer.MAX_VALUE);
+ out.writeInt(Integer.MIN_VALUE);
+ out.writeInt(-1);
+ out.writeInt(0);
+ out.writeInt(1);
+ out.writeFloat(0.33f);
+ out.writeFloat(0.5f);
+ out.writeFloat(-0.7f);
+ out.writeFloat(Float.MAX_VALUE);
+ out.writeFloat(Float.MIN_VALUE);
+ out.writeFloat(Float.MIN_NORMAL);
+ out.writeFloat(Float.POSITIVE_INFINITY);
+ out.writeFloat(Float.NEGATIVE_INFINITY);
+ out.writeFloat(Float.NaN);
+ out.writeDouble(-12312312.3333333336666779);
+ out.writeDouble(123123.234);
+ out.writeDouble(Double.MAX_VALUE);
+ out.writeDouble(Double.MIN_VALUE);
+ out.writeDouble(Double.MIN_NORMAL);
+ out.writeDouble(Double.NEGATIVE_INFINITY);
+ out.writeDouble(Double.POSITIVE_INFINITY);
+ out.writeDouble(Double.NaN);
+ out.writeLong(Long.MAX_VALUE);
+ out.writeLong(Long.MIN_VALUE);
+ out.writeLong(0);
+ out.writeLong(-1L);
+ out.write(new byte[]{1,2,3});
+ out.write(new byte[]{0,1,2,3}, 1, 2);
+ out.writeUTF("mom washes rum");
+
+ HadoopDataInStream in = new HadoopDataInStream(mem);
+
+ in.buffer().set(ptr, out.buffer().pointer());
+
+ assertEquals(false, in.readBoolean());
+ assertEquals(true, in.readBoolean());
+ assertEquals(false, in.readBoolean());
+ assertEquals(17, in.read());
+ assertEquals(121, in.read());
+ assertEquals(0xfa, in.read());
+ assertEquals(17, in.readByte());
+ assertEquals(121, in.readByte());
+ assertEquals((byte)0xfa, in.readByte());
+ assertEquals('z', in.readChar());
+ assertEquals('o', in.readChar());
+ assertEquals('r', in.readChar());
+ assertEquals(100, in.readShort());
+ assertEquals(Short.MIN_VALUE, in.readShort());
+ assertEquals(Short.MAX_VALUE, in.readShort());
+ assertEquals(-1, in.readShort());
+ assertEquals(0, in.readShort());
+ assertEquals(Integer.MAX_VALUE, in.readInt());
+ assertEquals(Integer.MIN_VALUE, in.readInt());
+ assertEquals(-1, in.readInt());
+ assertEquals(0, in.readInt());
+ assertEquals(1, in.readInt());
+ assertEquals(0.33f, in.readFloat());
+ assertEquals(0.5f, in.readFloat());
+ assertEquals(-0.7f, in.readFloat());
+ assertEquals(Float.MAX_VALUE, in.readFloat());
+ assertEquals(Float.MIN_VALUE, in.readFloat());
+ assertEquals(Float.MIN_NORMAL, in.readFloat());
+ assertEquals(Float.POSITIVE_INFINITY, in.readFloat());
+ assertEquals(Float.NEGATIVE_INFINITY, in.readFloat());
+ assertEquals(Float.NaN, in.readFloat());
+ assertEquals(-12312312.3333333336666779, in.readDouble());
+ assertEquals(123123.234, in.readDouble());
+ assertEquals(Double.MAX_VALUE, in.readDouble());
+ assertEquals(Double.MIN_VALUE, in.readDouble());
+ assertEquals(Double.MIN_NORMAL, in.readDouble());
+ assertEquals(Double.NEGATIVE_INFINITY, in.readDouble());
+ assertEquals(Double.POSITIVE_INFINITY, in.readDouble());
+ assertEquals(Double.NaN, in.readDouble());
+ assertEquals(Long.MAX_VALUE, in.readLong());
+ assertEquals(Long.MIN_VALUE, in.readLong());
+ assertEquals(0, in.readLong());
+ assertEquals(-1, in.readLong());
+
+ byte[] b = new byte[3];
+
+ in.read(b);
+
+ assertTrue(Arrays.equals(new byte[]{1,2,3}, b));
+
+ b = new byte[4];
+
+ in.read(b, 1, 2);
+
+ assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b));
+
+ assertEquals("mom washes rum", in.readUTF());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 944e285..773bec5 100644
--- a/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -72,10 +72,10 @@ import org.apache.ignite.internal.processors.hadoop.impl.HadoopV2JobSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopValidationSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedMapReducePlannerTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopWeightedPlannerMapReduceTest;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopHashMapSelfTest;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections.HadoopSkipListSelfTest;
-import org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams.HadoopDataStreamSelfTest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimapSelftest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMapSelfTest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipListSelfTest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataStreamSelfTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
http://git-wip-us.apache.org/repos/asf/ignite/blob/79b3eff1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
index 2ae7ec9..37af147 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -28,6 +28,9 @@ import java.util.TreeSet;
* Common Hadoop utility methods which do not depend on Hadoop API.
*/
public class HadoopCommonUtils {
+ /** Job class name. */
+ public static final String JOB_CLS_NAME = "org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopV2Job";
+
/** Property to store timestamp of new job id request. */
public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";