Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/26 11:24:58 UTC
[17/47] ignite git commit: IGNITE-3912: Hadoop: Implemented new class
loading architecture for embedded execution mode.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
new file mode 100644
index 0000000..a3bf49c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopJobTrackerSelfTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.combineExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.latch;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.mapExecCnt;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState.reduceExecCnt;
+
+/**
+ * Job tracker self test.
+ */
+public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
+ /** */
+ private static final String PATH_OUTPUT = "/test-out";
+
+ /** Test block count. */
+ private static final int BLOCK_CNT = 10;
+
+ /** {@inheritDoc} */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ latch.put("mapAwaitLatch", new CountDownLatch(1));
+ latch.put("reduceAwaitLatch", new CountDownLatch(1));
+ latch.put("combineAwaitLatch", new CountDownLatch(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ mapExecCnt.set(0);
+ combineExecCnt.set(0);
+ reduceExecCnt.set(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());
+
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.setExternalExecution(false);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSimpleTaskSubmit() throws Exception {
+ try {
+ UUID globalId = UUID.randomUUID();
+
+ Job job = Job.getInstance();
+ setupFileSystems(job.getConfiguration());
+
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
+ job.setInputFormatClass(InFormat.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));
+
+ HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+ checkStatus(jobId, false);
+
+ info("Releasing map latch.");
+
+ latch.get("mapAwaitLatch").countDown();
+
+ checkStatus(jobId, false);
+
+ info("Releasing reduce latch.");
+
+ latch.get("reduceAwaitLatch").countDown();
+
+ checkStatus(jobId, true);
+
+ assertEquals(10, mapExecCnt.get());
+ assertEquals(0, combineExecCnt.get());
+ assertEquals(1, reduceExecCnt.get());
+ }
+ finally {
+ // Safety.
+ latch.get("mapAwaitLatch").countDown();
+ latch.get("combineAwaitLatch").countDown();
+ latch.get("reduceAwaitLatch").countDown();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTaskWithCombinerPerMap() throws Exception {
+ try {
+ UUID globalId = UUID.randomUUID();
+
+ Job job = Job.getInstance();
+ setupFileSystems(job.getConfiguration());
+
+ job.setMapperClass(TestMapper.class);
+ job.setReducerClass(TestReducer.class);
+ job.setCombinerClass(TestCombiner.class);
+ job.setInputFormatClass(InFormat.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));
+
+ HadoopJobId jobId = new HadoopJobId(globalId, 1);
+
+ grid(0).hadoop().submit(jobId, createJobInfo(job.getConfiguration()));
+
+ checkStatus(jobId, false);
+
+ info("Releasing map latch.");
+
+ latch.get("mapAwaitLatch").countDown();
+
+ checkStatus(jobId, false);
+
+ // All maps are completed. We have a combiner, so no reducers should be executed
+ // before combiner latch is released.
+
+ U.sleep(50);
+
+ assertEquals(0, reduceExecCnt.get());
+
+ info("Releasing combiner latch.");
+
+ latch.get("combineAwaitLatch").countDown();
+
+ checkStatus(jobId, false);
+
+ info("Releasing reduce latch.");
+
+ latch.get("reduceAwaitLatch").countDown();
+
+ checkStatus(jobId, true);
+
+ assertEquals(10, mapExecCnt.get());
+ assertEquals(10, combineExecCnt.get());
+ assertEquals(1, reduceExecCnt.get());
+ }
+ finally {
+ // Safety.
+ latch.get("mapAwaitLatch").countDown();
+ latch.get("combineAwaitLatch").countDown();
+ latch.get("reduceAwaitLatch").countDown();
+ }
+ }
+
+ /**
+ * Checks job execution status.
+ *
+ * @param jobId Job ID.
+ * @param complete Completion status.
+ * @throws Exception If failed.
+ */
+ private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception {
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteKernal kernal = (IgniteKernal)grid(i);
+
+ Hadoop hadoop = kernal.hadoop();
+
+ HadoopJobStatus stat = hadoop.status(jobId);
+
+ assert stat != null;
+
+ IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+ if (!complete)
+ assertFalse(fut.isDone());
+ else {
+ info("Waiting for status future completion on node [idx=" + i + ", nodeId=" +
+ kernal.getLocalNodeId() + ']');
+
+ fut.get();
+ }
+ }
+ }
+
+ /**
+ * Test input format.
+ */
+ public static class InFormat extends InputFormat {
+
+ @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+ List<InputSplit> res = new ArrayList<>(BLOCK_CNT);
+
+ for (int i = 0; i < BLOCK_CNT; i++)
+ try {
+ res.add(new FileSplit(new Path(new URI("someFile")), i, i + 1, new String[] {"localhost"}));
+ }
+ catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
+ return res;
+ }
+
+ @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext ctx) throws IOException, InterruptedException {
+ return new RecordReader() {
+ @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+ }
+
+ @Override public boolean nextKeyValue() {
+ return false;
+ }
+
+ @Override public Object getCurrentKey() {
+ return null;
+ }
+
+ @Override public Object getCurrentValue() {
+ return null;
+ }
+
+ @Override public float getProgress() {
+ return 0;
+ }
+
+ @Override public void close() {
+
+ }
+ };
+ }
+ }
+
+ /**
+ * Test mapper.
+ */
+ private static class TestMapper extends Mapper {
+ @Override public void run(Context ctx) throws IOException, InterruptedException {
+ System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+ latch.get("mapAwaitLatch").await();
+
+ mapExecCnt.incrementAndGet();
+
+ System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+ }
+ }
+
+ /**
+ * Test reducer.
+ */
+ private static class TestReducer extends Reducer {
+ @Override public void run(Context ctx) throws IOException, InterruptedException {
+ System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+ latch.get("reduceAwaitLatch").await();
+
+ reduceExecCnt.incrementAndGet();
+
+ System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+ }
+ }
+
+ /**
+ * Test combiner.
+ */
+ private static class TestCombiner extends Reducer {
+ @Override public void run(Context ctx) throws IOException, InterruptedException {
+ System.out.println("Running task: " + ctx.getTaskAttemptID().getTaskID().getId());
+
+ latch.get("combineAwaitLatch").await();
+
+ combineExecCnt.incrementAndGet();
+
+ System.out.println("Completed task: " + ctx.getTaskAttemptID().getTaskID().getId());
+ }
+ }
+}
\ No newline at end of file
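The static imports above pull the latches and counters from a separate holder, org.apache.ignite.internal.processors.hadoop.state.HadoopJobTrackerSelfTestState, instead of keeping them on the test class. Under the new class loading architecture the job's task classes may run in a different class loader than the test, so mutable state presumably has to live in a class that both loaders resolve to the same instance. The holder itself is not part of this hunk; inferred from how the test uses it, a minimal sketch:

    package org.apache.ignite.internal.processors.hadoop.state;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    /** Sketch of the shared state holder (inferred from usage, not the actual file). */
    public class HadoopJobTrackerSelfTestState {
        /** Named latches ("mapAwaitLatch", ...) the test and the tasks synchronize on. */
        public static final Map<String, CountDownLatch> latch = new ConcurrentHashMap<>();

        /** Number of executed map tasks. */
        public static final AtomicInteger mapExecCnt = new AtomicInteger();

        /** Number of executed combine tasks. */
        public static final AtomicInteger combineExecCnt = new AtomicInteger();

        /** Number of executed reduce tasks. */
        public static final AtomicInteger reduceExecCnt = new AtomicInteger();
    }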
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
new file mode 100644
index 0000000..b04deeb
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+import static org.apache.ignite.internal.processors.hadoop.state.HadoopMapReduceEmbeddedSelfTestState.flags;
+
+/**
+ * Tests map-reduce execution in embedded mode.
+ */
+public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.setExternalExecution(false);
+
+ return cfg;
+ }
+
+ /**
+ * Tests whole job execution with all phases, in both the old and new versions of the API, with custom
+ * Serialization, Partitioner and IO format definitions.
+ * @throws Exception If failed.
+ */
+ public void testMultiReducerWholeMapReduceExecution() throws Exception {
+ IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+ igfs.mkdirs(inDir);
+
+ IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+ generateTestFile(inFile.toString(), "key1", 10000, "key2", 20000, "key3", 15000, "key4", 7000, "key5", 12000,
+ "key6", 18000 );
+
+ for (int i = 0; i < 2; i++) {
+ boolean useNewAPI = i == 1;
+
+ igfs.delete(new IgfsPath(PATH_OUTPUT), true);
+
+ flags.put("serializationWasConfigured", false);
+ flags.put("partitionerWasConfigured", false);
+ flags.put("inputFormatWasConfigured", false);
+ flags.put("outputFormatWasConfigured", false);
+
+ JobConf jobConf = new JobConf();
+
+ jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
+
+ //To split into about 6-7 items for v2
+ jobConf.setInt(FileInputFormat.SPLIT_MAXSIZE, 65000);
+
+ //For v1
+ jobConf.setInt("fs.local.block.size", 65000);
+
+ // File system coordinates.
+ setupFileSystems(jobConf);
+
+ HadoopWordCount1.setTasksClasses(jobConf, !useNewAPI, !useNewAPI, !useNewAPI);
+
+ if (!useNewAPI) {
+ jobConf.setPartitionerClass(CustomV1Partitioner.class);
+ jobConf.setInputFormat(CustomV1InputFormat.class);
+ jobConf.setOutputFormat(CustomV1OutputFormat.class);
+ }
+
+ Job job = Job.getInstance(jobConf);
+
+ HadoopWordCount2.setTasksClasses(job, useNewAPI, useNewAPI, useNewAPI, false);
+
+ if (useNewAPI) {
+ job.setPartitionerClass(CustomV2Partitioner.class);
+ job.setInputFormatClass(CustomV2InputFormat.class);
+ job.setOutputFormatClass(CustomV2OutputFormat.class);
+ }
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+ job.setNumReduceTasks(3);
+
+ job.setJarByClass(HadoopWordCount2.class);
+
+ IgniteInternalFuture<?> fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration()));
+
+ fut.get();
+
+ assertTrue("Serialization was configured (new API is " + useNewAPI + ")",
+ flags.get("serializationWasConfigured"));
+
+ assertTrue("Partitioner was configured (new API is = " + useNewAPI + ")",
+ flags.get("partitionerWasConfigured"));
+
+ assertTrue("Input format was configured (new API is = " + useNewAPI + ")",
+ flags.get("inputFormatWasConfigured"));
+
+ assertTrue("Output format was configured (new API is = " + useNewAPI + ")",
+ flags.get("outputFormatWasConfigured"));
+
+ assertEquals("Use new API = " + useNewAPI,
+ "key3\t15000\n" +
+ "key6\t18000\n",
+ readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00000")
+ );
+
+ assertEquals("Use new API = " + useNewAPI,
+ "key1\t10000\n" +
+ "key4\t7000\n",
+ readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00001")
+ );
+
+ assertEquals("Use new API = " + useNewAPI,
+ "key2\t20000\n" +
+ "key5\t12000\n",
+ readAndSortFile(PATH_OUTPUT + "/" + (useNewAPI ? "part-r-" : "part-") + "00002")
+ );
+
+ }
+ }
+
+ /**
+ * Custom serialization class that inherits behaviour of native {@link WritableSerialization}.
+ */
+ protected static class CustomSerialization extends WritableSerialization {
+ @Override public void setConf(Configuration conf) {
+ super.setConf(conf);
+
+ flags.put("serializationWasConfigured", true);
+ }
+ }
+
+ /**
+ * Custom implementation of Partitioner in v1 API.
+ */
+ private static class CustomV1Partitioner extends org.apache.hadoop.mapred.lib.HashPartitioner {
+ /** {@inheritDoc} */
+ @Override public void configure(JobConf job) {
+ flags.put("partitionerWasConfigured", true);
+ }
+ }
+
+ /**
+ * Custom implementation of Partitioner in v2 API.
+ */
+ private static class CustomV2Partitioner extends org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
+ implements Configurable {
+ /** {@inheritDoc} */
+ @Override public void setConf(Configuration conf) {
+ flags.put("partitionerWasConfigured", true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Configuration getConf() {
+ return null;
+ }
+ }
+
+ /**
+ * Custom implementation of InputFormat in v2 API.
+ */
+ private static class CustomV2InputFormat extends org.apache.hadoop.mapreduce.lib.input.TextInputFormat implements Configurable {
+ /** {@inheritDoc} */
+ @Override public void setConf(Configuration conf) {
+ flags.put("inputFormatWasConfigured", true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Configuration getConf() {
+ return null;
+ }
+ }
+
+ /**
+ * Custom implementation of OutputFormat in v2 API.
+ */
+ private static class CustomV2OutputFormat extends org.apache.hadoop.mapreduce.lib.output.TextOutputFormat implements Configurable {
+ /** {@inheritDoc} */
+ @Override public void setConf(Configuration conf) {
+ flags.put("outputFormatWasConfigured", true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Configuration getConf() {
+ return null;
+ }
+ }
+
+ /**
+ * Custom implementation of InputFormat in v1 API.
+ */
+ private static class CustomV1InputFormat extends org.apache.hadoop.mapred.TextInputFormat {
+ /** {@inheritDoc} */
+ @Override public void configure(JobConf job) {
+ super.configure(job);
+
+ flags.put("inputFormatWasConfigured", true);
+ }
+ }
+
+ /**
+ * Custom implementation of OutputFormat in v1 API.
+ */
+ private static class CustomV1OutputFormat extends org.apache.hadoop.mapred.TextOutputFormat implements JobConfigurable {
+ /** {@inheritDoc} */
+ @Override public void configure(JobConf job) {
+ flags.put("outputFormatWasConfigured", true);
+ }
+ }
+}
\ No newline at end of file
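The flags map imported by this test follows the same shared-state pattern: boolean markers flipped from setConf()/configure() of the custom serialization, partitioner and format classes, held where both the test's and the job's class loaders see one instance. The holder is not included in this message; inferred from usage it is presumably no more than:

    package org.apache.ignite.internal.processors.hadoop.state;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    /** Sketch of the shared flags holder (inferred from usage, not the actual file). */
    public class HadoopMapReduceEmbeddedSelfTestState {
        /** Flags set by the custom job components when they are configured. */
        public static final Map<String, Boolean> flags = new ConcurrentHashMap<>();
    }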
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
new file mode 100644
index 0000000..afd6f26
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceErrorResilienceTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Tests error resilience after an error during map-reduce job execution.
+ * Combinations tested:
+ * { new API, old API }
+ * x { unchecked exception, checked exception, error }
+ * x { phase where the error happens }.
+ */
+public class HadoopMapReduceErrorResilienceTest extends HadoopAbstractMapReduceTest {
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError0_Runtime() throws Exception {
+ doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Runtime);
+ }
+
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError0_IOException() throws Exception {
+ doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.IOException);
+ }
+
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError0_Error() throws Exception {
+ doTestRecoveryAfterAnError(0, HadoopErrorSimulator.Kind.Error);
+ }
+
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError7_Runtime() throws Exception {
+ doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Runtime);
+ }
+
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError7_IOException() throws Exception {
+ doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.IOException);
+ }
+
+ /**
+ * Tests recovery.
+ *
+ * @throws Exception If failed.
+ */
+ public void testRecoveryAfterAnError7_Error() throws Exception {
+ doTestRecoveryAfterAnError(7, HadoopErrorSimulator.Kind.Error);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 10 * 60 * 1000L;
+ }
+
+ /**
+ * Tests correct work after an error.
+ *
+ * @throws Exception On error.
+ */
+ private void doTestRecoveryAfterAnError(int useNewBits, HadoopErrorSimulator.Kind simulatorKind) throws Exception {
+ try {
+ IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+ igfs.mkdirs(inDir);
+
+ IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+ generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+ boolean useNewMapper = (useNewBits & 1) == 0;
+ boolean useNewCombiner = (useNewBits & 2) == 0;
+ boolean useNewReducer = (useNewBits & 4) == 0;
+
+ for (int i = 0; i < 12; i++) {
+ int bits = 1 << i;
+
+ System.out.println("############################ Simulator kind = " + simulatorKind
+ + ", Stage bits = " + bits);
+
+ HadoopErrorSimulator sim = HadoopErrorSimulator.create(simulatorKind, bits);
+
+ doTestWithErrorSimulator(sim, inFile, useNewMapper, useNewCombiner, useNewReducer);
+ }
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+
+ fail("Unexpected throwable: " + t);
+ }
+ }
+
+ /**
+ * Performs test with given error simulator.
+ *
+ * @param sim The simulator.
+ * @param inFile Input file.
+ * @param useNewMapper Whether to use the new mapper API.
+ * @param useNewCombiner Whether to use the new combiner API.
+ * @param useNewReducer Whether to use the new reducer API.
+ * @throws Exception If failed.
+ */
+ private void doTestWithErrorSimulator(HadoopErrorSimulator sim, IgfsPath inFile, boolean useNewMapper,
+ boolean useNewCombiner, boolean useNewReducer) throws Exception {
+ // Set real simulating error simulator:
+ assertTrue(HadoopErrorSimulator.setInstance(HadoopErrorSimulator.noopInstance, sim));
+
+ try {
+ // Expect failure there:
+ doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+ }
+ catch (Throwable t) { // This may be an Error.
+ // Expected:
+ System.out.println(t.toString()); // Ignore, continue the test.
+ }
+
+ // Set no-op error simulator:
+ assertTrue(HadoopErrorSimulator.setInstance(sim, HadoopErrorSimulator.noopInstance));
+
+ // Expect success there:
+ doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+ }
+}
\ No newline at end of file
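A reading aid for the useNewBits parameter above: each check compares the masked bit to zero, so a clear bit selects the new API and a set bit the old one. A standalone illustration:

    public class UseNewBitsDecoding {
        public static void main(String[] args) {
            int useNewBits = 7; // testRecoveryAfterAnError7_*: binary 111.

            boolean useNewMapper = (useNewBits & 1) == 0;   // false -> old mapper API.
            boolean useNewCombiner = (useNewBits & 2) == 0; // false -> old combiner API.
            boolean useNewReducer = (useNewBits & 4) == 0;  // false -> old reducer API.

            // Prints "false false false": useNewBits == 7 exercises the all-old API path,
            // while useNewBits == 0 (testRecoveryAfterAnError0_*) exercises the all-new one.
            System.out.println(useNewMapper + " " + useNewCombiner + " " + useNewReducer);
        }
    }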
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
new file mode 100644
index 0000000..feccb59
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
+
+/**
+ * Tests the whole cycle of map-reduce processing via the job tracker.
+ */
+public class HadoopMapReduceTest extends HadoopAbstractMapReduceTest {
+ /**
+ * Tests whole job execution with all phases in all combinations of the new and old versions of the API.
+ * @throws Exception If failed.
+ */
+ public void testWholeMapReduceExecution() throws Exception {
+ IgfsPath inDir = new IgfsPath(PATH_INPUT);
+
+ igfs.mkdirs(inDir);
+
+ IgfsPath inFile = new IgfsPath(inDir, HadoopWordCount2.class.getSimpleName() + "-input");
+
+ generateTestFile(inFile.toString(), "red", red, "blue", blue, "green", green, "yellow", yellow);
+
+ for (boolean[] apiMode: getApiModes()) {
+ assert apiMode.length == 3;
+
+ boolean useNewMapper = apiMode[0];
+ boolean useNewCombiner = apiMode[1];
+ boolean useNewReducer = apiMode[2];
+
+ doTest(inFile, useNewMapper, useNewCombiner, useNewReducer);
+ }
+ }
+
+ /**
+ * Gets API mode combinations to be tested.
+ * Each boolean[] is a { newMapper, newCombiner, newReducer } flag triplet.
+ *
+ * @return Arrays of booleans indicating API combinations to test.
+ */
+ protected boolean[][] getApiModes() {
+ return new boolean[][] {
+ { false, false, false },
+ { false, false, true },
+ { false, true, false },
+ { true, false, false },
+ { true, true, true },
+ };
+ }
+}
\ No newline at end of file
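getApiModes() deliberately returns five of the eight possible triplets. A subclass wanting exhaustive coverage could generate the full matrix instead; a hypothetical override, not part of this commit:

    /** Hypothetical override producing all 8 { newMapper, newCombiner, newReducer } combinations. */
    @Override protected boolean[][] getApiModes() {
        boolean[][] modes = new boolean[8][3];

        for (int bits = 0; bits < 8; bits++) {
            modes[bits][0] = (bits & 1) != 0; // New mapper API.
            modes[bits][1] = (bits & 2) != 0; // New combiner API.
            modes[bits][2] = (bits & 4) != 0; // New reducer API.
        }

        return modes;
    }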
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
new file mode 100644
index 0000000..3bb8735
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopNoHadoopMapReduceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Tests an attempt to execute a map-reduce task when no Hadoop processor is available.
+ */
+public class HadoopNoHadoopMapReduceTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ c.setHadoopConfiguration(null);
+ c.setPeerClassLoadingEnabled(true);
+
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testWholeMapReduceExecution() throws Exception {
+ try {
+ super.testWholeMapReduceExecution();
+
+ fail("IllegalStateException expected.");
+ }
+ catch (IllegalStateException ignore) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
new file mode 100644
index 0000000..220614c
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPlannerMockJob.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * Mock job for planner tests.
+ */
+public class HadoopPlannerMockJob implements HadoopJob {
+ /** Input splits. */
+ private final Collection<HadoopInputSplit> splits;
+
+ /** Reducers count. */
+ private final int reducers;
+
+ /**
+ * Constructor.
+ *
+ * @param splits Input splits.
+ * @param reducers Reducers.
+ */
+ public HadoopPlannerMockJob(Collection<HadoopInputSplit> splits, int reducers) {
+ this.splits = splits;
+ this.reducers = reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+ return splits;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobInfo info() {
+ return new JobInfo(reducers);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJobId id() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void initialize(boolean external, UUID nodeId) throws IgniteCheckedException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void dispose(boolean external) throws IgniteCheckedException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanupStagingDirectory() {
+ throwUnsupported();
+ }
+
+ /**
+ * Throw {@link UnsupportedOperationException}.
+ */
+ private static void throwUnsupported() {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+
+ /**
+ * Mocked job info.
+ */
+ private static class JobInfo implements HadoopJobInfo {
+ /** Reducers. */
+ private final int reducers;
+
+ /**
+ * Constructor.
+ *
+ * @param reducers Reducers.
+ */
+ public JobInfo(int reducers) {
+ this.reducers = reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int reducers() {
+ return reducers;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String property(String name) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasCombiner() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasReducer() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+ @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String jobName() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String user() {
+ throwUnsupported();
+
+ return null;
+ }
+ }
+}
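Only input() and info() of the mock do real work; every other method fails fast, which documents exactly what a planner is allowed to touch. A sketch of the intended usage (assuming a test method declared with "throws Exception"; the planner invocation itself is out of scope here):

    Collection<HadoopInputSplit> splits = new ArrayList<>();
    // ... populate with splits describing block locations ...

    HadoopJob job = new HadoopPlannerMockJob(splits, 4); // 4 reducers.

    assert job.input() == splits;      // Supported: returns the splits as-is.
    assert job.info().reducers() == 4; // Supported: mocked job info.

    // Anything else, e.g. job.id() or job.initialize(...), throws
    // UnsupportedOperationException("Should not be called!").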
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
new file mode 100644
index 0000000..5a55430
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopPopularWordsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map.Entry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static com.google.common.collect.Maps.immutableEntry;
+import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
+import static java.util.Collections.reverseOrder;
+
+/**
+ * Hadoop-based 10 popular words example: all files in a given directory are tokenized and for each word longer than
+ * 3 characters the number of occurrences is calculated. Finally, 10 words with the highest occurrence count are
+ * output.
+ *
+ * NOTE: in order to run this example on Windows, please ensure that Cygwin is installed and available in the system
+ * path.
+ */
+public class HadoopPopularWordsTest {
+ /** Ignite home. */
+ private static final String IGNITE_HOME = U.getIgniteHome();
+
+ /** The path to the input directory. All files in that directory will be processed. */
+ private static final Path BOOKS_LOCAL_DIR =
+ new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/books");
+
+ /** The path to the output directory. The result file will be written to this location. */
+ private static final Path RESULT_LOCAL_DIR =
+ new Path("file:" + IGNITE_HOME, "modules/tests/java/org/apache/ignite/grid/hadoop/output");
+
+ /** Popular books source dir in DFS. */
+ private static final Path BOOKS_DFS_DIR = new Path("tmp/word-count-example/in");
+
+ /** Result dir in DFS. */
+ private static final Path RESULT_DFS_DIR = new Path("tmp/word-count-example/out");
+
+ /** Path to the distributed file system configuration. */
+ private static final String DFS_CFG = "examples/config/filesystem/core-site.xml";
+
+ /** Top N words to select. */
+ private static final int POPULAR_WORDS_CNT = 10;
+
+ /**
+ * For each token in the input string the mapper emits a {word, 1} pair.
+ */
+ private static class TokenizingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+ /** Constant value. */
+ private static final IntWritable ONE = new IntWritable(1);
+
+ /** The word converted into the Text. */
+ private Text word = new Text();
+
+ /**
+ * Emits an entry where the key is the word and the value is always 1.
+ *
+ * @param key the current position in the input file (not used here)
+ * @param val the text string
+ * @param ctx mapper context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override protected void map(LongWritable key, Text val, Context ctx)
+ throws IOException, InterruptedException {
+ // Get the mapped object.
+ final String line = val.toString();
+
+ // Splits the given string to words.
+ final String[] words = line.split("[^a-zA-Z0-9]");
+
+ for (final String w : words) {
+ // Only emit counts for longer words.
+ if (w.length() <= 3)
+ continue;
+
+ word.set(w);
+
+ // Write the word into the context with an initial count of 1.
+ ctx.write(word, ONE);
+ }
+ }
+ }
+
+ /**
+ * The reducer uses a priority queue to rank the words based on their number of occurrences.
+ */
+ private static class TopNWordsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+ private MinMaxPriorityQueue<Entry<Integer, String>> q;
+
+ TopNWordsReducer() {
+ q = orderedBy(reverseOrder(new Comparator<Entry<Integer, String>>() {
+ @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
+ return o1.getKey().compareTo(o2.getKey());
+ }
+ })).expectedSize(POPULAR_WORDS_CNT).maximumSize(POPULAR_WORDS_CNT).create();
+ }
+
+ /**
+ * This method doesn't emit anything, but just keeps track of the top N words.
+ *
+ * @param key The word.
+ * @param vals The words counts.
+ * @param ctx Reducer context.
+ * @throws IOException If failed.
+ * @throws InterruptedException If failed.
+ */
+ @Override public void reduce(Text key, Iterable<IntWritable> vals, Context ctx) throws IOException,
+ InterruptedException {
+ int sum = 0;
+
+ for (IntWritable val : vals)
+ sum += val.get();
+
+ q.add(immutableEntry(sum, key.toString()));
+ }
+
+ /**
+ * This method is called after all the word entries have been processed. It writes the accumulated
+ * statistics to the job output file.
+ *
+ * @param ctx The job context.
+ * @throws IOException If failed.
+ * @throws InterruptedException If failed.
+ */
+ @Override protected void cleanup(Context ctx) throws IOException, InterruptedException {
+ IntWritable i = new IntWritable();
+
+ Text txt = new Text();
+
+ // Iterate in descending order.
+ while (!q.isEmpty()) {
+ Entry<Integer, String> e = q.removeFirst();
+
+ i.set(e.getKey());
+
+ txt.set(e.getValue());
+
+ ctx.write(txt, i);
+ }
+ }
+ }
+
+ /**
+ * Configures the Hadoop MapReduce job.
+ *
+ * @return Instance of the Hadoop MapRed job.
+ * @throws IOException If failed.
+ */
+ @SuppressWarnings("deprecation")
+ private Job createConfigBasedHadoopJob() throws IOException {
+ Job jobCfg = new Job();
+
+ Configuration cfg = jobCfg.getConfiguration();
+
+ // Use explicit configuration of distributed file system, if provided.
+ cfg.addResource(U.resolveIgniteUrl(DFS_CFG));
+
+ jobCfg.setJobName("HadoopPopularWordExample");
+ jobCfg.setJarByClass(HadoopPopularWordsTest.class);
+ jobCfg.setInputFormatClass(TextInputFormat.class);
+ jobCfg.setOutputKeyClass(Text.class);
+ jobCfg.setOutputValueClass(IntWritable.class);
+ jobCfg.setMapperClass(TokenizingMapper.class);
+ jobCfg.setReducerClass(TopNWordsReducer.class);
+
+ FileInputFormat.setInputPaths(jobCfg, BOOKS_DFS_DIR);
+ FileOutputFormat.setOutputPath(jobCfg, RESULT_DFS_DIR);
+
+ // The local job tracker allows only one task per wave, but the text input format
+ // replaces that with a value calculated from the input split size option.
+ if ("local".equals(cfg.get("mapred.job.tracker", "local"))) {
+ // Split job into tasks using 32MB split size.
+ FileInputFormat.setMinInputSplitSize(jobCfg, 32 * 1024 * 1024);
+ FileInputFormat.setMaxInputSplitSize(jobCfg, Long.MAX_VALUE);
+ }
+
+ return jobCfg;
+ }
+
+ /**
+ * Runs the Hadoop job.
+ *
+ * @return {@code True} if succeeded, {@code false} otherwise.
+ * @throws Exception If failed.
+ */
+ private boolean runWordCountConfigBasedHadoopJob() throws Exception {
+ Job job = createConfigBasedHadoopJob();
+
+ // Distributed file system this job will work with.
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+
+ X.println(">>> Using distributed file system: " + fs.getHomeDirectory());
+
+ // Prepare input and output job directories.
+ prepareDirectories(fs);
+
+ long time = System.currentTimeMillis();
+
+ // Run job.
+ boolean res = job.waitForCompletion(true);
+
+ X.println(">>> Job execution time: " + (System.currentTimeMillis() - time) / 1000 + " sec.");
+
+ // Move job results into local file system, so you can view calculated results.
+ publishResults(fs);
+
+ return res;
+ }
+
+ /**
+ * Prepare job's data: clean up result directories that might have been left over
+ * after previous runs, copy input files from the local file system into DFS.
+ *
+ * @param fs Distributed file system to use in job.
+ * @throws IOException If failed.
+ */
+ private void prepareDirectories(FileSystem fs) throws IOException {
+ X.println(">>> Cleaning up DFS result directory: " + RESULT_DFS_DIR);
+
+ fs.delete(RESULT_DFS_DIR, true);
+
+ X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+ fs.delete(BOOKS_DFS_DIR, true);
+
+ X.println(">>> Copy local files into DFS input directory: " + BOOKS_DFS_DIR);
+
+ fs.copyFromLocalFile(BOOKS_LOCAL_DIR, BOOKS_DFS_DIR);
+ }
+
+ /**
+ * Publish job execution results into local file system, so you can view them.
+ *
+ * @param fs Distributed file system used in the job.
+ * @throws IOException If failed.
+ */
+ private void publishResults(FileSystem fs) throws IOException {
+ X.println(">>> Cleaning up DFS input directory: " + BOOKS_DFS_DIR);
+
+ fs.delete(BOOKS_DFS_DIR, true);
+
+ X.println(">>> Cleaning up LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+ fs.delete(RESULT_LOCAL_DIR, true);
+
+ X.println(">>> Moving job results into LOCAL result directory: " + RESULT_LOCAL_DIR);
+
+ fs.copyToLocalFile(true, RESULT_DFS_DIR, RESULT_LOCAL_DIR);
+ }
+
+ /**
+ * Executes a modified version of the Hadoop word count example. Here, in addition to counting the number of
+ * occurrences of each word in the source files, the N most popular words are selected.
+ *
+ * @param args None.
+ */
+ public static void main(String[] args) {
+ try {
+ new HadoopPopularWordsTest().runWordCountConfigBasedHadoopJob();
+ }
+ catch (Exception e) {
+ X.println(">>> Failed to run word count example: " + e.getMessage());
+ }
+
+ System.exit(0);
+ }
+}
\ No newline at end of file
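The heart of TopNWordsReducer is a Guava MinMaxPriorityQueue built with a reversed comparator and a bounded size: the largest counts stay, the smallest are evicted, and removeFirst() drains in descending order. Its behavior in isolation, as a self-contained sketch:

    import java.util.Comparator;
    import java.util.Map.Entry;

    import com.google.common.collect.MinMaxPriorityQueue;

    import static com.google.common.collect.Maps.immutableEntry;
    import static com.google.common.collect.MinMaxPriorityQueue.orderedBy;
    import static java.util.Collections.reverseOrder;

    public class TopNQueueDemo {
        public static void main(String[] args) {
            MinMaxPriorityQueue<Entry<Integer, String>> q = orderedBy(
                reverseOrder(new Comparator<Entry<Integer, String>>() {
                    @Override public int compare(Entry<Integer, String> o1, Entry<Integer, String> o2) {
                        return o1.getKey().compareTo(o2.getKey());
                    }
                })).maximumSize(3).create();

            q.add(immutableEntry(5, "apple"));
            q.add(immutableEntry(9, "banana"));
            q.add(immutableEntry(2, "cherry"));
            q.add(immutableEntry(7, "durian")); // Evicts (2, "cherry"): the lowest count.

            System.out.println(q.removeFirst()); // 9=banana: the highest count comes out first.
        }
    }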
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
new file mode 100644
index 0000000..5ccc8ce
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSerializationWrapperSelfTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.util.Arrays;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopSerializationWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test of wrapper of the native serialization.
+ */
+public class HadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
+ /**
+ * Tests read/write of IntWritable via native WritableSerialization.
+ * @throws Exception If failed.
+ */
+ public void testIntWritableSerialization() throws Exception {
+ HadoopSerialization ser = new HadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ DataOutput out = new DataOutputStream(buf);
+
+ ser.write(out, new IntWritable(3));
+ ser.write(out, new IntWritable(-5));
+
+ assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ assertEquals(3, ((IntWritable)ser.read(in, null)).get());
+ assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
+ }
+
+ /**
+ * Tests read/write of Integer via native JavaSerialization.
+ * @throws Exception If failed.
+ */
+ public void testIntJavaSerialization() throws Exception {
+ HadoopSerialization ser = new HadoopSerializationWrapper(new JavaSerialization(), Integer.class);
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ DataOutput out = new DataOutputStream(buf);
+
+ ser.write(out, 3);
+ ser.write(out, -5);
+ ser.close();
+
+ DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ assertEquals(3, ((Integer)ser.read(in, null)).intValue());
+ assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
+ }
+}
\ No newline at end of file
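The byte-level assertion in testIntWritableSerialization is plain big-endian two's complement: IntWritable.write() calls DataOutput.writeInt(), so 3 becomes [0, 0, 0, 3] and -5 (0xFFFFFFFB) becomes [-1, -1, -1, -5] when the bytes are read as signed Java values. A standalone check:

    public class BigEndianIntBytes {
        public static void main(String[] args) {
            int v = -5; // 0xFFFFFFFB in two's complement.

            byte[] b = { (byte)(v >>> 24), (byte)(v >>> 16), (byte)(v >>> 8), (byte)v };

            // Prints [-1, -1, -1, -5]: signed bytes 0xFF == -1 and 0xFB == -5.
            System.out.println(java.util.Arrays.toString(b));
        }
    }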
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
new file mode 100644
index 0000000..e27c212
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyFullMapReduceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+/**
+ * Same test as HadoopMapReduceTest, but with Snappy output compression enabled.
+ */
+public class HadoopSnappyFullMapReduceTest extends HadoopMapReduceTest {
+ /** {@inheritDoc} */
+ @Override protected boolean compressOutputSnappy() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean[][] getApiModes() {
+ return new boolean[][] {
+ { false, false, true },
+ { true, true, true },
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
new file mode 100644
index 0000000..80ff754
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSnappyTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.io.compress.snappy.SnappyCompressor;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests isolated Hadoop Snappy codec usage.
+ */
+public class HadoopSnappyTest extends GridCommonAbstractTest {
+ /** Length of data. */
+ private static final int BYTE_SIZE = 1024 * 50;
+
+ /**
+ * Checks Snappy codec usage.
+ *
+ * @throws Throwable On error.
+ */
+ public void testSnappy() throws Throwable {
+ // Run Snappy test in default class loader:
+ checkSnappy();
+
+ // Run the same in several more class loaders simulating jobs and tasks:
+ for (int i = 0; i < 2; i++) {
+ ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl());
+
+ Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr);
+
+ assertEquals(hadoopClsLdr, cls.getClassLoader());
+
+ U.invoke(cls, null, "checkSnappy");
+ }
+ }
+
+ /**
+ * Internal check routine.
+ *
+ * @throws Throwable If failed.
+ */
+ public static void checkSnappy() throws Throwable {
+ try {
+ byte[] expBytes = new byte[BYTE_SIZE];
+ byte[] actualBytes = new byte[BYTE_SIZE];
+
+ for (int i = 0; i < expBytes.length; i++)
+ expBytes[i] = (byte)ThreadLocalRandom.current().nextInt(16);
+
+ SnappyCodec codec = new SnappyCodec();
+
+ codec.setConf(new Configuration());
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (CompressionOutputStream cos = codec.createOutputStream(baos)) {
+ cos.write(expBytes);
+ cos.flush();
+ }
+
+ try (CompressionInputStream cis = codec.createInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
+ int read = cis.read(actualBytes, 0, actualBytes.length);
+
+ assert read == actualBytes.length;
+ }
+
+ assert Arrays.equals(expBytes, actualBytes);
+ }
+ catch (Throwable e) {
+ System.out.println("Snappy check failed:");
+ System.out.println("### NativeCodeLoader.isNativeCodeLoaded: " + NativeCodeLoader.isNativeCodeLoaded());
+ System.out.println("### SnappyCompressor.isNativeCodeLoaded: " + SnappyCompressor.isNativeCodeLoaded());
+
+ throw e;
+ }
+ }
+}
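U.invoke(cls, null, "checkSnappy") above is an Ignite-internal reflection helper. Its essential effect is plain reflection against the copy of the class loaded by the isolated HadoopClassLoader, so the Snappy native-loading state is evaluated once per class loader. Roughly equivalent (error handling in U.invoke differs):

    // Plain-reflection sketch of U.invoke(cls, null, "checkSnappy").
    java.lang.reflect.Method m = cls.getMethod("checkSnappy");

    m.invoke(null); // Static method, hence no target instance.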
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
new file mode 100644
index 0000000..eb4a7d4
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingExternalTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+
+/**
+ * External test for sorting.
+ */
+public class HadoopSortingExternalTest extends HadoopSortingTest {
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.setExternalExecution(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setMarshaller(new JdkMarshaller());
+
+ return cfg;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
new file mode 100644
index 0000000..a4e7368
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSortingTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Scanner;
+import java.util.UUID;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.JavaSerialization;
+import org.apache.hadoop.io.serializer.JavaSerializationComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.util.typedef.X;
+
+import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
+
+/**
+ * Tests correct sorting.
+ */
+public class HadoopSortingTest extends HadoopAbstractSelfTest {
+ /** */
+ private static final String PATH_INPUT = "/test-in";
+
+ /** */
+ private static final String PATH_OUTPUT = "/test-out";
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 3;
+ }
+
+ /**
+ * @return {@code True} if IGFS is enabled on Hadoop nodes.
+ */
+ @Override protected boolean igfsEnabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(gridCount());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopConfiguration hadoopConfiguration(String gridName) {
+ HadoopConfiguration cfg = super.hadoopConfiguration(gridName);
+
+ // TODO: IGNITE-404: Uncomment when fixed.
+ //cfg.setExternalExecution(false);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSortSimple() throws Exception {
+ // Generate test data.
+ Job job = Job.getInstance();
+
+ job.setInputFormatClass(InFormat.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ job.setMapperClass(Mapper.class);
+ job.setNumReduceTasks(0);
+
+ setupFileSystems(job.getConfiguration());
+
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_INPUT));
+
+ X.printerrln("Data generation started.");
+
+ grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1),
+ createJobInfo(job.getConfiguration())).get(180000);
+
+ X.printerrln("Data generation complete.");
+
+ // Run main map-reduce job.
+ job = Job.getInstance();
+
+ setupFileSystems(job.getConfiguration());
+
+ job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, JavaSerialization.class.getName() +
+ "," + WritableSerialization.class.getName());
+
+ FileInputFormat.setInputPaths(job, new Path(igfsScheme() + PATH_INPUT));
+ FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT));
+
+ job.setSortComparatorClass(JavaSerializationComparator.class);
+
+ job.setMapperClass(MyMapper.class);
+ job.setReducerClass(MyReducer.class);
+
+ job.setNumReduceTasks(2);
+
+ job.setMapOutputKeyClass(UUID.class);
+ job.setMapOutputValueClass(NullWritable.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ X.printerrln("Job started.");
+
+ grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 2),
+ createJobInfo(job.getConfiguration())).get(180000);
+
+ X.printerrln("Job complete.");
+
+ // Check result.
+ Path outDir = new Path(igfsScheme() + PATH_OUTPUT);
+
+ AbstractFileSystem fs = AbstractFileSystem.get(new URI(igfsScheme()), job.getConfiguration());
+
+ for (FileStatus file : fs.listStatus(outDir)) {
+ X.printerrln("__ file: " + file);
+
+ if (file.getLen() == 0)
+ continue;
+
+ try (Scanner sc = new Scanner(fs.open(file.getPath()))) {
+
+ UUID prev = null;
+
+ while (sc.hasNextLine()) {
+ UUID next = UUID.fromString(sc.nextLine());
+
+// X.printerrln("___ check: " + next);
+
+ if (prev != null)
+ assertTrue(prev.compareTo(next) < 0);
+
+ prev = next;
+ }
+ }
+ }
+ }
+
+ /** Input format producing ten fake splits of random UUID strings. */
+ public static class InFormat extends InputFormat<Text, NullWritable> {
+ /** {@inheritDoc} */
+ @Override public List<InputSplit> getSplits(JobContext ctx) throws IOException, InterruptedException {
+ List<InputSplit> res = new ArrayList<>();
+
+ FakeSplit split = new FakeSplit(20);
+
+ for (int i = 0; i < 10; i++)
+ res.add(split);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordReader<Text, NullWritable> createRecordReader(final InputSplit split,
+ TaskAttemptContext ctx) throws IOException, InterruptedException {
+ return new RecordReader<Text, NullWritable>() {
+ /** */
+ int cnt;
+
+ /** */
+ Text txt = new Text();
+
+ @Override public void initialize(InputSplit split, TaskAttemptContext ctx) {
+ // No-op.
+ }
+
+ @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+ return ++cnt <= split.getLength();
+ }
+
+ @Override public Text getCurrentKey() {
+ txt.set(UUID.randomUUID().toString());
+
+// X.printerrln("___ read: " + txt);
+
+ return txt;
+ }
+
+ @Override public NullWritable getCurrentValue() {
+ return NullWritable.get();
+ }
+
+ @Override public float getProgress() throws IOException, InterruptedException {
+ return (float)cnt / split.getLength();
+ }
+
+ @Override public void close() {
+ // No-op.
+ }
+ };
+ }
+ }
+
+ /** Mapper parsing text lines into {@link UUID} map output keys. */
+ public static class MyMapper extends Mapper<LongWritable, Text, UUID, NullWritable> {
+ /** {@inheritDoc} */
+ @Override protected void map(LongWritable key, Text val, Context ctx) throws IOException, InterruptedException {
+// X.printerrln("___ map: " + val);
+
+ ctx.write(UUID.fromString(val.toString()), NullWritable.get());
+ }
+ }
+
+ /** Reducer writing the sorted UUID keys back as text. */
+ public static class MyReducer extends Reducer<UUID, NullWritable, Text, NullWritable> {
+ /** */
+ private Text text = new Text();
+
+ /** {@inheritDoc} */
+ @Override protected void reduce(UUID key, Iterable<NullWritable> vals, Context ctx)
+ throws IOException, InterruptedException {
+// X.printerrln("___ rdc: " + key);
+
+ text.set(key.toString());
+
+ ctx.write(text, NullWritable.get());
+ }
+ }
+
+ /** Input split stub of a fixed length. */
+ public static class FakeSplit extends InputSplit implements Writable {
+ /** */
+ private static final String[] HOSTS = {"127.0.0.1"};
+
+ /** */
+ private int len;
+
+ /**
+ * @param len Length.
+ */
+ public FakeSplit(int len) {
+ this.len = len;
+ }
+
+ /**
+ * Default constructor (required by {@link Writable}).
+ */
+ public FakeSplit() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getLength() throws IOException, InterruptedException {
+ return len;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String[] getLocations() throws IOException, InterruptedException {
+ return HOSTS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(DataOutput out) throws IOException {
+ out.writeInt(len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readFields(DataInput in) throws IOException {
+ len = in.readInt();
+ }
+ }
+}
\ No newline at end of file
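
The non-obvious piece of testSortSimple() above is using plain java.util.UUID
as a map output key: that works only because JavaSerialization is registered
for the job and JavaSerializationComparator deserializes keys before
comparing them. A condensed sketch of just that wiring, mirroring the job
setup in the test (the class name below is illustrative):

    import java.util.UUID;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.JavaSerialization;
    import org.apache.hadoop.io.serializer.JavaSerializationComparator;
    import org.apache.hadoop.io.serializer.WritableSerialization;
    import org.apache.hadoop.mapreduce.Job;

    public class UuidSortWiring {
        /** @return Job configured to sort java.util.UUID map output keys. */
        public static Job wire() throws Exception {
            Job job = Job.getInstance();

            // UUID is Serializable and Comparable, so JavaSerialization can
            // write it; keep WritableSerialization for Text/NullWritable.
            job.getConfiguration().set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
                JavaSerialization.class.getName() + "," + WritableSerialization.class.getName());

            // The comparator deserializes keys and calls compareTo().
            job.setSortComparatorClass(JavaSerializationComparator.class);

            job.setMapOutputKeyClass(UUID.class);
            job.setMapOutputValueClass(NullWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            return job;
        }
    }
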
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
new file mode 100644
index 0000000..be2bfc2
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopSplitWrapperSelfTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Self test of {@link HadoopSplitWrapper}.
+ */
+public class HadoopSplitWrapperSelfTest extends HadoopAbstractSelfTest {
+ /**
+ * Tests serialization of the wrapper and the wrapped native split.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSerialization() throws Exception {
+ FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"});
+
+ assertEquals("/path/to/file:100+500", nativeSplit.toString());
+
+ HadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
+
+ assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
+
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ ObjectOutput out = new ObjectOutputStream(buf);
+
+ out.writeObject(split);
+
+ ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
+
+ final HadoopSplitWrapper res = (HadoopSplitWrapper)in.readObject();
+
+ assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString());
+
+ // Hosts are intentionally not serialized, so accessing them on the
+ // deserialized copy must fail.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ res.hosts();
+
+ return null;
+ }
+ }, AssertionError.class, null);
+ }
+
+}
\ No newline at end of file
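
The write/read sequence in testSerialization() is a standard Java
serialization round trip; a small generic helper capturing the same steps (a
sketch, assuming only that the object is Serializable):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class RoundTrip {
        /** Serializes the given object and returns the deserialized copy. */
        @SuppressWarnings("unchecked")
        public static <T extends Serializable> T of(T obj) throws IOException, ClassNotFoundException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(obj);
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
                return (T)in.readObject();
            }
        }
    }
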
http://git-wip-us.apache.org/repos/asf/ignite/blob/8032fc2c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
new file mode 100644
index 0000000..66e341b
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopStartup.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem;
+import org.apache.ignite.internal.util.typedef.G;
+
+/**
+ * Hadoop node startup.
+ */
+public class HadoopStartup {
+ /**
+ * @param args Arguments.
+ */
+ public static void main(String[] args) {
+ G.start("config/hadoop/default-config.xml");
+ }
+
+ /**
+ * @return Configuration for job run.
+ */
+ @SuppressWarnings("UnnecessaryFullyQualifiedName")
+ public static Configuration configuration() {
+ Configuration cfg = new Configuration();
+
+ cfg.set("fs.defaultFS", "igfs://igfs@localhost");
+
+ cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());
+ cfg.set("fs.AbstractFileSystem.igfs.impl", IgniteHadoopFileSystem.class.getName());
+
+ cfg.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
+
+ cfg.set("mapreduce.framework.name", "ignite");
+ cfg.set("mapreduce.jobtracker.address", "localhost:11211");
+
+ return cfg;
+ }
+}
\ No newline at end of file
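
A hypothetical client-side counterpart (the class name, paths and job setup
below are illustrative, not part of the module): start a node via
HadoopStartup.main(), then submit an ordinary MapReduce job through the
Ignite job tracker using the configuration above:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SubmitToIgnite {
        public static void main(String[] args) throws Exception {
            // mapreduce.framework.name=ignite routes the submission to the
            // Ignite job tracker at mapreduce.jobtracker.address.
            Job job = Job.getInstance(HadoopStartup.configuration(), "example-job");

            // Default identity mapper, no reducers: copies input to output via IGFS.
            job.setNumReduceTasks(0);

            FileInputFormat.setInputPaths(job, new Path("igfs://igfs@localhost/input"));
            FileOutputFormat.setOutputPath(job, new Path("igfs://igfs@localhost/output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
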