Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:53:01 UTC
[09/92] [abbrv] [partial] ignite git commit: Moving classes around.
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..81f6f3c
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Round-robin map-reduce planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+ /** {@inheritDoc} */
+ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+ @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+ if (top.isEmpty())
+ throw new IllegalArgumentException("Topology is empty");
+
+ // Has at least one element.
+ Iterator<ClusterNode> it = top.iterator();
+
+ Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+ for (HadoopInputSplit block : job.input()) {
+ ClusterNode node = it.next();
+
+ Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+ if (nodeBlocks == null) {
+ nodeBlocks = new ArrayList<>();
+
+ mappers.put(node.id(), nodeBlocks);
+ }
+
+ nodeBlocks.add(block);
+
+ if (!it.hasNext())
+ it = top.iterator();
+ }
+
+ int[] rdc = new int[job.info().reducers()];
+
+ for (int i = 0; i < rdc.length; i++)
+ rdc[i] = i;
+
+ return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
+ }
+}
\ No newline at end of file
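The planner above deals input splits to cluster nodes like cards: it walks the topology with an iterator, restarts the iterator whenever it is exhausted, and finally hands all reducers to whichever node the iterator yields next. A minimal standalone sketch of the same round-robin loop, using plain Strings for nodes and splits (hypothetical names, not part of this commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class RoundRobinSketch {
        /** Deals each split to the next node in line, wrapping the iterator around when it runs out. */
        static Map<String, List<String>> assign(List<String> nodes, List<String> splits) {
            Map<String, List<String>> res = new TreeMap<>();

            Iterator<String> it = nodes.iterator();

            for (String split : splits) {
                res.computeIfAbsent(it.next(), k -> new ArrayList<>()).add(split);

                if (!it.hasNext())
                    it = nodes.iterator(); // Wrap around, as the planner above does.
            }

            return res;
        }

        public static void main(String[] args) {
            System.out.println(assign(Arrays.asList("node1", "node2"), Arrays.asList("s1", "s2", "s3")));
            // Prints {node1=[s1, s3], node2=[s2]}.
        }
    }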
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
new file mode 100644
index 0000000..357ddd4
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestTaskContext.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+
+/**
+ * Task context for test purposes.
+ */
+class HadoopTestTaskContext extends HadoopV2TaskContext {
+ /**
+ * Simple key-value pair.
+ * @param <K> Key class.
+ * @param <V> Value class.
+ */
+ public static class Pair<K,V> {
+ /** Key. */
+ private K key;
+
+ /** Value. */
+ private V val;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ */
+ Pair(K key, V val) {
+ this.key = key;
+ this.val = val;
+ }
+
+ /**
+ * Getter of key.
+ * @return Key.
+ */
+ K key() {
+ return key;
+ }
+
+ /**
+ * Getter of value.
+ * @return Value.
+ */
+ V value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return key + "," + val;
+ }
+ }
+
+ /** Mock output container - result data of the task execution if not overridden. */
+ private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
+
+ /** Mock input container - input data of the task if not overridden. */
+ private Map<Object,List> mockInput = new TreeMap<>();
+
+ /** Context output implementation to write data into mockOutput. */
+ private HadoopTaskOutput output = new HadoopTaskOutput() {
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) {
+ // Check the casts and extract/copy the values.
+ String strKey = new String(((Text)key).getBytes());
+ int intVal = ((IntWritable)val).get();
+
+ mockOutput().add(new Pair<>(strKey, intVal));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /** Context input implementation to read data from mockInput. */
+ private HadoopTaskInput input = new HadoopTaskInput() {
+ /** Iterator of keys and associated lists of values. */
+ Iterator<Map.Entry<Object, List>> iter;
+
+ /** Current key and associated value list. */
+ Map.Entry<Object, List> currEntry;
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (iter == null)
+ iter = mockInput().entrySet().iterator();
+
+ if (iter.hasNext())
+ currEntry = iter.next();
+ else
+ currEntry = null;
+
+ return currEntry != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return currEntry.getKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return currEntry.getValue().iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /**
+ * Getter of the mock output container - result of the task if not overridden.
+ *
+ * @return Mock output.
+ */
+ public List<Pair<String, Integer>> mockOutput() {
+ return mockOutput;
+ }
+
+ /**
+ * Getter of the mock input container - input data of the task if not overridden.
+ *
+ * @return Mock input.
+ */
+ public Map<Object, List> mockInput() {
+ return mockInput;
+ }
+
+ /**
+ * Generates a one-key-multiple-values tree from a list of key-value pairs and wraps them into Writable objects.
+ * The result is placed into the mock input.
+ *
+ * @param flatData List of key-value pairs.
+ */
+ public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
+ Text key = new Text();
+
+ for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
+ key.set(pair.key);
+ ArrayList<IntWritable> valList;
+
+ if (!mockInput.containsKey(key)) {
+ valList = new ArrayList<>();
+ mockInput.put(key, valList);
+ key = new Text();
+ }
+ else
+ valList = (ArrayList<IntWritable>) mockInput.get(key);
+ valList.add(new IntWritable(pair.value()));
+ }
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param gridJob Grid Hadoop job.
+ */
+ public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException {
+ super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
+ }
+
+ /**
+ * Creates DataInput to read JobConf.
+ *
+ * @param job Job.
+ * @return DataInput with JobConf.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException {
+ JobConf jobConf = new JobConf();
+
+ for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
+ jobConf.set(e.getKey(), e.getValue());
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ try {
+ jobConf.write(new DataOutputStream(buf));
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskOutput output() {
+ return output;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input() {
+ return input;
+ }
+}
\ No newline at end of file
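HadoopTestTaskContext above replaces real shuffle I/O with in-memory containers: makeTreeOfWritables() groups flat (key, value) pairs into a sorted one-key-to-many-values map, which the mock HadoopTaskInput then replays to the task under test, while the mock HadoopTaskOutput collects results into a plain list. A minimal sketch of that grouping step with plain Strings and Integers in place of Writables (hypothetical names, not part of this commit):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class GroupingSketch {
        /** Groups flat (key, value) pairs into a sorted key -> values map, the shape a reducer consumes. */
        static TreeMap<String, List<Integer>> group(Iterable<? extends Map.Entry<String, Integer>> flat) {
            TreeMap<String, List<Integer>> res = new TreeMap<>();

            for (Map.Entry<String, Integer> e : flat)
                res.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());

            return res;
        }

        public static void main(String[] args) {
            System.out.println(group(Arrays.asList(
                new SimpleEntry<>("word", 1), new SimpleEntry<>("count", 1), new SimpleEntry<>("word", 1))));
            // Prints {count=[1], word=[1, 1]}.
        }
    }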
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
new file mode 100644
index 0000000..e8ec8a9
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTestUtils.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Utility class for tests.
+ */
+public class HadoopTestUtils {
+ /** Base test directory. */
+ private static final File BASE_TEST_DIR = new File(U.getIgniteHome() + "/work/test/hadoop/");
+
+ /**
+ * @return Base directory for tests.
+ */
+ public static File baseTestDir() {
+ return BASE_TEST_DIR;
+ }
+
+ /**
+ * Get test directory.
+ *
+ * @param parts Parts.
+ * @return Directory.
+ */
+ public static File testDir(String... parts) {
+ File res = BASE_TEST_DIR;
+
+ if (parts != null) {
+ for (String part : parts)
+ res = new File(res, part);
+ }
+
+ return res;
+ }
+
+ /**
+ * Clear base test directory.
+ */
+ public static void clearBaseTestDir() {
+ if (baseTestDir().exists())
+ assert delete(baseTestDir());
+ }
+
+ /**
+ * Checks that job statistics file contains valid strings only.
+ *
+ * @param reader Buffered reader to get lines of job statistics.
+ * @return Amount of events.
+ * @throws IOException If failed.
+ */
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException {
+ Collection<String> phases = new HashSet<>();
+
+ phases.add("submit");
+ phases.add("prepare");
+ phases.add("start");
+ phases.add("finish");
+ phases.add("requestId");
+ phases.add("responseId");
+
+ Collection<String> evtTypes = new HashSet<>();
+
+ evtTypes.add("JOB");
+ evtTypes.add("SETUP");
+ evtTypes.add("MAP");
+ evtTypes.add("SHUFFLE");
+ evtTypes.add("REDUCE");
+ evtTypes.add("COMBINE");
+ evtTypes.add("COMMIT");
+
+ long evtCnt = 0;
+ String line;
+
+ Map<Long, String> reduceNodes = new HashMap<>();
+
+ while ((line = reader.readLine()) != null) {
+ String[] splitLine = line.split(":");
+
+ // Try to parse the timestamp.
+ Long.parseLong(splitLine[1]);
+
+ String[] evt = splitLine[0].split(" ");
+
+ assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0]));
+
+ String phase;
+
+ if ("JOB".equals(evt[0]))
+ phase = evt[1];
+ else {
+ assertEquals(4, evt.length);
+ assertTrue("The node id is not defined", !F.isEmpty(evt[3]));
+
+ long taskNum = Long.parseLong(evt[1]);
+
+ if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) {
+ String nodeId = reduceNodes.get(taskNum);
+
+ if (nodeId == null)
+ reduceNodes.put(taskNum, evt[3]);
+ else
+ assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]);
+ }
+
+ phase = evt[2];
+ }
+
+ assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase));
+
+ evtCnt++;
+ }
+
+ return evtCnt;
+ }
+
+ /**
+ * Deletes file or directory with all sub-directories and files.
+ *
+ * @param file File or directory to delete.
+ * @return {@code true} if and only if the file or directory is successfully deleted,
+ * {@code false} otherwise
+ */
+ public static boolean delete(@Nullable File file) {
+ if (file == null)
+ return false;
+
+ boolean res = true;
+
+ if (file.isDirectory()) {
+ File[] files = file.listFiles();
+
+ if (files != null && files.length > 0)
+ for (File file1 : files)
+ if (file1.isDirectory())
+ res &= delete(file1);
+ else
+ res &= file1.delete();
+
+ res &= file.delete();
+ }
+ else
+ res = file.delete();
+
+ return res;
+ }
+}
\ No newline at end of file
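Judging from the parser in simpleCheckJobStatFile() above, each statistics line carries an event descriptor before the colon and a timestamp after it: JOB lines look like "JOB <phase>:<timestamp>", while task lines look like "<TYPE> <taskNum> <phase> <nodeId>:<timestamp>", and the SHUFFLE and REDUCE entries for the same task number must name the same node. Hypothetical lines consistent with that format (values invented for illustration):

    JOB submit:1474467181000
    MAP 0 start node-1:1474467181042
    SHUFFLE 0 start node-2:1474467181100
    REDUCE 0 finish node-2:1474467181500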
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
new file mode 100644
index 0000000..e85baed
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.IgniteTxConfigCacheSelfTest;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Checks that the Hadoop system cache doesn't use the user-defined TX configuration.
+ */
+public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest {
+ /**
+ * Success if system caches weren't timed out.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSystemCacheTx() throws Exception {
+ final Ignite ignite = grid(0);
+
+ final IgniteInternalCache<Object, Object> hadoopCache = getSystemCache(ignite, CU.SYS_CACHE_HADOOP_MR);
+
+ checkImplicitTxSuccess(hadoopCache);
+ checkStartTxSuccess(hadoopCache);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
new file mode 100644
index 0000000..0e4a0ef
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUserLibsSelfTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopClasspathUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+/**
+ * Tests for user libs parsing.
+ */
+public class HadoopUserLibsSelfTest extends GridCommonAbstractTest {
+ /** Directory 1. */
+ private static final File DIR_1 = HadoopTestUtils.testDir("dir1");
+
+ /** File 1 in directory 1. */
+ private static final File FILE_1_1 = new File(DIR_1, "file1.jar");
+
+ /** File 2 in directory 1. */
+ private static final File FILE_1_2 = new File(DIR_1, "file2.jar");
+
+ /** Directory 2. */
+ private static final File DIR_2 = HadoopTestUtils.testDir("dir2");
+
+ /** File 1 in directory 2. */
+ private static final File FILE_2_1 = new File(DIR_2, "file1.jar");
+
+ /** File 2 in directory 2. */
+ private static final File FILE_2_2 = new File(DIR_2, "file2.jar");
+
+ /** Missing directory. */
+ private static final File MISSING_DIR = HadoopTestUtils.testDir("missing_dir");
+
+ /** Missing file. */
+ private static final File MISSING_FILE = new File(MISSING_DIR, "file.jar");
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ HadoopTestUtils.clearBaseTestDir();
+
+ assert DIR_1.mkdirs();
+ assert DIR_2.mkdirs();
+
+ assert FILE_1_1.createNewFile();
+ assert FILE_1_2.createNewFile();
+ assert FILE_2_1.createNewFile();
+ assert FILE_2_2.createNewFile();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // Sanity checks before test start.
+ ensureExists(FILE_1_1);
+ ensureExists(FILE_1_2);
+ ensureExists(FILE_2_1);
+ ensureExists(FILE_2_2);
+
+ ensureNotExists(MISSING_DIR);
+ ensureNotExists(MISSING_FILE);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ HadoopTestUtils.clearBaseTestDir();
+ }
+
+ /**
+ * Test null or empty user libs.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNullOrEmptyUserLibs() throws Exception {
+ assert parse(null).isEmpty();
+ assert parse("").isEmpty();
+ }
+
+ /**
+ * Test single file.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSingle() throws Exception {
+ Collection<File> res = parse(single(FILE_1_1));
+
+ assert res.size() == 1;
+ assert res.contains(FILE_1_1);
+
+ res = parse(single(MISSING_FILE));
+
+ assert res.size() == 0;
+ }
+
+ /**
+ * Test multiple files.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultiple() throws Exception {
+ Collection<File> res =
+ parse(merge(single(FILE_1_1), single(FILE_1_2), single(FILE_2_1), single(FILE_2_2), single(MISSING_FILE)));
+
+ assert res.size() == 4;
+ assert res.contains(FILE_1_1);
+ assert res.contains(FILE_1_2);
+ assert res.contains(FILE_2_1);
+ assert res.contains(FILE_2_2);
+ }
+
+ /**
+ * Test single wildcard.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSingleWildcard() throws Exception {
+ Collection<File> res = parse(wildcard(DIR_1));
+
+ assert res.size() == 2;
+ assert res.contains(FILE_1_1);
+ assert res.contains(FILE_1_2);
+
+ res = parse(wildcard(MISSING_DIR));
+
+ assert res.size() == 0;
+ }
+
+ /**
+ * Test multiple wildcards.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultipleWildcards() throws Exception {
+ Collection<File> res = parse(merge(wildcard(DIR_1), wildcard(DIR_2), wildcard(MISSING_DIR)));
+
+ assert res.size() == 4;
+ assert res.contains(FILE_1_1);
+ assert res.contains(FILE_1_2);
+ assert res.contains(FILE_2_1);
+ assert res.contains(FILE_2_2);
+ }
+
+ /**
+ * Test mixed tokens.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMixed() throws Exception {
+ String str = merge(
+ single(FILE_1_1),
+ wildcard(DIR_2),
+ single(MISSING_FILE),
+ wildcard(MISSING_DIR)
+ );
+
+ Collection<File> res = parse(str);
+
+ assert res.size() == 3;
+ assert res.contains(FILE_1_1);
+ assert res.contains(FILE_2_1);
+ assert res.contains(FILE_2_2);
+ }
+ /**
+ * Ensure provided file exists.
+ *
+ * @param file File.
+ */
+ private static void ensureExists(File file) {
+ assert file.exists();
+ }
+
+ /**
+ * Ensure provided file doesn't exist.
+ *
+ * @param file File.
+ */
+ private static void ensureNotExists(File file) {
+ assert !file.exists();
+ }
+
+ /**
+ * Merge string using path separator.
+ *
+ * @param vals Values.
+ * @return Result.
+ */
+ private static String merge(String... vals) {
+ StringBuilder res = new StringBuilder();
+
+ if (vals != null) {
+ boolean first = true;
+
+ for (String val : vals) {
+ if (first)
+ first = false;
+ else
+ res.append(File.pathSeparatorChar);
+
+ res.append(val);
+ }
+ }
+
+ return res.toString();
+ }
+
+ /**
+ * Parse string.
+ *
+ * @param str String.
+ * @return Files.
+ * @throws IOException If failed.
+ */
+ Collection<File> parse(String str) throws IOException {
+ Collection<HadoopClasspathUtils.SearchDirectory> dirs = HadoopClasspathUtils.parseUserLibs(str);
+
+ Collection<File> res = new HashSet<>();
+
+ for (HadoopClasspathUtils.SearchDirectory dir : dirs)
+ Collections.addAll(res, dir.files());
+
+ return res;
+ }
+
+ /**
+ * Get absolute path to a single file.
+ *
+ * @param file File.
+ * @return Path.
+ */
+ private static String single(File file) {
+ return file.getAbsolutePath();
+ }
+
+ /**
+ * Create a wildcard.
+ *
+ * @param file File.
+ * @return Wildcard.
+ */
+ private static String wildcard(File file) {
+ return file.getAbsolutePath() + File.separatorChar + "*";
+ }
+}
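The helpers above spell out the token format that parseUserLibs() is expected to handle: a user-libs string is a list of entries joined by the platform path separator (':' on Unix, ';' on Windows), where each entry is either an absolute path to a single file or a directory wildcard ending in "/*", and entries naming missing files or directories are simply skipped. A hypothetical Unix-style string combining both entry kinds, mirroring testMixed():

    /work/test/hadoop/dir1/file1.jar:/work/test/hadoop/dir2/*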
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
new file mode 100644
index 0000000..694af9b
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopV2JobSelfTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.util.UUID;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopSerializationWrapper;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}.
+ */
+public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
+ /** */
+ private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
+
+ /**
+ * Custom serialization class that accepts {@link Writable}.
+ */
+ private static class CustomSerialization extends WritableSerialization {
+ /** {@inheritDoc} */
+ @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+ return new Deserializer<Writable>() {
+ @Override public void open(InputStream in) { }
+
+ @Override public Writable deserialize(Writable writable) {
+ return new Text(TEST_SERIALIZED_VALUE);
+ }
+
+ @Override public void close() { }
+ };
+ }
+ }
+
+ /**
+ * Tests that {@link HadoopJob} provides a wrapped serializer if one is set in the configuration.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void testCustomSerializationApplying() throws IgniteCheckedException {
+ JobConf cfg = new JobConf();
+
+ cfg.setMapOutputKeyClass(IntWritable.class);
+ cfg.setMapOutputValueClass(Text.class);
+ cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+ HadoopDefaultJobInfo info = createJobInfo(cfg);
+
+ final UUID uuid = UUID.randomUUID();
+
+ HadoopJobId id = new HadoopJobId(uuid, 1);
+
+ HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null, new HadoopHelperImpl());
+
+ HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
+ null));
+
+ HadoopSerialization ser = taskCtx.keySerialization();
+
+ assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
+
+ assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+
+ ser = taskCtx.valueSerialization();
+
+ assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+ assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+ }
+}
\ No newline at end of file
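The test above leans on Hadoop's pluggable serialization mechanism: the io.serializations configuration key (CommonConfigurationKeys.IO_SERIALIZATIONS_KEY) lists Serialization implementations, and SerializationFactory asks each in turn whether it accepts the class at hand. A minimal plain-Hadoop sketch of that lookup outside of Ignite (assumes a Hadoop 2.x classpath; not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.WritableSerialization;

    class SerializationLookupSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();

            // Same mechanism the test exercises: register serializations under io.serializations.
            conf.set("io.serializations", WritableSerialization.class.getName());

            // The factory walks the registered serializations and returns a deserializer
            // from the first one that accepts the requested class.
            Deserializer<Text> deser = new SerializationFactory(conf).getDeserializer(Text.class);

            System.out.println(deser.getClass().getName());
        }
    }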
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
new file mode 100644
index 0000000..2d61016
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopValidationSelfTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Configuration validation tests.
+ */
+public class HadoopValidationSelfTest extends HadoopAbstractSelfTest {
+ /** Peer class loading enabled flag. */
+ public boolean peerClassLoading;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+
+ peerClassLoading = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(peerClassLoading);
+
+ return cfg;
+ }
+
+ /**
+ * Ensure that Grid starts when all configuration parameters are valid.
+ *
+ * @throws Exception If failed.
+ */
+ public void testValid() throws Exception {
+ startGrids(1);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
new file mode 100644
index 0000000..430c675
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Tests for the weighted map-reduce planner.
+ */
+public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest {
+ /** ID 1. */
+ private static final UUID ID_1 = new UUID(0, 1);
+
+ /** ID 2. */
+ private static final UUID ID_2 = new UUID(0, 2);
+
+ /** ID 3. */
+ private static final UUID ID_3 = new UUID(0, 3);
+
+ /** MAC 1. */
+ private static final String MAC_1 = "mac1";
+
+ /** MAC 2. */
+ private static final String MAC_2 = "mac2";
+
+ /** MAC 3. */
+ private static final String MAC_3 = "mac3";
+
+ /** Host 1. */
+ private static final String HOST_1 = "host1";
+
+ /** Host 2. */
+ private static final String HOST_2 = "host2";
+
+ /** Host 3. */
+ private static final String HOST_3 = "host3";
+
+ /** Host 4. */
+ private static final String HOST_4 = "host4";
+
+ /** Host 5. */
+ private static final String HOST_5 = "host5";
+
+ /** Standard node 1. */
+ private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
+
+ /** Standard node 2. */
+ private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
+
+ /** Standard node 3. */
+ private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
+
+ /** Standard nodes. */
+ private static final Collection<ClusterNode> NODES;
+
+ /**
+ * Static initializer.
+ */
+ static {
+ NODES = new ArrayList<>();
+
+ NODES.add(NODE_1);
+ NODES.add(NODE_2);
+ NODES.add(NODE_3);
+ }
+
+ /**
+ * Test one IGFS split being assigned to an affinity node.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOneIgfsSplitAffinity() throws Exception {
+ IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+ List<HadoopInputSplit> splits = new ArrayList<>();
+
+ splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50));
+
+ final int expReducers = 4;
+
+ HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+ IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+ HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+ assert plan.mappers() == 1;
+ assert plan.mapperNodeIds().size() == 1;
+ assert plan.mapperNodeIds().contains(ID_1);
+
+ checkPlanMappers(plan, splits, NODES, false /* only 1 split */);
+ checkPlanReducers(plan, NODES, expReducers, false /* because of threshold behavior */);
+ }
+
+ /**
+ * Test HDFS splits.
+ *
+ * @throws Exception If failed.
+ */
+ public void testHdfsSplitsAffinity() throws Exception {
+ IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+ final List<HadoopInputSplit> splits = new ArrayList<>();
+
+ splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50));
+ splits.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100));
+ splits.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37));
+
+ // The following splits belong to hosts that are outside the Ignite topology altogether.
+ // This means that these splits should be assigned to the least loaded nodes:
+ splits.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2));
+ splits.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3));
+
+ final int expReducers = 7;
+
+ HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+ IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+ final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+ checkPlanMappers(plan, splits, NODES, true);
+
+ checkPlanReducers(plan, NODES, expReducers, true);
+ }
+
+ /**
+ * Test HDFS splits with Replication == 3.
+ *
+ * @throws Exception If failed.
+ */
+ public void testHdfsSplitsReplication() throws Exception {
+ IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+ final List<HadoopInputSplit> splits = new ArrayList<>();
+
+ splits.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hdfs://" + HOST_1 + "/x"), 0, 50));
+ splits.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hdfs://" + HOST_2 + "/x"), 50, 100));
+ splits.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hdfs://" + HOST_3 + "/x"), 100, 37));
+ // The following splits belong to hosts that are outside the Ignite topology altogether.
+ // This means that these splits should be assigned to the least loaded nodes:
+ splits.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hdfs://" + HOST_4 + "/x"), 138, 2));
+ splits.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hdfs://" + HOST_5 + "/x"), 140, 3));
+
+ final int expReducers = 8;
+
+ HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+ IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+ final HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+ checkPlanMappers(plan, splits, NODES, true);
+
+ checkPlanReducers(plan, NODES, expReducers, true);
+ }
+
+ /**
+ * Get all IDs.
+ *
+ * @param nodes Nodes.
+ * @return IDs.
+ */
+ private static Set<UUID> allIds(Collection<ClusterNode> nodes) {
+ Set<UUID> allIds = new HashSet<>();
+
+ for (ClusterNode n : nodes)
+ allIds.add(n.id());
+
+ return allIds;
+ }
+
+ /**
+ * Check mappers for the plan.
+ *
+ * @param plan Plan.
+ * @param splits Splits.
+ * @param nodes Nodes.
+ * @param expectUniformity Whether uniformity is expected.
+ */
+ private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits,
+ Collection<ClusterNode> nodes, boolean expectUniformity) {
+ // Number of mappers should correspond to the number of input splits:
+ assertEquals(splits.size(), plan.mappers());
+
+ if (expectUniformity) {
+ // Mappers are assigned to all available nodes:
+ assertEquals(nodes.size(), plan.mapperNodeIds().size());
+
+
+ assertEquals(allIds(nodes), plan.mapperNodeIds());
+ }
+
+ // Check all splits are covered by mappers:
+ Set<HadoopInputSplit> set = new HashSet<>();
+
+ for (UUID id: plan.mapperNodeIds()) {
+ Collection<HadoopInputSplit> sp = plan.mappers(id);
+
+ assert sp != null;
+
+ for (HadoopInputSplit s: sp)
+ assertTrue(set.add(s));
+ }
+
+ // must be of the same size & contain same elements:
+ assertEquals(set, new HashSet<>(splits));
+ }
+
+ /**
+ * Check plan reducers.
+ *
+ * @param plan Plan.
+ * @param nodes Nodes.
+ * @param expReducers Expected reducers.
+ * @param expectUniformity Expected uniformity.
+ */
+ private static void checkPlanReducers(HadoopMapReducePlan plan,
+ Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) {
+
+ assertEquals(expReducers, plan.reducers());
+
+ if (expectUniformity)
+ assertEquals(allIds(nodes), plan.reducerNodeIds());
+
+ int sum = 0;
+ int lenSum = 0;
+
+ for (UUID uuid: plan.reducerNodeIds()) {
+ int[] rr = plan.reducers(uuid);
+
+ assert rr != null;
+
+ lenSum += rr.length;
+
+ for (int i: rr)
+ sum += i;
+ }
+
+ assertEquals(expReducers, lenSum);
+
+ // Numbers in the arrays must be consecutive integers starting from 0;
+ // check that simply by calculating their total sum:
+ assertEquals((lenSum * (lenSum - 1) / 2), sum);
+ }
+
+ /**
+ * Create planner for IGFS.
+ *
+ * @param igfs IGFS.
+ * @return Planner.
+ */
+ private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) {
+ IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();
+
+ IgfsIgniteMock ignite = new IgfsIgniteMock(null, igfs);
+
+ GridTestUtils.setFieldValue(planner, HadoopAbstractMapReducePlanner.class, "ignite", ignite);
+
+ return planner;
+ }
+
+ /**
+ * Throw {@link UnsupportedOperationException}.
+ */
+ private static void throwUnsupported() {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+
+ /**
+ * Mocked node.
+ */
+ private static class MockNode implements ClusterNode {
+ /** ID. */
+ private final UUID id;
+
+ /** MAC addresses. */
+ private final String macs;
+
+ /** Addresses. */
+ private final List<String> addrs;
+
+ /**
+ * Constructor.
+ *
+ * @param id Node ID.
+ * @param macs MAC addresses.
+ * @param addrs Addresses.
+ */
+ public MockNode(UUID id, String macs, String... addrs) {
+ assert addrs != null;
+
+ this.id = id;
+ this.macs = macs;
+
+ this.addrs = Arrays.asList(addrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attribute(String name) {
+ if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
+ return (T)macs;
+
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> addresses() {
+ return addrs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object consistentId() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterMetrics metrics() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> attributes() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> hostNames() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long order() {
+ throwUnsupported();
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteProductVersion version() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocal() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDaemon() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isClient() {
+ throwUnsupported();
+
+ return false;
+ }
+ }
+
+ /**
+ * Locations builder.
+ */
+ private static class LocationsBuilder {
+ /** Locations. */
+ private final TreeMap<Long, Collection<MockNode>> locs = new TreeMap<>();
+
+ /**
+ * Create new locations builder.
+ *
+ * @return Locations builder.
+ */
+ public static LocationsBuilder create() {
+ return new LocationsBuilder();
+ }
+
+ /**
+ * Add locations.
+ *
+ * @param start Start.
+ * @param nodes Nodes.
+ * @return This builder for chaining.
+ */
+ public LocationsBuilder add(long start, MockNode... nodes) {
+ locs.put(start, Arrays.asList(nodes));
+
+ return this;
+ }
+
+ /**
+ * Build locations.
+ *
+ * @return Locations.
+ */
+ public TreeMap<Long, Collection<MockNode>> build() {
+ return locs;
+ }
+
+ /**
+ * Build IGFS.
+ *
+ * @return IGFS.
+ */
+ public MockIgfs buildIgfs() {
+ return new MockIgfs(build());
+ }
+ }
+
+ /**
+ * Mocked IGFS.
+ */
+ private static class MockIgfs extends IgfsMock {
+ /** Block locations. */
+ private final TreeMap<Long, Collection<MockNode>> locs;
+
+ /**
+ * Constructor.
+ *
+ * @param locs Block locations.
+ */
+ public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
+ super("igfs");
+
+ this.locs = locs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+ Collection<IgfsBlockLocation> res = new ArrayList<>();
+
+ long cur = start;
+ long remaining = len;
+
+ long prevLocStart = -1;
+ Collection<MockNode> prevLocNodes = null;
+
+ for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) {
+ long locStart = locEntry.getKey();
+ Collection<MockNode> locNodes = locEntry.getValue();
+
+ if (prevLocNodes != null) {
+ if (cur < locStart) {
+ // Add part from previous block.
+ long prevLen = locStart - prevLocStart;
+
+ res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes));
+
+ cur = locStart;
+ remaining -= prevLen;
+ }
+ }
+
+ prevLocStart = locStart;
+ prevLocNodes = locNodes;
+
+ if (remaining == 0)
+ break;
+ }
+
+ // Add remainder.
+ if (remaining != 0)
+ res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes));
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exists(IgfsPath path) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isProxy(URI path) {
+ return false;
+ }
+ }
+
+ /**
+ * Mocked block location.
+ */
+ private static class IgfsBlockLocationMock implements IgfsBlockLocation {
+ /** Start. */
+ private final long start;
+
+ /** Length. */
+ private final long len;
+
+ /** Node IDs. */
+ private final List<UUID> nodeIds;
+
+ /**
+ * Constructor.
+ *
+ * @param start Start.
+ * @param len Length.
+ * @param nodes Nodes.
+ */
+ public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) {
+ this.start = start;
+ this.len = len;
+
+ this.nodeIds = new ArrayList<>(nodes.size());
+
+ for (MockNode node : nodes)
+ nodeIds.add(node.id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long start() {
+ return start;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long length() {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<UUID> nodeIds() {
+ return nodeIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> names() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> hosts() {
+ throwUnsupported();
+
+ return null;
+ }
+ }
+}
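The reducer check in checkPlanReducers() above rests on a small arithmetic identity: if the reducer indices scattered across all nodes are exactly the consecutive integers 0..R-1, their total is R*(R-1)/2, so comparing the observed sum against that closed form, together with the count assertion, is a cheap sanity check for skipped or duplicated indices. For example, with expReducers = 4 the indices 0, 1, 2, 3 must sum to 4*3/2 = 6.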
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
new file mode 100644
index 0000000..13f00bd
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+
+/**
+ * Tests whole map-red execution Weighted planner.
+ */
+public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override protected HadoopConfiguration createHadoopConfiguration() {
+ HadoopConfiguration hadoopCfg = new HadoopConfiguration();
+
+ // Use weighted planner with default settings:
+ IgniteHadoopWeightedMapReducePlanner planner = new IgniteHadoopWeightedMapReducePlanner();
+
+ hadoopCfg.setMapReducePlanner(planner);
+
+ return hadoopCfg;
+ }
+}