You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/27 11:08:57 UTC
[02/63] [abbrv] ignite git commit: IGNITE-3912: Hadoop: Implemented
new class loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
new file mode 100644
index 0000000..cca960b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode, embedded flavor.
+ */
+public class IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with DUAL_ASYNC IGFS mode;
+     * the {@code false} flag presumably selects embedded (in-JVM) execution as
+     * opposed to the {@code External*} siblings which pass {@code true} — confirm
+     * against the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest() {
+        super(DUAL_ASYNC, false);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
new file mode 100644
index 0000000..73db4f8
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode, embedded flavor.
+ */
+public class IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with DUAL_SYNC IGFS mode;
+     * the {@code false} flag presumably selects embedded (in-JVM) execution as
+     * opposed to the {@code External*} siblings which pass {@code true} — confirm
+     * against the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest() {
+        super(DUAL_SYNC, false);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
new file mode 100644
index 0000000..48a4694
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in PRIMARY mode, embedded flavor.
+ */
+public class IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with PRIMARY IGFS mode;
+     * the {@code false} flag presumably selects embedded (in-JVM) execution as
+     * opposed to the {@code External*} siblings which pass {@code true} — confirm
+     * against the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest() {
+        super(PRIMARY, false);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
new file mode 100644
index 0000000..ab9c357
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test against the secondary file system,
+ * embedded flavor. NOTE(review): the javadoc used to say "SECONDARY mode", but no
+ * such {@code IgfsMode} exists — the code actually passes {@code PROXY}, in which
+ * operations are delegated to the secondary file system (hence the class name).
+ */
+public class IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with PROXY IGFS mode;
+     * the {@code false} flag presumably selects embedded (in-JVM) execution —
+     * confirm against the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest() {
+        super(PROXY, false);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
new file mode 100644
index 0000000..5154642
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in DUAL_ASYNC mode, external flavor.
+ */
+public class IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with DUAL_ASYNC IGFS mode;
+     * the {@code true} flag presumably selects external execution as opposed to
+     * the {@code Embedded*} siblings which pass {@code false} — confirm against
+     * the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest() {
+        super(DUAL_ASYNC, true);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
new file mode 100644
index 0000000..d88a38b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in DUAL_SYNC mode, external flavor.
+ */
+public class IgniteHadoopFileSystemShmemExternalDualSyncSelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with DUAL_SYNC IGFS mode;
+     * the {@code true} flag presumably selects external execution as opposed to
+     * the {@code Embedded*} siblings which pass {@code false} — confirm against
+     * the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemExternalDualSyncSelfTest() {
+        super(DUAL_SYNC, true);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
new file mode 100644
index 0000000..7b41b22
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalPrimarySelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test in PRIMARY mode, external flavor.
+ */
+public class IgniteHadoopFileSystemShmemExternalPrimarySelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with PRIMARY IGFS mode;
+     * the {@code true} flag presumably selects external execution as opposed to
+     * the {@code Embedded*} siblings which pass {@code false} — confirm against
+     * the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemExternalPrimarySelfTest() {
+        super(PRIMARY, true);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
new file mode 100644
index 0000000..e54b020
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/IgniteHadoopFileSystemShmemExternalSecondarySelfTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
+
+/**
+ * IGFS Hadoop file system IPC shmem self test against the secondary file system,
+ * external flavor. NOTE(review): the javadoc used to say "SECONDARY mode", but no
+ * such {@code IgfsMode} exists — the code actually passes {@code PROXY}, in which
+ * operations are delegated to the secondary file system (hence the class name).
+ */
+public class IgniteHadoopFileSystemShmemExternalSecondarySelfTest
+    extends IgniteHadoopFileSystemShmemAbstractSelfTest {
+    /**
+     * Constructor. Runs the abstract shmem test suite with PROXY IGFS mode;
+     * the {@code true} flag presumably selects external execution — confirm
+     * against the abstract test's constructor.
+     */
+    public IgniteHadoopFileSystemShmemExternalSecondarySelfTest() {
+        super(PROXY, true);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
new file mode 100644
index 0000000..9d1fd4f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
+
+import java.util.Comparator;
+import java.util.concurrent.Callable;
+import org.apache.commons.collections.comparators.ComparableComparator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for Hadoop shuffle map/multimap tests. Provides minimal mock
+ * implementations of {@link HadoopTaskContext} and {@link HadoopJobInfo}:
+ * only the members the map tests actually touch return real values; every
+ * member that must never be reached fails fast via {@code assert false}
+ * (tests run with assertions enabled).
+ */
+public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest {
+    /**
+     * Mock task context: serializes {@link IntWritable} keys/values and sorts
+     * and groups with natural ordering. Counters are absent ({@code null});
+     * lifecycle and partitioning methods are not supported.
+     */
+    protected static class TaskContext extends HadoopTaskContext {
+        /**
+         * Creates a context with no job and no task info — the map tests do not need them.
+         */
+        protected TaskContext() {
+            super(null, null);
+        }
+
+        /** {@inheritDoc} No counters in mock context. */
+        @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} No counters in mock context. */
+        @Override public HadoopCounters counters() {
+            return null;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public HadoopPartitioner partitioner() throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} Keys are always {@link IntWritable} in the map tests. */
+        @Override public HadoopSerialization keySerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} Values are always {@link IntWritable} in the map tests. */
+        @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException {
+            return new HadoopWritableSerialization(IntWritable.class);
+        }
+
+        /** {@inheritDoc} Natural ordering of keys. */
+        @SuppressWarnings("unchecked")
+        @Override public Comparator<Object> sortComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} Natural ordering of keys. */
+        @SuppressWarnings("unchecked")
+        @Override public Comparator<Object> groupComparator() {
+            return ComparableComparator.getInstance();
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public void run() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public void cancel() {
+            assert false;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+            assert false;
+        }
+
+        /** {@inheritDoc} Executes the callable inline (no user impersonation), wrapping failures. */
+        @Override public <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException {
+            try {
+                return c.call();
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(e);
+            }
+        }
+    }
+
+    /**
+     * Mock job info: exposes no properties; everything else is unsupported and
+     * fails fast via {@code assert false} if reached.
+     */
+    protected static class JobInfo implements HadoopJobInfo {
+        /** {@inheritDoc} No properties defined. */
+        @Nullable @Override public String property(String name) {
+            return null;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public boolean hasCombiner() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public boolean hasReducer() {
+            assert false;
+
+            return false;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+            @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public int reducers() {
+            assert false;
+
+            return 0;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public String jobName() {
+            assert false;
+
+            return null;
+        }
+
+        /** {@inheritDoc} Not supported by this mock. */
+        @Override public String user() {
+            assert false;
+
+            return null;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
new file mode 100644
index 0000000..019b172
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopConcurrentHashMultimapSelftest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.io.GridDataInput;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+/**
+ * Self test for {@link HadoopConcurrentHashMultimap}: a single-threaded
+ * add/read round-trip test and a multithreaded stress test, both backed by
+ * offheap {@link GridUnsafeMemory}. Each test also verifies that closing the
+ * multimap releases all allocated offheap memory.
+ */
+public class HadoopConcurrentHashMultimapSelftest extends HadoopAbstractMapTest {
+    /**
+     * Writes random (key, value) pairs into the multimap, mirroring them in a
+     * guava {@link Multimap}, and after every batch verifies the multimap
+     * contents via {@link #check}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMapSimple() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+//        mem.listen(new GridOffHeapEventListener() {
+//            @Override public void onEvent(GridOffHeapEvent evt) {
+//                if (evt == GridOffHeapEvent.ALLOCATE)
+//                    U.dumpStack();
+//            }
+//        });
+
+        Random rnd = new Random();
+
+        // Random initial capacity: 16, 32 or 64.
+        int mapSize = 16 << rnd.nextInt(3);
+
+        HadoopJobInfo job = new JobInfo();
+
+        HadoopTaskContext taskCtx = new TaskContext();
+
+        HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, mapSize);
+
+        HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx);
+
+        // 'mm' mirrors the expected contents; 'vis' accumulates the visitor traversal.
+        Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+        Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+        for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+            int key = rnd.nextInt(mapSize);
+            int val = rnd.nextInt();
+
+            a.write(new IntWritable(key), new IntWritable(val));
+            mm.put(key, val);
+
+            X.println("k: " + key + " v: " + val);
+
+            // Close the adder and re-check full contents after every single write.
+            a.close();
+
+            check(m, mm, vis, taskCtx);
+
+            a = m.startAdding(taskCtx);
+        }
+
+//        a.add(new IntWritable(10), new IntWritable(2));
+//        mm.put(10, 2);
+//        check(m, mm);
+
+        a.close();
+
+        X.println("Alloc: " + mem.allocatedSize());
+
+        m.close();
+
+        // Closing the multimap must free all offheap memory.
+        assertEquals(0, mem.allocatedSize());
+    }
+
+    /**
+     * Reads the multimap back through both {@link HadoopTaskInput} and the
+     * visitor API, comparing each view against the expected contents.
+     *
+     * @param m Multimap under test.
+     * @param mm Expected key-to-values mapping.
+     * @param vis Accumulator filled by the visitor traversal, compared to {@code mm}.
+     * @param taskCtx Task context used to open the input.
+     * @throws Exception If failed.
+     */
+    private void check(HadoopConcurrentHashMultimap m, Multimap<Integer, Integer> mm,
+        final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx) throws Exception {
+        final HadoopTaskInput in = m.input(taskCtx);
+
+        Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+        int keys = 0;
+
+        while (in.next()) {
+            keys++;
+
+            IntWritable k = (IntWritable)in.key();
+
+            assertNotNull(k);
+
+            Deque<Integer> vs = new LinkedList<>();
+
+            Iterator<?> it = in.values();
+
+            // Input is expected to yield values in reverse insertion order;
+            // addFirst restores the original order for comparison.
+            while (it.hasNext())
+                vs.addFirst(((IntWritable) it.next()).get());
+
+            Collection<Integer> exp = mmm.get(k.get());
+
+            assertEquals(exp, vs);
+        }
+
+        assertEquals(mmm.size(), keys);
+
+        assertEquals(m.keys(), keys);
+
+        X.println("keys: " + keys + " cap: " + m.capacity());
+
+        // Check visitor.
+
+        final byte[] buf = new byte[4];
+
+        final GridDataInput dataInput = new GridUnsafeDataInput();
+
+        m.visit(false, new HadoopConcurrentHashMultimap.Visitor() {
+            /** Reusable key holder; onValue() pairs values with the last seen key. */
+            IntWritable key = new IntWritable();
+
+            /** Reusable value holder. */
+            IntWritable val = new IntWritable();
+
+            @Override public void onKey(long keyPtr, int keySize) {
+                read(keyPtr, keySize, key);
+            }
+
+            @Override public void onValue(long valPtr, int valSize) {
+                read(valPtr, valSize, val);
+
+                vis.put(key.get(), val.get());
+            }
+
+            /** Deserializes a 4-byte IntWritable from raw offheap memory into {@code w}. */
+            private void read(long ptr, int size, Writable w) {
+                assert size == 4 : size;
+
+                GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
+
+                dataInput.bytes(buf, size);
+
+                try {
+                    w.readFields(dataInput);
+                }
+                catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+//        X.println("vis: " + vis);
+
+        assertEquals(mm, vis);
+
+        in.close();
+    }
+
+    /**
+     * Stress test: many threads concurrently write random pairs into the same
+     * multimap while mirroring them in a {@link ConcurrentMap}, then the full
+     * contents are drained and compared. Repeated 20 times with a random
+     * thread count (3..29).
+     *
+     * @throws Exception if failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+        X.println("___ Started");
+
+        Random rnd = new GridRandom();
+
+        for (int i = 0; i < 20; i++) {
+            HadoopJobInfo job = new JobInfo();
+
+            final HadoopTaskContext taskCtx = new TaskContext();
+
+            final HadoopConcurrentHashMultimap m = new HadoopConcurrentHashMultimap(job, mem, 16);
+
+            // Expected contents, shared across writer threads.
+            final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+            X.println("___ MT");
+
+            multithreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    X.println("___ TH in");
+
+                    Random rnd = new GridRandom();
+
+                    IntWritable key = new IntWritable();
+                    IntWritable val = new IntWritable();
+
+                    HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+                    for (int i = 0; i < 50000; i++) {
+                        int k = rnd.nextInt(32000);
+                        int v = rnd.nextInt();
+
+                        key.set(k);
+                        val.set(v);
+
+                        a.write(key, val);
+
+                        // Mirror the write into the expected map (lock-free putIfAbsent idiom).
+                        Collection<Integer> list = mm.get(k);
+
+                        if (list == null) {
+                            list = new ConcurrentLinkedQueue<>();
+
+                            Collection<Integer> old = mm.putIfAbsent(k, list);
+
+                            if (old != null)
+                                list = old;
+                        }
+
+                        list.add(v);
+                    }
+
+                    a.close();
+
+                    X.println("___ TH out");
+
+                    return null;
+                }
+            }, 3 + rnd.nextInt(27));
+
+            X.println("___ Check: " + m.capacity());
+
+            assertEquals(mm.size(), m.keys());
+
+            // Table must have grown beyond the key domain size.
+            assertTrue(m.capacity() > 32000);
+
+            HadoopTaskInput in = m.input(taskCtx);
+
+            // Drain the multimap, removing each key's values from the expected map.
+            while (in.next()) {
+                IntWritable key = (IntWritable) in.key();
+
+                Iterator<?> valsIter = in.values();
+
+                Collection<Integer> vals = mm.remove(key.get());
+
+                assertNotNull(vals);
+
+                while (valsIter.hasNext()) {
+                    IntWritable val = (IntWritable) valsIter.next();
+
+                    assertTrue(vals.remove(val.get()));
+                }
+
+                assertTrue(vals.isEmpty());
+            }
+
+            in.close();
+            m.close();
+
+            // Closing the multimap must free all offheap memory.
+            assertEquals(0, mem.allocatedSize());
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
new file mode 100644
index 0000000..195bcbb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopHashMapSelfTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Tests for {@link HadoopHashMultimap}.
+ */
+public class HadoopHashMapSelfTest extends HadoopAbstractMapTest {
+ /**
+ * Test simple map.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMapSimple() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ Random rnd = new Random();
+
+ int mapSize = 16 << rnd.nextInt(3);
+
+ HadoopTaskContext taskCtx = new TaskContext();
+
+ final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize);
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+
+ for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+ int key = rnd.nextInt(mapSize);
+ int val = rnd.nextInt();
+
+ a.write(new IntWritable(key), new IntWritable(val));
+ mm.put(key, val);
+
+ X.println("k: " + key + " v: " + val);
+
+ a.close();
+
+ check(m, mm, taskCtx);
+
+ a = m.startAdding(taskCtx);
+ }
+
+// a.add(new IntWritable(10), new IntWritable(2));
+// mm.put(10, 2);
+// check(m, mm);
+
+ a.close();
+
+ X.println("Alloc: " + mem.allocatedSize());
+
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+
+ private void check(HadoopHashMultimap m, Multimap<Integer, Integer> mm, HadoopTaskContext taskCtx) throws Exception {
+ final HadoopTaskInput in = m.input(taskCtx);
+
+ Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+ int keys = 0;
+
+ while (in.next()) {
+ keys++;
+
+ IntWritable k = (IntWritable)in.key();
+
+ assertNotNull(k);
+
+ ArrayList<Integer> vs = new ArrayList<>();
+
+ Iterator<?> it = in.values();
+
+ while (it.hasNext())
+ vs.add(((IntWritable) it.next()).get());
+
+ Collection<Integer> exp = mmm.get(k.get());
+
+ assertEquals(sorted(exp), sorted(vs));
+ }
+
+ X.println("keys: " + keys + " cap: " + m.capacity());
+
+ assertEquals(mmm.size(), keys);
+
+ assertEquals(m.keys(), keys);
+
+ in.close();
+ }
+
+ private GridLongList sorted(Collection<Integer> col) {
+ GridLongList lst = new GridLongList(col.size());
+
+ for (Integer i : col)
+ lst.add(i);
+
+ return lst.sort();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
new file mode 100644
index 0000000..d04beca
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopSkipListSelfTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.shuffle.collections;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.io.GridDataInput;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static java.lang.Math.abs;
+import static java.lang.Math.ceil;
+import static java.lang.Math.max;
+
+/**
+ * Skip list tests.
+ */
+public class HadoopSkipListSelfTest extends HadoopAbstractMapTest {
+ /**
+ * Tests the distribution of randomly generated skip list levels.
+ */
+ public void testLevel() {
+ Random rnd = new GridRandom();
+
+ int[] levelsCnts = new int[32];
+
+ int all = 10000;
+
+ for (int i = 0; i < all; i++) {
+ int level = HadoopSkipList.randomLevel(rnd);
+
+ levelsCnts[level]++;
+ }
+
+ X.println("Distribution: " + Arrays.toString(levelsCnts));
+
+ for (int level = 0; level < levelsCnts.length; level++) {
+ int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1);
+
+ double precision = 0.72 / Math.max(32 >>> level, 1);
+
+ int sigma = max((int)ceil(precision * exp), 5);
+
+ X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precision: " + precision +
+ " sigma: " + sigma);
+
+ assertTrue(abs(exp - levelsCnts[level]) <= sigma); // Sometimes fails.
+ }
+ }
+
+ public void testMapSimple() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+// mem.listen(new GridOffHeapEventListener() {
+// @Override public void onEvent(GridOffHeapEvent evt) {
+// if (evt == GridOffHeapEvent.ALLOCATE)
+// U.dumpStack();
+// }
+// });
+
+ Random rnd = new Random();
+
+ int mapSize = 16 << rnd.nextInt(6);
+
+ HadoopJobInfo job = new JobInfo();
+
+ HadoopTaskContext taskCtx = new TaskContext();
+
+ HadoopMultimap m = new HadoopSkipList(job, mem);
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ Multimap<Integer, Integer> mm = ArrayListMultimap.create();
+ Multimap<Integer, Integer> vis = ArrayListMultimap.create();
+
+ for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) {
+ int key = rnd.nextInt(mapSize);
+ int val = rnd.nextInt();
+
+ a.write(new IntWritable(key), new IntWritable(val));
+ mm.put(key, val);
+
+ X.println("k: " + key + " v: " + val);
+
+ a.close();
+
+ check(m, mm, vis, taskCtx);
+
+ a = m.startAdding(taskCtx);
+ }
+
+// a.add(new IntWritable(10), new IntWritable(2));
+// mm.put(10, 2);
+// check(m, mm);
+
+ a.close();
+
+ X.println("Alloc: " + mem.allocatedSize());
+
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+
+ private void check(HadoopMultimap m, Multimap<Integer, Integer> mm, final Multimap<Integer, Integer> vis, HadoopTaskContext taskCtx)
+ throws Exception {
+ final HadoopTaskInput in = m.input(taskCtx);
+
+ Map<Integer, Collection<Integer>> mmm = mm.asMap();
+
+ int keys = 0;
+
+ int prevKey = Integer.MIN_VALUE;
+
+ while (in.next()) {
+ keys++;
+
+ IntWritable k = (IntWritable)in.key();
+
+ assertNotNull(k);
+
+ assertTrue(k.get() > prevKey);
+
+ prevKey = k.get();
+
+ Deque<Integer> vs = new LinkedList<>();
+
+ Iterator<?> it = in.values();
+
+ while (it.hasNext())
+ vs.addFirst(((IntWritable) it.next()).get());
+
+ Collection<Integer> exp = mmm.get(k.get());
+
+ assertEquals(exp, vs);
+ }
+
+ assertEquals(mmm.size(), keys);
+
+//! assertEquals(m.keys(), keys);
+
+ // Check visitor.
+
+ final byte[] buf = new byte[4];
+
+ final GridDataInput dataInput = new GridUnsafeDataInput();
+
+ m.visit(false, new HadoopMultimap.Visitor() {
+ /** */
+ IntWritable key = new IntWritable();
+
+ /** */
+ IntWritable val = new IntWritable();
+
+ @Override public void onKey(long keyPtr, int keySize) {
+ read(keyPtr, keySize, key);
+ }
+
+ @Override public void onValue(long valPtr, int valSize) {
+ read(valPtr, valSize, val);
+
+ vis.put(key.get(), val.get());
+ }
+
+ private void read(long ptr, int size, Writable w) {
+ assert size == 4 : size;
+
+ GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF, size);
+
+ dataInput.bytes(buf, size);
+
+ try {
+ w.readFields(dataInput);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+// X.println("vis: " + vis);
+
+ assertEquals(mm, vis);
+
+ in.close();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultiThreaded() throws Exception {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ X.println("___ Started");
+
+ Random rnd = new GridRandom();
+
+ for (int i = 0; i < 20; i++) {
+ HadoopJobInfo job = new JobInfo();
+
+ final HadoopTaskContext taskCtx = new TaskContext();
+
+ final HadoopMultimap m = new HadoopSkipList(job, mem);
+
+ final ConcurrentMap<Integer, Collection<Integer>> mm = new ConcurrentHashMap<>();
+
+ X.println("___ MT");
+
+ multithreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ X.println("___ TH in");
+
+ Random rnd = new GridRandom();
+
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+
+ HadoopMultimap.Adder a = m.startAdding(taskCtx);
+
+ for (int i = 0; i < 50000; i++) {
+ int k = rnd.nextInt(32000);
+ int v = rnd.nextInt();
+
+ key.set(k);
+ val.set(v);
+
+ a.write(key, val);
+
+ Collection<Integer> list = mm.get(k);
+
+ if (list == null) {
+ list = new ConcurrentLinkedQueue<>();
+
+ Collection<Integer> old = mm.putIfAbsent(k, list);
+
+ if (old != null)
+ list = old;
+ }
+
+ list.add(v);
+ }
+
+ a.close();
+
+ X.println("___ TH out");
+
+ return null;
+ }
+ }, 3 + rnd.nextInt(27));
+
+ HadoopTaskInput in = m.input(taskCtx);
+
+ int prevKey = Integer.MIN_VALUE;
+
+ while (in.next()) {
+ IntWritable key = (IntWritable)in.key();
+
+ assertTrue(key.get() > prevKey);
+
+ prevKey = key.get();
+
+ Iterator<?> valsIter = in.values();
+
+ Collection<Integer> vals = mm.remove(key.get());
+
+ assertNotNull(vals);
+
+ while (valsIter.hasNext()) {
+ IntWritable val = (IntWritable) valsIter.next();
+
+ assertTrue(vals.remove(val.get()));
+ }
+
+ assertTrue(vals.isEmpty());
+ }
+
+ in.close();
+ m.close();
+
+ assertEquals(0, mem.allocatedSize());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
new file mode 100644
index 0000000..612e892
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/streams/HadoopDataStreamSelfTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.shuffle.streams;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
+import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests for Hadoop shuffle offheap data input/output streams.
+ */
+public class HadoopDataStreamSelfTest extends GridCommonAbstractTest {
+
+ public void testStreams() throws IOException {
+ GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+ HadoopDataOutStream out = new HadoopDataOutStream(mem);
+
+ int size = 4 * 1024;
+
+ final long ptr = mem.allocate(size);
+
+ out.buffer().set(ptr, size);
+
+ out.writeBoolean(false);
+ out.writeBoolean(true);
+ out.writeBoolean(false);
+ out.write(17);
+ out.write(121);
+ out.write(0xfafa);
+ out.writeByte(17);
+ out.writeByte(121);
+ out.writeByte(0xfafa);
+ out.writeChar('z');
+ out.writeChar('o');
+ out.writeChar('r');
+ out.writeShort(100);
+ out.writeShort(Short.MIN_VALUE);
+ out.writeShort(Short.MAX_VALUE);
+ out.writeShort(65535);
+ out.writeShort(65536); // 0
+ out.writeInt(Integer.MAX_VALUE);
+ out.writeInt(Integer.MIN_VALUE);
+ out.writeInt(-1);
+ out.writeInt(0);
+ out.writeInt(1);
+ out.writeFloat(0.33f);
+ out.writeFloat(0.5f);
+ out.writeFloat(-0.7f);
+ out.writeFloat(Float.MAX_VALUE);
+ out.writeFloat(Float.MIN_VALUE);
+ out.writeFloat(Float.MIN_NORMAL);
+ out.writeFloat(Float.POSITIVE_INFINITY);
+ out.writeFloat(Float.NEGATIVE_INFINITY);
+ out.writeFloat(Float.NaN);
+ out.writeDouble(-12312312.3333333336666779);
+ out.writeDouble(123123.234);
+ out.writeDouble(Double.MAX_VALUE);
+ out.writeDouble(Double.MIN_VALUE);
+ out.writeDouble(Double.MIN_NORMAL);
+ out.writeDouble(Double.NEGATIVE_INFINITY);
+ out.writeDouble(Double.POSITIVE_INFINITY);
+ out.writeDouble(Double.NaN);
+ out.writeLong(Long.MAX_VALUE);
+ out.writeLong(Long.MIN_VALUE);
+ out.writeLong(0);
+ out.writeLong(-1L);
+ out.write(new byte[]{1,2,3});
+ out.write(new byte[]{0,1,2,3}, 1, 2);
+ out.writeUTF("mom washes rum");
+
+ HadoopDataInStream in = new HadoopDataInStream(mem);
+
+ in.buffer().set(ptr, out.buffer().pointer());
+
+ assertEquals(false, in.readBoolean());
+ assertEquals(true, in.readBoolean());
+ assertEquals(false, in.readBoolean());
+ assertEquals(17, in.read());
+ assertEquals(121, in.read());
+ assertEquals(0xfa, in.read());
+ assertEquals(17, in.readByte());
+ assertEquals(121, in.readByte());
+ assertEquals((byte)0xfa, in.readByte());
+ assertEquals('z', in.readChar());
+ assertEquals('o', in.readChar());
+ assertEquals('r', in.readChar());
+ assertEquals(100, in.readShort());
+ assertEquals(Short.MIN_VALUE, in.readShort());
+ assertEquals(Short.MAX_VALUE, in.readShort());
+ assertEquals(-1, in.readShort());
+ assertEquals(0, in.readShort());
+ assertEquals(Integer.MAX_VALUE, in.readInt());
+ assertEquals(Integer.MIN_VALUE, in.readInt());
+ assertEquals(-1, in.readInt());
+ assertEquals(0, in.readInt());
+ assertEquals(1, in.readInt());
+ assertEquals(0.33f, in.readFloat());
+ assertEquals(0.5f, in.readFloat());
+ assertEquals(-0.7f, in.readFloat());
+ assertEquals(Float.MAX_VALUE, in.readFloat());
+ assertEquals(Float.MIN_VALUE, in.readFloat());
+ assertEquals(Float.MIN_NORMAL, in.readFloat());
+ assertEquals(Float.POSITIVE_INFINITY, in.readFloat());
+ assertEquals(Float.NEGATIVE_INFINITY, in.readFloat());
+ assertEquals(Float.NaN, in.readFloat());
+ assertEquals(-12312312.3333333336666779, in.readDouble());
+ assertEquals(123123.234, in.readDouble());
+ assertEquals(Double.MAX_VALUE, in.readDouble());
+ assertEquals(Double.MIN_VALUE, in.readDouble());
+ assertEquals(Double.MIN_NORMAL, in.readDouble());
+ assertEquals(Double.NEGATIVE_INFINITY, in.readDouble());
+ assertEquals(Double.POSITIVE_INFINITY, in.readDouble());
+ assertEquals(Double.NaN, in.readDouble());
+ assertEquals(Long.MAX_VALUE, in.readLong());
+ assertEquals(Long.MIN_VALUE, in.readLong());
+ assertEquals(0, in.readLong());
+ assertEquals(-1, in.readLong());
+
+ byte[] b = new byte[3];
+
+ in.read(b);
+
+ assertTrue(Arrays.equals(new byte[]{1,2,3}, b));
+
+ b = new byte[4];
+
+ in.read(b, 1, 2);
+
+ assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b));
+
+ assertEquals("mom washes rum", in.readUTF());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
new file mode 100644
index 0000000..b4e63d1
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/HadoopExecutorServiceTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jsr166.LongAdder8;
+
+/**
+ *
+ */
+public class HadoopExecutorServiceTest extends GridCommonAbstractTest {
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExecutesAll() throws Exception {
+ final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5);
+
+ for (int i = 0; i < 5; i++) {
+ final int loops = 5000;
+ int threads = 17;
+
+ final LongAdder8 sum = new LongAdder8();
+
+ multithreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < loops; i++) {
+ exec.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ sum.increment();
+
+ return null;
+ }
+ });
+ }
+
+ return null;
+ }
+ }, threads);
+
+ while (exec.active() != 0) {
+ X.println("__ active: " + exec.active());
+
+ Thread.sleep(200);
+ }
+
+ assertEquals(threads * loops, sum.sum());
+
+ X.println("_ ok");
+ }
+
+ assertTrue(exec.shutdown(0));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testShutdown() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5);
+
+ final LongAdder8 sum = new LongAdder8();
+
+ final AtomicBoolean finish = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!finish.get()) {
+ exec.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ sum.increment();
+
+ return null;
+ }
+ });
+ }
+
+ return null;
+ }
+ }, 19);
+
+ Thread.sleep(200);
+
+ assertTrue(exec.shutdown(50));
+
+ long res = sum.sum();
+
+ assertTrue(res > 0);
+
+ finish.set(true);
+
+ fut.get();
+
+ assertEquals(res, sum.sum()); // Nothing was executed after shutdown.
+
+ X.println("_ ok");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
new file mode 100644
index 0000000..7c43500
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopAbstractSelfTest;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-404");
+
+ startGrids(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.setExternalExecution(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new JdkMarshaller());
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimpleTaskSubmit() throws Exception {
+ String testInputFile = "/test";
+
+ prepareTestFile(testInputFile);
+
+ Configuration cfg = new Configuration();
+
+ setupFileSystems(cfg);
+
+ Job job = Job.getInstance(cfg);
+
+ job.setMapperClass(TestMapper.class);
+ job.setCombinerClass(TestReducer.class);
+ job.setReducerClass(TestReducer.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
+
+ job.setJarByClass(getClass());
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration()));
+
+ fut.get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMapperException() throws Exception {
+ String testInputFile = "/test";
+
+ prepareTestFile(testInputFile);
+
+ Configuration cfg = new Configuration();
+
+ setupFileSystems(cfg);
+
+ Job job = Job.getInstance(cfg);
+
+ job.setMapperClass(TestFailingMapper.class);
+ job.setCombinerClass(TestReducer.class);
+ job.setReducerClass(TestReducer.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(IntWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setNumReduceTasks(1);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
+
+ job.setJarByClass(getClass());
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration()));
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ IOException exp = X.cause(e, IOException.class);
+
+ assertNotNull(exp);
+ assertEquals("Test failure", exp.getMessage());
+ }
+ }
+
+ /**
+ * @param filePath File path to prepare.
+ * @throws Exception If failed.
+ */
+ private void prepareTestFile(String filePath) throws Exception {
+ IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
+
+ try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) {
+ PrintWriter wr = new PrintWriter(new OutputStreamWriter(out));
+
+ for (int i = 0; i < 1000; i++)
+ wr.println("Hello, world: " + i);
+
+ wr.flush();
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+ /** One constant. */
+ private IntWritable one = new IntWritable(1);
+
+ /** Line constant. */
+ private Text line = new Text("line");
+
+ @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ ctx.write(line, one);
+ }
+ }
+
+ /**
+ * Failing mapper.
+ */
+ private static class TestFailingMapper extends Mapper<Object, Text, Text, IntWritable> {
+ @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException {
+ throw new IOException("Test failure");
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+ /** Line constant. */
+ private Text line = new Text("line");
+
+ @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+ super.setup(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
+ throws IOException, InterruptedException {
+ int s = 0;
+
+ for (IntWritable val : values)
+ s += val.get();
+
+ System.out.println(">>>> Reduced: " + s);
+
+ ctx.write(line, new IntWritable(s));
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
new file mode 100644
index 0000000..a40c531
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.taskexecutor.external.communication;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests Hadoop external communication component.
+ */
+public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Suite is disabled: every test fails up front until IGNITE-404 is resolved.
+ fail("https://issues.apache.org/jira/browse/IGNITE-404");
+ }
+
+ /**
+ * Checks all-to-all message exchange over TCP.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSimpleMessageSendingTcp() throws Exception {
+ checkSimpleMessageSending(false);
+ }
+
+ /**
+ * Checks all-to-all message exchange over shared memory.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSimpleMessageSendingShmem() throws Exception {
+ checkSimpleMessageSending(true);
+ }
+
+ /**
+ * Starts 4 communication endpoints, sends messages all-to-all in several rounds, then
+ * verifies every endpoint received the expected number of messages.
+ *
+ * @param useShmem Whether to use shared memory transport instead of TCP.
+ * @throws Exception If failed.
+ */
+ private void checkSimpleMessageSending(boolean useShmem) throws Exception {
+ UUID parentNodeId = UUID.randomUUID();
+
+ Marshaller marsh = new JdkMarshaller();
+
+ IgniteLogger log = log();
+
+ HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4];
+
+ try {
+ String name = "grid";
+
+ TestHadoopListener[] lsnrs = new TestHadoopListener[4];
+
+ int msgs = 10;
+
+ for (int i = 0; i < comms.length; i++) {
+ comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log,
+ Executors.newFixedThreadPool(1), name + i);
+
+ if (useShmem)
+ comms[i].setSharedMemoryPort(14000);
+
+ // Each listener expects 'msgs' messages from each of the other endpoints.
+ lsnrs[i] = new TestHadoopListener(msgs);
+
+ comms[i].setListener(lsnrs[i]);
+
+ comms[i].start();
+ }
+
+ // 'msgs' rounds of all-to-all sends, skipping self-sends.
+ for (int r = 0; r < msgs; r++) {
+ for (int from = 0; from < comms.length; from++) {
+ for (int to = 0; to < comms.length; to++) {
+ if (from == to)
+ continue;
+
+ comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to));
+ }
+ }
+ }
+
+ U.sleep(1000);
+
+ for (TestHadoopListener lsnr : lsnrs) {
+ // NOTE(review): await() ignores the latch timeout result; a shortfall of messages
+ // is only caught by the assertion below.
+ lsnr.await(3_000);
+
+ assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size());
+ }
+ }
+ finally {
+ // Stop whatever endpoints managed to start, even if the test body failed mid-way.
+ for (HadoopExternalCommunication comm : comms) {
+ if (comm != null)
+ comm.stop();
+ }
+ }
+ }
+
+ /**
+ * Listener that records received messages and counts down a latch per message.
+ */
+ private static class TestHadoopListener implements HadoopMessageListener {
+ /** Received messages (array list is safe because executor has one thread). */
+ private Collection<TestMessage> msgs = new ArrayList<>();
+
+ /** Await latch. */
+ private CountDownLatch receiveLatch;
+
+ /**
+ * @param msgs Number of messages to await.
+ */
+ private TestHadoopListener(int msgs) {
+ receiveLatch = new CountDownLatch(msgs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
+ assert msg instanceof TestMessage;
+
+ msgs.add((TestMessage)msg);
+
+ receiveLatch.countDown();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+ // No-op.
+ }
+
+ /**
+ * @return Received messages.
+ */
+ public Collection<TestMessage> messages() {
+ return msgs;
+ }
+
+ /**
+ * Waits up to the given time for all expected messages; returns silently on timeout.
+ *
+ * @param millis Time to await.
+ * @throws InterruptedException If wait interrupted.
+ */
+ public void await(int millis) throws InterruptedException {
+ receiveLatch.await(millis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+ * Test message carrying source and destination endpoint indexes.
+ */
+ private static class TestMessage implements HadoopMessage {
+ /** From index. */
+ private int from;
+
+ /** To index. */
+ private int to;
+
+ /**
+ * @param from From index.
+ * @param to To index.
+ */
+ private TestMessage(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public TestMessage() {
+ // No-op.
+ }
+
+ /**
+ * @return From index.
+ */
+ public int from() {
+ return from;
+ }
+
+ /**
+ * @return To index.
+ */
+ public int to() {
+ return to;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(from);
+ out.writeInt(to);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // Read order must mirror writeExternal().
+ from = in.readInt();
+ to = in.readInt();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java
new file mode 100644
index 0000000..43924ed
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/BasicUserNameMapperSelfTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.util;
+
+import org.apache.ignite.hadoop.util.BasicUserNameMapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for basic user name mapper.
+ */
+public class BasicUserNameMapperSelfTest extends GridCommonAbstractTest {
+ /**
+ * Test null mappings.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNullMappings() throws Exception {
+ checkNullOrEmptyMappings(null);
+ }
+
+ /**
+ * Test empty mappings.
+ *
+ * @throws Exception If failed.
+ */
+ public void testEmptyMappings() throws Exception {
+ checkNullOrEmptyMappings(new HashMap<String, String>());
+ }
+
+ /**
+ * Check null or empty mappings across all four default-user-name configurations.
+ *
+ * @param map Mappings.
+ * @throws Exception If failed.
+ */
+ private void checkNullOrEmptyMappings(@Nullable Map<String, String> map) throws Exception {
+ // Default name disabled and unset: names pass through unchanged.
+ BasicUserNameMapper mapper = create(map, false, null);
+
+ assertNull(mapper.map(null));
+ assertEquals("1", mapper.map("1"));
+ assertEquals("2", mapper.map("2"));
+
+ // Default name enabled but unset: unmapped names resolve to null.
+ mapper = create(map, true, null);
+
+ assertNull(mapper.map(null));
+ assertNull(mapper.map("1"));
+ assertNull(mapper.map("2"));
+
+ // Default name set but disabled: it is ignored, names pass through.
+ mapper = create(map, false, "A");
+
+ assertNull(mapper.map(null));
+ assertEquals("1", mapper.map("1"));
+ assertEquals("2", mapper.map("2"));
+
+ // Default name set and enabled: every unmapped name resolves to the default.
+ mapper = create(map, true, "A");
+
+ assertEquals("A", mapper.map(null));
+ assertEquals("A", mapper.map("1"));
+ assertEquals("A", mapper.map("2"));
+ }
+
+ /**
+ * Test regular mappings: dictionary hits always win over pass-through or default name.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMappings() throws Exception {
+ Map<String, String> map = new HashMap<>();
+
+ map.put("1", "101");
+
+ // Default name disabled and unset: dictionary hit wins, unmapped names pass through.
+ BasicUserNameMapper mapper = create(map, false, null);
+
+ assertNull(mapper.map(null));
+ assertEquals("101", mapper.map("1"));
+ assertEquals("2", mapper.map("2"));
+
+ // Default name enabled but unset: mapped names still resolve, unmapped ones become null.
+ mapper = create(map, true, null);
+
+ assertNull(mapper.map(null));
+ assertEquals("101", mapper.map("1"));
+ assertNull(mapper.map("2"));
+
+ // Default name set but disabled: it is ignored.
+ mapper = create(map, false, "A");
+
+ assertNull(mapper.map(null));
+ assertEquals("101", mapper.map("1"));
+ assertEquals("2", mapper.map("2"));
+
+ // Default name set and enabled: unmapped names resolve to the default, mapped ones still win.
+ mapper = create(map, true, "A");
+
+ assertEquals("A", mapper.map(null));
+ assertEquals("101", mapper.map("1"));
+ assertEquals("A", mapper.map("2"));
+ }
+
+ /**
+ * Create mapper.
+ *
+ * @param dictionary Dictionary.
+ * @param useDfltUsrName Whether to use default user name.
+ * @param dfltUsrName Default user name.
+ * @return Mapper.
+ */
+ private BasicUserNameMapper create(@Nullable Map<String, String> dictionary, boolean useDfltUsrName,
+ @Nullable String dfltUsrName) {
+ BasicUserNameMapper mapper = new BasicUserNameMapper();
+
+ mapper.setMappings(dictionary);
+ mapper.setUseDefaultUserName(useDfltUsrName);
+ mapper.setDefaultUserName(dfltUsrName);
+
+ return mapper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java
new file mode 100644
index 0000000..a9d295f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/util/ChainedUserNameMapperSelfTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.util;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.hadoop.util.BasicUserNameMapper;
+import org.apache.ignite.hadoop.util.ChainedUserNameMapper;
+import org.apache.ignite.hadoop.util.KerberosUserNameMapper;
+import org.apache.ignite.hadoop.util.UserNameMapper;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+/**
+ * Tests for chained user name mapper.
+ */
+public class ChainedUserNameMapperSelfTest extends GridCommonAbstractTest {
+ /** Test instance. */
+ private static final String INSTANCE = "test_instance";
+
+ /** Test realm. */
+ private static final String REALM = "test_realm";
+
+ /**
+ * Test case when mappers are null: starting the chained mapper must throw {@link IgniteException}.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public void testNullMappers() throws Exception {
+ GridTestUtils.assertThrows(null, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ create((UserNameMapper[])null);
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * Test case when one of mappers is null: must throw {@link IgniteException} as well.
+ *
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public void testNullMapperElement() throws Exception {
+ GridTestUtils.assertThrows(null, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ create(new BasicUserNameMapper(), null);
+
+ return null;
+ }
+ }, IgniteException.class, null);
+ }
+
+ /**
+ * Test actual chaining logic: expected output is the basic mapping result decorated with
+ * the Kerberos {@code name/instance@realm} form.
+ *
+ * @throws Exception If failed.
+ */
+ public void testChaining() throws Exception {
+ BasicUserNameMapper mapper1 = new BasicUserNameMapper();
+
+ mapper1.setMappings(Collections.singletonMap("1", "101"));
+
+ KerberosUserNameMapper mapper2 = new KerberosUserNameMapper();
+
+ mapper2.setInstance(INSTANCE);
+ mapper2.setRealm(REALM);
+
+ ChainedUserNameMapper mapper = create(mapper1, mapper2);
+
+ assertEquals("101" + "/" + INSTANCE + "@" + REALM, mapper.map("1"));
+ assertEquals("2" + "/" + INSTANCE + "@" + REALM, mapper.map("2"));
+ // Null user name: expected to match the IgfsUtils.fixUserName(null) normalization.
+ assertEquals(IgfsUtils.fixUserName(null) + "/" + INSTANCE + "@" + REALM, mapper.map(null));
+ }
+
+ /**
+ * Create chained mapper and start it (validation of the mappers happens on start).
+ *
+ * @param mappers Child mappers.
+ * @return Chained mapper.
+ */
+ private ChainedUserNameMapper create(UserNameMapper... mappers) {
+ ChainedUserNameMapper mapper = new ChainedUserNameMapper();
+
+ mapper.setMappers(mappers);
+
+ mapper.start();
+
+ return mapper;
+ }
+}