You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/04 16:35:05 UTC
[03/45] incubator-ignite git commit: IGNITE-386: Squashed changes.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
new file mode 100644
index 0000000..a2f2ac3
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPopularWordsTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.collect.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+
+import static com.google.common.collect.Maps.*;
+import static com.google.common.collect.MinMaxPriorityQueue.*;
+import static java.util.Collections.*;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
+ * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows please ensure that cygwin is installed and available in the system
+ * path.
+ */
+public class HadoopPopularWordsTest {
+    /** Ignite home. */
+    private static final String IGNITE_HOME = U.getIgniteHome();
+
+    /** The path to the input directory. All files in that directory will be processed. */
+    private static final Path BOOKS_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
+
+    /** The path to the output directory. The result file will be written to this location. */
+    private static final Path RESULT_LOCAL_DIR =
+        new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
+
+    /** Popular books source dir in DFS. */
+    private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
+
+    /** Job result dir in DFS. */
+    private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
+
+    /** Path to the distributed file system configuration. */
+    private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** Top N words to select. */
+    private static final int POPULAR_WORDS_CNT = 10;
+
+    /**
+     * For each token in the input string the mapper emits a {word, 1} pair.
+     */
+    private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+        /** Constant value. */
+        private static final IntWritable ONE = new IntWritable(1);
+
+        /** Word delimiter: any character that is not an ASCII letter or digit. Compiled once, not per line. */
+        private static final java.util.regex.Pattern DELIM = java.util.regex.Pattern.compile("[^a-zA-Z0-9]");
+
+        /** The word converted into the Text. */
+        private final Text word = new Text();
+
+        /**
+         * Emits an entry where the key is the word and the value is always 1.
+         *
+         * @param key the current position in the input file (not used here)
+         * @param val the text string
+         * @param ctx mapper context
+         * @throws IOException If failed.
+         * @throws InterruptedException If interrupted.
+         */
+        @Override protected void map(LongWritable key, Text val, Context ctx)
+            throws IOException, InterruptedException {
+            // Split the line into words (same result as String.split with this regex).
+            final String[] words = DELIM.split(val.toString());
+
+            for (final String w : words) {
+                // Only emit counts for longer words.
+                if (w.length() <= 3)
+                    continue;
+
+                word.set(w);
+
+                // Write the word into the context with the initial count equals 1.
+                ctx.write(word, ONE);
+            }
+        }
+    }
+
+    /**
+     * The reducer uses a priority queue to rank the words based on their number of occurrences.
+     */
+    private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+        /**
+         * Bounded queue of (count, word) entries ordered by descending count. When it grows past
+         * {@link #POPULAR_WORDS_CNT}, the entry with the lowest count is evicted.
+         */
+        private final MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+        /**
+         * Creates the reducer with an empty top-N queue.
+         */
+        TopNWordsReducer() {
+            q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
+                @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
+                    return o1.getKey().compareTo(o2.getKey());
+                }
+            })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+        }
+
+        /**
+         * This method doesn't emit anything, but just keeps track of the top N words.
+         *
+         * @param key The word.
+         * @param vals The words counts.
+         * @param ctx Reducer context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
+            InterruptedException {
+            int sum = 0;
+
+            for (IntWritable val : vals)
+                sum += val.get();
+
+            q.add(immutableEntry(sum, key.toString()));
+        }
+
+        /**
+         * This method is called after all the word entries have been processed. It writes the accumulated
+         * statistics to the job output file.
+         *
+         * @param ctx The job context.
+         * @throws IOException If failed.
+         * @throws InterruptedException If failed.
+         */
+        @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+            IntWritable i = new IntWritable();
+
+            Text txt = new Text();
+
+            // The reversed comparator makes removeFirst() return the highest count: iterate in desc order.
+            while (!q.isEmpty()) {
+                Entry<Integer, String> e = q.removeFirst();
+
+                i.set(e.getKey());
+
+                txt.set(e.getValue());
+
+                ctx.write(txt, i);
+            }
+        }
+    }
+
+    /**
+     * Configures the Hadoop MapReduce job.
+     *
+     * @return Instance of the Hadoop MapRed job.
+     * @throws IOException If failed.
+     */
+    private Job createConfigBasedHadoopJob() throws IOException {
+        Job jobCfg = Job.getInstance();
+
+        Configuration cfg = jobCfg.getConfiguration();
+
+        // Use explicit configuration of distributed file system, if provided.
+        if (DFS_CFG != null)
+            cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
+
+        jobCfg.setJobName("HadoopPopularWordExample");
+        jobCfg.setJarByClass(HadoopPopularWordsTest.class);
+        jobCfg.setInputFormatClass(TextInputFormat.class);
+        jobCfg.setOutputKeyClass(Text.class);
+        jobCfg.setOutputValueClass(IntWritable.class);
+        jobCfg.setMapperClass(TokenizingMapper.class);
+        jobCfg.setReducerClass(TopNWordsReducer.class);
+
+        FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+        FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+        // Local job tracker allows the only task per wave, but text input format
+        // replaces it with the calculated value based on input split size option.
+        if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+            // Split job into tasks using 32MB split size.
+            FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+            FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+        }
+
+        return jobCfg;
+    }
+
+    /**
+     * Runs the Hadoop job.
+     *
+     * @return {@code True} if succeeded, {@code false} otherwise.
+     * @throws Exception If failed.
+     */
+    private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+        Job job = createConfigBasedHadoopJob();
+
+        // Distributed file system this job will work with.
+        FileSystem fs = FileSystem.get(job.getConfiguration());
+
+        X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
+
+        // Prepare input and output job directories.
+        prepareDirectories(fs);
+
+        long time = System.currentTimeMillis();
+
+        // Run job.
+        boolean res = job.waitForCompletion(true);
+
+        X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
+
+        // Move job results into local file system, so you can view calculated results.
+        publishResults(fs);
+
+        return res;
+    }
+
+    /**
+     * Prepare job's data: clean up result directories that might have been left over
+     * after previous runs, copy input files from the local file system into DFS.
+     *
+     * @param fs Distributed file system to use in job.
+     * @throws IOException If failed.
+     */
+    private void prepareDirectories(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+        fs.delete(RESULT_DFS_DIR, true);
+
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+    }
+
+    /**
+     * Publish job execution results into local file system, so you can view them.
+     *
+     * @param fs Distributed file system used in job.
+     * @throws IOException If failed.
+     */
+    private void publishResults(FileSystem fs) throws IOException {
+        X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+        fs.delete(BOOKS_DFS_DIR, true);
+
+        X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        // RESULT_LOCAL_DIR has the "file:" scheme, so delete it through the file system that owns that scheme.
+        // Going through the DFS instance would fail Hadoop's path check when the DFS is not the local one.
+        RESULT_LOCAL_DIR.getFileSystem(fs.getConf()).delete(RESULT_LOCAL_DIR, true);
+
+        X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+        fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+    }
+
+    /**
+     * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
+     * occurrences of the word in the source files, the N most popular words are selected.
+     * The process exits with status 0 if the job succeeded and 1 otherwise.
+     *
+     * @param args None.
+     */
+    public static void main(String[] args) {
+        boolean ok = false;
+
+        try {
+            ok = new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
+        }
+        catch (Exception e) {
+            X.println(">>> Failed to run word count example: " + e.getMessage());
+        }
+
+        // Report a failed job or an exception through the exit status instead of always exiting with 0.
+        System.exit(ok ? 0 : 1);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..5d5bb94
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Test of wrapper of the native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
+ /**
+ * Tests read/write of IntWritable via native WritableSerialization.
+ * @throws Exception If fails.
+ */
+ public void testIntWritableSerialization() throws Exception {
+ HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ DataOutput out = new DataOutputStream(buf);
+
+ ser.write(out, new IntWritable(3));
+ ser.write(out, new IntWritable(-5));
+
+ assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+ assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+ }
+
+ /**
+ * Tests read/write of Integer via native JavaleSerialization.
+ * @throws Exception If fails.
+ */
+ public void testIntJavaSerialization() throws Exception {
+ HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ DataOutput out = new DataOutputStream(buf);
+
+ ser.write(out, 3);
+ ser.write(out, -5);
+ ser.close();
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ assertEquals(3, ((Integer)ser.read(in, null)).intValue());
+ assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
new file mode 100644
index 0000000..c73ee9f
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSharedMap.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+/**
+ * For tests: a registry of shared object maps, one instance per class name (see {@link #map(Class)}).
+ */
+public class HadoopSharedMap {
+    /** Registry of shared maps keyed by class name. */
+    private static final ConcurrentMap<String, HadoopSharedMap> maps = new ConcurrentHashMap8<>();
+
+    /** Objects stored in this shared map. */
+    private final ConcurrentMap<String, Object> map = new ConcurrentHashMap8<>();
+
+    /**
+     * Private: instances are obtained through {@link #map(Class)}.
+     */
+    private HadoopSharedMap() {
+        // No-op.
+    }
+
+    /**
+     * Associates the value with the key only if no value is stored for it yet. Existing values are never
+     * overwritten.
+     *
+     * @param key Key.
+     * @param val Value to store if the key is absent.
+     * @param <T> Value type.
+     * @return The value associated with the key after the call: {@code val} if it was stored, otherwise the
+     *      previously stored value.
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T put(String key, T val) {
+        Object old = map.putIfAbsent(key, val);
+
+        // Unchecked cast: callers are expected to use one value type per key.
+        return old == null ? val : (T)old;
+    }
+
+    /**
+     * Returns the shared map registered for the given class name, creating it on first use.
+     *
+     * @param cls Class.
+     * @return Map of static fields.
+     */
+    public static HadoopSharedMap map(Class<?> cls) {
+        HadoopSharedMap m = maps.get(cls.getName());
+
+        if (m != null)
+            return m;
+
+        // Another thread may register a map concurrently; keep whichever instance won.
+        HadoopSharedMap old = maps.putIfAbsent(cls.getName(), m = new HadoopSharedMap());
+
+        return old == null ? m : old;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
new file mode 100644
index 0000000..772e77d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingExternalTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Variant of {@link HadoopSortingTest} that runs with external execution enabled.
+ */
+public class HadoopSortingExternalTest extends HadoopSortingTest {
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+        HadoopConfiguration hadoopCfg = super.hadoopConfiguration(gridName);
+
+        // The parent test switches external execution off; turn it on for this variant.
+        hadoopCfg.setExternalExecution(true);
+
+        return hadoopCfg;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
new file mode 100644
index 0000000..3f6594a
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSortingTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests correct sorting.
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+ /** */
+ private static final String PATH_INPUT = "/test-in";
+
+ /** */
+ private static final String PATH_OUTPUT = "/test-out";
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * @return {@code True} if IGFS is enabled on Hadoop nodes.
+ */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ cfg.setExternalExecution(false);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSortSimple() throws Exception {
+ // Generate test data.
+ Job job = Job.getInstance();
+
+ job.setInputFormatClass(InFormat.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ job.setMapperClass(Mapper.class);
+ job.setNumReduceTasks(0);
+
+ setupFileSystems(job.getConfiguration());
+
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
+
+ X.printerrln("Data generation started.");
+
+ grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration())).get(180000);
+
+ X.printerrln("Data generation complete.");
+
+ // Run main map-reduce job.
+ job = Job.getInstance();
+
+ setupFileSystems(job.getConfiguration());
+
+ job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
+ "," + WritableSerialization.class.getName());
+
+ FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+ job.setSortComparatorClass(JavaSerializationComparator.class);
+
+ job.setMapperClass(MyMapper.class);
+ job.setReducerClass(MyReducer.class);
+
+ job.setNumReduceTasks(2);
+
+ job.setMapOutputKeyClass(UUID.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ X.printerrln("Job started.");
+
+ grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+ createJobInfo(job.getConfiguration())).get(180000);
+
+ X.printerrln("Job complete.");
+
+ // Check result.
+ Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+ AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
+
+ for (FileStatus file : fs.listStatus(outDir)) {
+ X.printerrln("__ file: " + file);
+
+ if (file.getLen() == 0)
+ continue;
+
+ FSDataInputStream in = fs.open(file.getPath());
+
+ Scanner sc = new Scanner(in);
+
+ UUID prev = null;
+
+ while(sc.hasNextLine()) {
+ UUID next = UUID.fromString(sc.nextLine());
+
+// X.printerrln("___ check: " + next);
+
+ if (prev != null)
+ assertTrue(prev.compareTo(next) < 0);
+
+ prev = next;
+ }
+ }
+ }
+
+ public static class InFormat extends InputFormat<Text, NullWritable> {
+ /** {@inheritDoc} */
+ @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+ List<InputSplit> res = new ArrayList<>();
+
+ FakeSplit split = new FakeSplit(20);
+
+ for (int i = 0; i < 10; i++)
+ res.add(split);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
+ TaskAttemptContext ctx) throws IOException, InterruptedException {
+ return new RecordReader<Text, NullWritable>() {
+ /** */
+ int cnt;
+
+ /** */
+ Text txt = new Text();
+
+ @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+ // No-op.
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ return ++cnt <= split.getLength();
+ }
+
+ @Override public Text getCurrentKey() {
+ txt.set(UUID.randomUUID().toString());
+
+// X.printerrln("___ read: " + txt);
+
+ return txt;
+ }
+
+ @Override public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ return (float)cnt / split.getLength();
+ }
+
+ @Override public void close() {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
+ /** {@inheritDoc} */
+ @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
+// X.printerrln("___ map: " + val);
+
+ ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+ }
+ }
+
+ public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
+ /** */
+ private Text text = new Text();
+
+ /** {@inheritDoc} */
+ @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
+ throws IOException, InterruptedException {
+// X.printerrln("___ rdc: " + key);
+
+ text.set(key.toString());
+
+ ctx.write(text, NullWritable.get());
+ }
+ }
+
+ public static class FakeSplit extends InputSplit implements Writable {
+ /** */
+ private static final String[] HOSTS = {"127.0.0.1"};
+
+ /** */
+ private int len;
+
+ /**
+ * @param len Length.
+ */
+ public FakeSplit(int len) {
+ this.len = len;
+ }
+
+ /**
+ *
+ */
+ public FakeSplit() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getLength() throws IOException, InterruptedException {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String[] getLocations() throws IOException, InterruptedException {
+ return HOSTS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ out.writeInt(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ len = in.readInt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
new file mode 100644
index 0000000..ee490be
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapperSelfTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper}.
+ */
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
+ /**
+ * Tests serialization of wrapper and the wrapped native split.
+ * @throws Exception If fails.
+ */
+ public void testSerialization() throws Exception {
+ FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"});
+
+ assertEquals("/path/to/file:100+500", nativeSplit.toString());
+
+ HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
+
+ assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ ObjectOutput out = new ObjectOutputStream(buf);
+
+ out.writeObject(split);
+
+ ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
+
+ assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString());
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ res.hosts();
+
+ return null;
+ }
+ }, AssertionError.class, null);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
new file mode 100644
index 0000000..1a93223
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.*;
+
+/**
+ * Starts an Ignite node from {@code config/hadoop/default-config.xml} and builds a Hadoop configuration
+ * for running jobs against it.
+ */
+public class HadoopStartup {
+    /**
+     * Starts a node using {@code config/hadoop/default-config.xml}.
+     *
+     * @param args Arguments (not used).
+     */
+    public static void main(String[] args) {
+        G.start("config/hadoop/default-config.xml");
+    }
+
+    /**
+     * Builds a Hadoop configuration that uses IGFS as the default file system and {@code ignite}
+     * as the MapReduce framework.
+     *
+     * @return Configuration for job run.
+     */
+    @SuppressWarnings("UnnecessaryFullyQualifiedName")
+    public static Configuration configuration() {
+        Configuration conf = new Configuration();
+
+        // Default file system URI (presumably IGFS named "igfs" on localhost).
+        conf.set("fs.defaultFS", "igfs://igfs@localhost");
+
+        // IGFS implementations for the v1 (fs.igfs.impl) and v2 (AbstractFileSystem) Hadoop file system APIs.
+        conf.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+        conf.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
+
+        // Job submission goes to the "ignite" framework at localhost:11211.
+        conf.set("mapreduce.framework.name", "ignite");
+        conf.set("mapreduce.jobtracker.address", "localhost:11211");
+
+        return conf;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
new file mode 100644
index 0000000..20c5db2
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests map-reduce task execution basics.
+ */
+ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
+ /** Shared map that holds the static test state declared below. */
+ private static final HadoopSharedMap m = HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class);
+
+ /** Total line count, incremented by mappers or, when map output is enabled, accumulated by reducers. */
+ private static final AtomicInteger totalLineCnt = m.put("totalLineCnt", new AtomicInteger());
+
+ /** Number of cancellable mappers that have been set up. */
+ private static final AtomicInteger executedTasks = m.put("executedTasks", new AtomicInteger());
+
+ /** Number of cancellable mappers that observed task cancellation. */
+ private static final AtomicInteger cancelledTasks = m.put("cancelledTasks", new AtomicInteger());
+
+ /** Local working directory of each task, mapped to the attempt id that used it. */
+ private static final Map<String, String> taskWorkDirs = m.put("taskWorkDirs",
+ new ConcurrentHashMap<String, String>());
+
+ /** Id of the cancellable mapper that must fail; reset to {@code 0} before each cancelling job. */
+ private static final AtomicInteger failMapperId = m.put("failMapperId", new AtomicInteger());
+
+ /** Number of splits of the current input. */
+ private static final AtomicInteger splitsCount = m.put("splitsCount", new AtomicInteger());
+
+ /** Job property: if {@code true}, {@link TestMapper} writes records instead of counting lines itself. */
+ private static final String MAP_WRITE = "test.map.write";
+
+ /** How long to wait for task progress before dumping threads and failing, in milliseconds. */
+ private static final long WAIT_TIMEOUT = 20000;
+
+ /** {@inheritDoc} */
+ @Override public FileSystemConfiguration igfsConfiguration() {
+ FileSystemConfiguration cfg = super.igfsConfiguration();
+
+ cfg.setFragmentizerEnabled(false);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ grid(0).fileSystem(igfsName).format();
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ cfg.setMaxParallelTasks(5);
+ cfg.setExternalExecution(false);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMapRun() throws Exception {
+ int lineCnt = 10000;
+ String fileName = "/testFile";
+
+ prepareFile(fileName, lineCnt);
+
+ totalLineCnt.set(0);
+ taskWorkDirs.clear();
+
+ Configuration cfg = new Configuration();
+
+ cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+ Job job = Job.getInstance(cfg);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(TestMapper.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(TextInputFormat.class);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+ job.setJarByClass(getClass());
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration()));
+
+ fut.get();
+
+ assertEquals(lineCnt, totalLineCnt.get());
+
+ assertEquals(32, taskWorkDirs.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMapCombineRun() throws Exception {
+ int lineCnt = 10001;
+ String fileName = "/testFile";
+
+ prepareFile(fileName, lineCnt);
+
+ totalLineCnt.set(0);
+ taskWorkDirs.clear();
+
+ Configuration cfg = new Configuration();
+
+ cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+ cfg.setBoolean(MAP_WRITE, true);
+
+ Job job = Job.getInstance(cfg);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(TestMapper.class);
+ job.setCombinerClass(TestCombiner.class);
+ job.setReducerClass(TestReducer.class);
+
+ job.setNumReduceTasks(2);
+
+ job.setInputFormatClass(TextInputFormat.class);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output"));
+
+ job.setJarByClass(getClass());
+
+ HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+ fut.get();
+
+ assertEquals(lineCnt, totalLineCnt.get());
+
+ assertEquals(34, taskWorkDirs.size());
+
+ for (int g = 0; g < gridCount(); g++)
+ grid(g).hadoop().finishFuture(jobId).get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMapperException() throws Exception {
+ prepareFile("/testFile", 1000);
+
+ Configuration cfg = new Configuration();
+
+ cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+ Job job = Job.getInstance(cfg);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(FailMapper.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(TextInputFormat.class);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+ job.setJarByClass(getClass());
+
+ final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 3),
+ createJobInfo(job.getConfiguration()));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ fut.get();
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+ }
+
+ /**
+ * Writes a file of identical text lines into IGFS, overwriting any existing file.
+ *
+ * @param fileName File name.
+ * @param lineCnt Line count.
+ * @throws Exception If failed.
+ */
+ private void prepareFile(String fileName, int lineCnt) throws Exception {
+ IgniteFileSystem igfs = grid(0).fileSystem(igfsName);
+
+ try (OutputStream os = igfs.create(new IgfsPath(fileName), true)) {
+ PrintWriter w = new PrintWriter(new OutputStreamWriter(os));
+
+ for (int i = 0; i < lineCnt; i++)
+ w.print("Hello, Hadoop map-reduce!\n");
+
+ w.flush();
+ }
+ }
+
+ /**
+ * Prepare job with mappers to cancel. Resets all cancellation counters.
+ *
+ * @return Fully configured job.
+ * @throws Exception If fails.
+ */
+ private Configuration prepareJobForCancelling() throws Exception {
+ prepareFile("/testFile", 1500);
+
+ executedTasks.set(0);
+ cancelledTasks.set(0);
+ failMapperId.set(0);
+ splitsCount.set(0);
+
+ Configuration cfg = new Configuration();
+
+ setupFileSystems(cfg);
+
+ Job job = Job.getInstance(cfg);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setMapperClass(CancellingTestMapper.class);
+
+ job.setNumReduceTasks(0);
+
+ job.setInputFormatClass(InFormat.class);
+
+ FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/"));
+ FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output/"));
+
+ job.setJarByClass(getClass());
+
+ return job.getConfiguration();
+ }
+
+ /**
+ * Test input format that publishes the number of computed splits to {@code splitsCount}.
+ */
+ private static class InFormat extends TextInputFormat {
+ /** {@inheritDoc} */
+ @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException {
+ List<InputSplit> res = super.getSplits(ctx);
+
+ splitsCount.set(res.size());
+
+ X.println("___ split of input: " + splitsCount.get());
+
+ return res;
+ }
+ }
+
+ /**
+ * Waits for the condition; on timeout dumps all threads to the log and fails the test.
+ *
+ * @param cond Condition to wait for.
+ * @param msg Failure message.
+ * @throws Exception If interrupted while waiting.
+ */
+ private void awaitOrFail(GridAbsPredicate cond, String msg) throws Exception {
+ if (!GridTestUtils.waitForCondition(cond, WAIT_TIMEOUT)) {
+ U.dumpThreads(log);
+
+ fail(msg);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTaskCancelling() throws Exception {
+ Configuration cfg = prepareJobForCancelling();
+
+ HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+ final IgniteInternalFuture<?> fut = grid(0).hadoop().submit(jobId, createJobInfo(cfg));
+
+ awaitOrFail(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return splitsCount.get() > 0;
+ }
+ }, "Input splits were not computed in time.");
+
+ awaitOrFail(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return executedTasks.get() == splitsCount.get();
+ }
+ }, "Not all mappers were started in time.");
+
+ // Fail mapper with id "1", cancels others
+ failMapperId.set(1);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ fut.get();
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+
+ assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJobKill() throws Exception {
+ Configuration cfg = prepareJobForCancelling();
+
+ Hadoop hadoop = grid(0).hadoop();
+
+ HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);
+
+ // Kill unknown job.
+ boolean killRes = hadoop.kill(jobId);
+
+ assertFalse(killRes);
+
+ final IgniteInternalFuture<?> fut = hadoop.submit(jobId, createJobInfo(cfg));
+
+ awaitOrFail(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return splitsCount.get() > 0;
+ }
+ }, "Input splits were not computed in time.");
+
+ awaitOrFail(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ X.println("___ executed tasks: " + executedTasks.get());
+
+ return executedTasks.get() == splitsCount.get();
+ }
+ }, "Not all mappers were started in time.");
+
+ // Kill really ran job.
+ killRes = hadoop.kill(jobId);
+
+ assertTrue(killRes);
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ fut.get();
+
+ return null;
+ }
+ }, IgniteCheckedException.class, null);
+
+ assertEquals(executedTasks.get(), cancelledTasks.get());
+
+ // Kill the same job again.
+ killRes = hadoop.kill(jobId);
+
+ assertTrue(killRes);
+ }
+
+ /**
+ * Mapper that sleeps on every record, throws when its id equals {@code failMapperId}
+ * and counts the cancellations it observes.
+ */
+ private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> {
+ /** Id taken from the {@code executedTasks} counter during setup. */
+ private int mapperId;
+
+ /** {@inheritDoc} */
+ @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+ mapperId = executedTasks.incrementAndGet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run(Context ctx) throws IOException, InterruptedException {
+ try {
+ super.run(ctx);
+ }
+ catch (HadoopTaskCancelledException e) {
+ cancelledTasks.incrementAndGet();
+
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ if (mapperId == failMapperId.get())
+ throw new IOException();
+
+ Thread.sleep(1000);
+ }
+ }
+
+ /**
+ * Test failing mapper.
+ */
+ private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> {
+ /** {@inheritDoc} */
+ @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ throw new IOException("Expected");
+ }
+ }
+
+ /**
+ * Mapper calculates number of lines and records its local working directory.
+ */
+ private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
+ /** Writable integer constant of '1'. */
+ private static final IntWritable ONE = new IntWritable(1);
+
+ /** Line count constant. */
+ public static final Text LINE_COUNT = new Text("lineCount");
+
+ /** {@inheritDoc} */
+ @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+ X.println("___ Mapper: " + ctx.getTaskAttemptID());
+
+ String taskId = ctx.getTaskAttemptID().toString();
+
+ LocalFileSystem locFs = FileSystem.getLocal(ctx.getConfiguration());
+
+ String workDir = locFs.getWorkingDirectory().toString();
+
+ // Every task must get its own working directory.
+ assertNull(taskWorkDirs.put(workDir, taskId));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException {
+ if (ctx.getConfiguration().getBoolean(MAP_WRITE, false))
+ ctx.write(LINE_COUNT, ONE);
+ else
+ totalLineCnt.incrementAndGet();
+ }
+ }
+
+ /**
+ * Combiner calculates number of lines.
+ */
+ private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
+ /** Reused output value. */
+ private final IntWritable sum = new IntWritable();
+
+ /** {@inheritDoc} */
+ @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+ X.println("___ Combiner: ");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+ InterruptedException {
+ int lineCnt = 0;
+
+ for (IntWritable value : values)
+ lineCnt += value.get();
+
+ sum.set(lineCnt);
+
+ X.println("___ combo: " + lineCnt);
+
+ ctx.write(key, sum);
+ }
+ }
+
+ /**
+ * Reducer calculates number of lines, adds it to {@code totalLineCnt} and records its local working directory.
+ */
+ private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+ /** Reused output value. */
+ private final IntWritable sum = new IntWritable();
+
+ /** {@inheritDoc} */
+ @Override protected void setup(Context ctx) throws IOException, InterruptedException {
+ X.println("___ Reducer: " + ctx.getTaskAttemptID());
+
+ String taskId = ctx.getTaskAttemptID().toString();
+ String workDir = FileSystem.getLocal(ctx.getConfiguration()).getWorkingDirectory().toString();
+
+ // Every task must get its own working directory.
+ assertNull(taskWorkDirs.put(workDir, taskId));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException,
+ InterruptedException {
+ int lineCnt = 0;
+
+ for (IntWritable value : values) {
+ lineCnt += value.get();
+
+ X.println("___ rdcr: " + value.get());
+ }
+
+ sum.set(lineCnt);
+
+ ctx.write(key, sum);
+
+ X.println("___ RDCR SUM: " + lineCnt);
+
+ totalLineCnt.addAndGet(lineCnt);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
new file mode 100644
index 0000000..aaf0f92
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksAllVersionsTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import com.google.common.base.*;
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Tests of Map, Combine and Reduce task executions of any version of hadoop API.
+ */
+abstract class HadoopTasksAllVersionsTest extends HadoopAbstractWordCountTest {
+ /** Empty hosts array. */
+ private static final String[] HOSTS = new String[0];
+
+ /**
+ * Creates some grid hadoop job. Override this method to create tests for any job implementation.
+ *
+ * @param inFile Input file name for the job.
+ * @param outFile Output file name for the job.
+ * @return Hadoop job.
+ * @throws IOException If fails.
+ */
+ public abstract HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception;
+
+ /**
+ * @return prefix of reducer output file name. It's "part-" for v1 and "part-r-" for v2 API
+ */
+ public abstract String getOutputFileNamePrefix();
+
+ /**
+ * Tests map task execution.
+ *
+ * @throws Exception If fails.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void testMapTask() throws Exception {
+ IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+ igfs.mkdirs(inDir);
+
+ IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+ URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+ try (PrintWriter pw = new PrintWriter(igfs.create(inFile, true))) {
+ pw.println("hello0 world0");
+ pw.println("world1 hello1");
+ }
+
+ HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, igfs.info(inFile).length() - 1);
+
+ try (PrintWriter pw = new PrintWriter(igfs.append(inFile, false))) {
+ pw.println("hello2 world2");
+ pw.println("world3 hello3");
+ }
+ HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, fileBlock1.length(),
+ igfs.info(inFile).length() - fileBlock1.length());
+
+ HadoopV2Job gridJob = getHadoopJob(igfsScheme() + inFile.toString(), igfsScheme() + PATH_OUTPUT);
+
+ HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock1);
+
+ HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+ ctx.mockOutput().clear();
+
+ ctx.run();
+
+ assertEquals("hello0,1; world0,1; world1,1; hello1,1", Joiner.on("; ").join(ctx.mockOutput()));
+
+ ctx.mockOutput().clear();
+
+ ctx.taskInfo(new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock2));
+
+ ctx.run();
+
+ assertEquals("hello2,1; world2,1; world3,1; hello3,1", Joiner.on("; ").join(ctx.mockOutput()));
+ }
+
+ /**
+ * Generates input data for reduce-like operation into mock context input and runs the operation.
+ *
+ * @param gridJob Job is to create reduce task from.
+ * @param taskType Type of task - combine or reduce.
+ * @param taskNum Number of task in job.
+ * @param words Pairs of words and its counts.
+ * @return Context with mock output.
+ * @throws IgniteCheckedException If fails.
+ */
+ private HadoopTestTaskContext runTaskWithInput(HadoopV2Job gridJob, HadoopTaskType taskType,
+ int taskNum, String... words) throws IgniteCheckedException {
+ HadoopTaskInfo taskInfo = new HadoopTaskInfo(taskType, gridJob.id(), taskNum, 0, null);
+
+ HadoopTestTaskContext ctx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+ for (int i = 0; i < words.length; i+=2) {
+ List<IntWritable> valList = new ArrayList<>();
+
+ for (int j = 0; j < Integer.parseInt(words[i + 1]); j++)
+ valList.add(new IntWritable(1));
+
+ ctx.mockInput().put(new Text(words[i]), valList);
+ }
+
+ ctx.run();
+
+ return ctx;
+ }
+
+ /**
+ * Tests reduce task execution.
+ *
+ * @throws Exception If fails.
+ */
+ public void testReduceTask() throws Exception {
+ HadoopV2Job gridJob = getHadoopJob(igfsScheme() + PATH_INPUT, igfsScheme() + PATH_OUTPUT);
+
+ runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 0, "word1", "5", "word2", "10");
+ runTaskWithInput(gridJob, HadoopTaskType.REDUCE, 1, "word3", "7", "word4", "15");
+
+ assertEquals(
+ "word1\t5\n" +
+ "word2\t10\n",
+ readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000000/" +
+ getOutputFileNamePrefix() + "00000")
+ );
+
+ assertEquals(
+ "word3\t7\n" +
+ "word4\t15\n",
+ readAndSortFile(PATH_OUTPUT + "/_temporary/0/task_00000000-0000-0000-0000-000000000000_0000_r_000001/" +
+ getOutputFileNamePrefix() + "00001")
+ );
+ }
+
+ /**
+ * Tests combine task execution.
+ *
+ * @throws Exception If fails.
+ */
+ public void testCombinerTask() throws Exception {
+ HadoopV2Job gridJob = getHadoopJob("/", "/");
+
+ HadoopTestTaskContext ctx =
+ runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 0, "word1", "5", "word2", "10");
+
+ assertEquals("word1,5; word2,10", Joiner.on("; ").join(ctx.mockOutput()));
+
+ ctx = runTaskWithInput(gridJob, HadoopTaskType.COMBINE, 1, "word3", "7", "word4", "15");
+
+ assertEquals("word3,7; word4,15", Joiner.on("; ").join(ctx.mockOutput()));
+ }
+
+ /**
+ * Runs chain of map-combine task on file block.
+ *
+ * @param fileBlock block of input file to be processed.
+ * @param gridJob Hadoop job implementation.
+ * @return Context of combine task with mock output.
+ * @throws IgniteCheckedException If fails.
+ */
+ private HadoopTestTaskContext runMapCombineTask(HadoopFileBlock fileBlock, HadoopV2Job gridJob)
+ throws IgniteCheckedException {
+ HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.MAP, gridJob.id(), 0, 0, fileBlock);
+
+ HadoopTestTaskContext mapCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+ mapCtx.run();
+
+ //Prepare input for combine
+ taskInfo = new HadoopTaskInfo(HadoopTaskType.COMBINE, gridJob.id(), 0, 0, null);
+
+ HadoopTestTaskContext combineCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+ combineCtx.makeTreeOfWritables(mapCtx.mockOutput());
+
+ combineCtx.run();
+
+ return combineCtx;
+ }
+
+ /**
+ * Tests all job in complex.
+ * Runs 2 chains of map-combine tasks and sends result into one reduce task.
+ *
+ * @throws Exception If fails.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public void testAllTasks() throws Exception {
+ IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+ igfs.mkdirs(inDir);
+
+ IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+ URI inFileUri = URI.create(igfsScheme() + inFile.toString());
+
+ generateTestFile(inFile.toString(), "red", 100, "blue", 200, "green", 150, "yellow", 70);
+
+ //Split file into two blocks
+ long fileLen = igfs.info(inFile).length();
+
+ Long l = fileLen / 2;
+
+ HadoopFileBlock fileBlock1 = new HadoopFileBlock(HOSTS, inFileUri, 0, l);
+ HadoopFileBlock fileBlock2 = new HadoopFileBlock(HOSTS, inFileUri, l, fileLen - l);
+
+ HadoopV2Job gridJob = getHadoopJob(inFileUri.toString(), igfsScheme() + PATH_OUTPUT);
+
+ HadoopTestTaskContext combine1Ctx = runMapCombineTask(fileBlock1, gridJob);
+
+ HadoopTestTaskContext combine2Ctx = runMapCombineTask(fileBlock2, gridJob);
+
+ //Prepare input for combine
+ HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, gridJob.id(), 0, 0, null);
+
+ HadoopTestTaskContext reduceCtx = new HadoopTestTaskContext(taskInfo, gridJob);
+
+ reduceCtx.makeTreeOfWritables(combine1Ctx.mockOutput());
+ reduceCtx.makeTreeOfWritables(combine2Ctx.mockOutput());
+
+ reduceCtx.run();
+
+ reduceCtx.taskInfo(new HadoopTaskInfo(HadoopTaskType.COMMIT, gridJob.id(), 0, 0, null));
+
+ reduceCtx.run();
+
+ assertEquals(
+ "blue\t200\n" +
+ "green\t150\n" +
+ "red\t100\n" +
+ "yellow\t70\n",
+ readAndSortFile(PATH_OUTPUT + "/" + getOutputFileNamePrefix() + "00000")
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
new file mode 100644
index 0000000..b41a260
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v1.
+ */
+ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest {
+ /**
+ * Builds the WordCount job through hadoop API v1 ({@code org.apache.hadoop.mapred}), points its
+ * file systems at the test IGFS and wraps it into a {@link HadoopV2Job} with an all-zero job id.
+ *
+ * @param inFile Input file name for the job.
+ * @param outFile Output file name for the job.
+ * @return Hadoop job.
+ * @throws Exception If fails.
+ */
+ @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ JobConf conf = HadoopWordCount1.getJob(inFile, outFile);
+
+ setupFileSystems(conf);
+
+ HadoopDefaultJobInfo info = createJobInfo(conf);
+
+ return new HadoopV2Job(new HadoopJobId(new UUID(0, 0), 0), info, log);
+ }
+
+ /**
+ * Reducer output files of API v1 are named {@code part-NNNNN}.
+ *
+ * @return Output file name prefix.
+ */
+ @Override public String getOutputFileNamePrefix() {
+ return "part-";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
new file mode 100644
index 0000000..b677c63
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.lib.output.*;
+import org.apache.ignite.internal.processors.hadoop.examples.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Tests of Map, Combine and Reduce task executions via running of job of hadoop API v2.
+ */
+ public class HadoopTasksV2Test extends HadoopTasksAllVersionsTest {
+ /**
+ * Creates WordCount hadoop job for API v2.
+ *
+ * @param inFile Input file name for the job.
+ * @param outFile Output file name for the job.
+ * @return Hadoop job.
+ * @throws Exception if fails.
+ */
+ @Override public HadoopV2Job getHadoopJob(String inFile, String outFile) throws Exception {
+ Job hadoopJob = HadoopWordCount2.getJob(inFile, outFile);
+
+ // Configure file systems on the configuration that is actually turned into the job info
+ // (previously this was applied to a separately built Job that was then discarded).
+ Configuration conf = hadoopJob.getConfiguration();
+
+ setupFileSystems(conf);
+
+ HadoopDefaultJobInfo jobInfo = createJobInfo(conf);
+
+ HadoopJobId jobId = new HadoopJobId(new UUID(0, 0), 0);
+
+ return new HadoopV2Job(jobId, jobInfo, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getOutputFileNamePrefix() {
+ return "part-r-";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..a56c7c7
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.processors.hadoop.planner.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Round-robin mr planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+ /** {@inheritDoc} */
+ @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+ @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+ if (top.isEmpty())
+ throw new IllegalArgumentException("Topology is empty");
+
+ // Has at least one element.
+ Iterator<ClusterNode> it = top.iterator();
+
+ Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+ for (HadoopInputSplit block : job.input()) {
+ ClusterNode node = it.next();
+
+ Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+ if (nodeBlocks == null) {
+ nodeBlocks = new ArrayList<>();
+
+ mappers.put(node.id(), nodeBlocks);
+ }
+
+ nodeBlocks.add(block);
+
+ if (!it.hasNext())
+ it = top.iterator();
+ }
+
+ int[] rdc = new int[job.info().reducers()];
+
+ for (int i = 0; i < rdc.length; i++)
+ rdc[i] = i;
+
+ return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
new file mode 100644
index 0000000..e444270
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestTaskContext.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Context for test purpose.
+ */
+class HadoopTestTaskContext extends HadoopV2TaskContext {
+ /**
+ * Simple key-vale pair.
+ * @param <K> Key class.
+ * @param <V> Value class.
+ */
+ public static class Pair<K,V> {
+ /** Key */
+ private K key;
+
+ /** Value */
+ private V val;
+
+ /**
+ * @param key key.
+ * @param val value.
+ */
+ Pair(K key, V val) {
+ this.key = key;
+ this.val = val;
+ }
+
+ /**
+ * Getter of key.
+ * @return key.
+ */
+ K key() {
+ return key;
+ }
+
+ /**
+ * Getter of value.
+ * @return value.
+ */
+ V value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return key + "," + val;
+ }
+ }
+
+ /** Mock output container- result data of task execution if it is not overridden. */
+ private List<Pair<String, Integer>> mockOutput = new ArrayList<>();
+
+ /** Mock input container- input data if it is not overridden. */
+ private Map<Object,List> mockInput = new TreeMap<>();
+
+ /** Context output implementation to write data into mockOutput. */
+ private HadoopTaskOutput output = new HadoopTaskOutput() {
+ /** {@inheritDoc} */
+ @Override public void write(Object key, Object val) {
+ //Check of casting and extract/copy values
+ String strKey = new String(((Text)key).getBytes());
+ int intVal = ((IntWritable)val).get();
+
+ mockOutput().add(new Pair<>(strKey, intVal));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /** Context input implementation to read data from mockInput. */
+ private HadoopTaskInput input = new HadoopTaskInput() {
+ /** Iterator of keys and associated lists of values. */
+ Iterator<Map.Entry<Object, List>> iter;
+
+ /** Current key and associated value list. */
+ Map.Entry<Object, List> currEntry;
+
+ /** {@inheritDoc} */
+ @Override public boolean next() {
+ if (iter == null)
+ iter = mockInput().entrySet().iterator();
+
+ if (iter.hasNext())
+ currEntry = iter.next();
+ else
+ currEntry = null;
+
+ return currEntry != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object key() {
+ return currEntry.getKey();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<?> values() {
+ return currEntry.getValue().iterator() ;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /**
+ * Getter of mock output container - result of task if it is not overridden.
+ *
+ * @return mock output.
+ */
+ public List<Pair<String, Integer>> mockOutput() {
+ return mockOutput;
+ }
+
+ /**
+ * Getter of mock input container- input data if it is not overridden.
+ *
+ * @return mock output.
+ */
+ public Map<Object, List> mockInput() {
+ return mockInput;
+ }
+
+ /**
+ * Generate one-key-multiple-values tree from array of key-value pairs, and wrap its into Writable objects.
+ * The result is placed into mock input.
+ *
+ * @param flatData list of key-value pair.
+ */
+ public void makeTreeOfWritables(Iterable<Pair<String, Integer>> flatData) {
+ Text key = new Text();
+
+ for (HadoopTestTaskContext.Pair<String, Integer> pair : flatData) {
+ key.set(pair.key);
+ ArrayList<IntWritable> valList;
+
+ if (!mockInput.containsKey(key)) {
+ valList = new ArrayList<>();
+ mockInput.put(key, valList);
+ key = new Text();
+ }
+ else
+ valList = (ArrayList<IntWritable>) mockInput.get(key);
+ valList.add(new IntWritable(pair.value()));
+ }
+ }
+
+ /**
+ * @param taskInfo Task info.
+ * @param gridJob Grid Hadoop job.
+ */
+ public HadoopTestTaskContext(HadoopTaskInfo taskInfo, HadoopJob gridJob) throws IgniteCheckedException {
+ super(taskInfo, gridJob, gridJob.id(), null, jobConfDataInput(gridJob));
+ }
+
+ /**
+ * Creates DataInput to read JobConf.
+ *
+ * @param job Job.
+ * @return DataInput with JobConf.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static DataInput jobConfDataInput(HadoopJob job) throws IgniteCheckedException {
+ JobConf jobConf = new JobConf();
+
+ for (Map.Entry<String, String> e : ((HadoopDefaultJobInfo)job.info()).properties().entrySet())
+ jobConf.set(e.getKey(), e.getValue());
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ try {
+ jobConf.write(new DataOutputStream(buf));
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+
+ return new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskOutput output() {
+ return output;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskInput input() {
+ return input;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java
new file mode 100644
index 0000000..ef60762
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTestUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.*;
+
+/**
+ * Utility class for tests.
+ */
+public class HadoopTestUtils {
+ /**
+ * Checks that job statistics file contains valid strings only.
+ *
+ * @param reader Buffered reader to get lines of job statistics.
+ * @return Amount of events.
+ * @throws IOException If failed.
+ */
+ public static long simpleCheckJobStatFile(BufferedReader reader) throws IOException {
+ Collection<String> phases = new HashSet<>();
+
+ phases.add("submit");
+ phases.add("prepare");
+ phases.add("start");
+ phases.add("finish");
+ phases.add("requestId");
+ phases.add("responseId");
+
+ Collection<String> evtTypes = new HashSet<>();
+
+ evtTypes.add("JOB");
+ evtTypes.add("SETUP");
+ evtTypes.add("MAP");
+ evtTypes.add("SHUFFLE");
+ evtTypes.add("REDUCE");
+ evtTypes.add("COMBINE");
+ evtTypes.add("COMMIT");
+
+ long evtCnt = 0;
+ String line;
+
+ Map<Long, String> reduceNodes = new HashMap<>();
+
+ while((line = reader.readLine()) != null) {
+ String[] splitLine = line.split(":");
+
+ //Try parse timestamp
+ Long.parseLong(splitLine[1]);
+
+ String[] evt = splitLine[0].split(" ");
+
+ assertTrue("Unknown event '" + evt[0] + "'", evtTypes.contains(evt[0]));
+
+ String phase;
+
+ if ("JOB".equals(evt[0]))
+ phase = evt[1];
+ else {
+ assertEquals(4, evt.length);
+ assertTrue("The node id is not defined", !F.isEmpty(evt[3]));
+
+ long taskNum = Long.parseLong(evt[1]);
+
+ if (("REDUCE".equals(evt[0]) || "SHUFFLE".equals(evt[0]))) {
+ String nodeId = reduceNodes.get(taskNum);
+
+ if (nodeId == null)
+ reduceNodes.put(taskNum, evt[3]);
+ else
+ assertEquals("Different nodes for SHUFFLE and REDUCE tasks", nodeId, evt[3]);
+ }
+
+ phase = evt[2];
+ }
+
+ assertTrue("Unknown phase '" + phase + "' in " + Arrays.toString(evt), phases.contains(phase));
+
+ evtCnt++;
+ }
+
+ return evtCnt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/498dcfab/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
new file mode 100644
index 0000000..ebc89f4
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Self test of {@link org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Job}.
+ */
+public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest {
+ /** */
+ private static final String TEST_SERIALIZED_VALUE = "Test serialized value";
+
+ /**
+ * Custom serialization class that accepts {@link Writable}.
+ */
+ private static class CustomSerialization extends WritableSerialization {
+ /** {@inheritDoc} */
+ @Override public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+ return new Deserializer<Writable>() {
+ @Override public void open(InputStream in) { }
+
+ @Override public Writable deserialize(Writable writable) {
+ return new Text(TEST_SERIALIZED_VALUE);
+ }
+
+ @Override public void close() { }
+ };
+ }
+ }
+
+ /**
+ * Tests that {@link HadoopJob} provides wrapped serializer if it's set in configuration.
+ *
+ * @throws IgniteCheckedException If fails.
+ */
+ public void testCustomSerializationApplying() throws IgniteCheckedException {
+ JobConf cfg = new JobConf();
+
+ cfg.setMapOutputKeyClass(IntWritable.class);
+ cfg.setMapOutputValueClass(Text.class);
+ cfg.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+ HadoopJob job = new HadoopV2Job(new HadoopJobId(UUID.randomUUID(), 1), createJobInfo(cfg), log);
+
+ HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0,
+ null));
+
+ HadoopSerialization ser = taskCtx.keySerialization();
+
+ assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(new byte[0]));
+
+ assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+
+ ser = taskCtx.valueSerialization();
+
+ assertEquals(HadoopSerializationWrapper.class.getName(), ser.getClass().getName());
+
+ assertEquals(TEST_SERIALIZED_VALUE, ser.read(in, null).toString());
+ }
+}