You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/01/23 10:42:00 UTC
[07/10] ignite git commit: IGNITE-4507: Hadoop: added direct output
support for combiner. This closes #1434.
IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d596f02b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d596f02b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d596f02b
Branch: refs/heads/ignite-comm-balance-master
Commit: d596f02b1c64c789f91dea57d349510101d3e201
Parents: d6d42c2
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri Jan 20 17:33:34 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Jan 20 17:33:34 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 10 +++
.../hadoop/impl/v1/HadoopV1MapTask.java | 89 +++++++++++---------
.../hadoop/impl/v1/HadoopV1ReduceTask.java | 69 +++++++++------
.../hadoop/impl/v2/HadoopV2Context.java | 10 ---
.../hadoop/impl/v2/HadoopV2MapTask.java | 18 ++--
.../hadoop/impl/v2/HadoopV2ReduceTask.java | 14 +++
.../hadoop/impl/v2/HadoopV2TaskContext.java | 1 +
.../hadoop/shuffle/HadoopShuffleJob.java | 7 --
.../shuffle/direct/HadoopDirectDataInput.java | 2 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 12 ++-
.../impl/HadoopAbstractMapReduceTest.java | 2 +
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 6 +-
12 files changed, 145 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..d6e9394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -207,4 +207,14 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException On any error in callable.
*/
public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
+
+ /**
+ * Callback invoked from mapper thread when map is finished.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onMapperFinished() throws IgniteCheckedException {
+ if (output instanceof HadoopMapperAwareTaskOutput)
+ ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..2aa4292 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
/**
* Constructor.
*
- * @param taskInfo
+ * @param taskInfo Task info.
*/
public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
super(taskInfo);
@@ -56,67 +57,79 @@ public class HadoopV1MapTask extends HadoopV1Task {
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
HadoopJob job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
+ if (taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
- InputFormat inFormat = jobConf.getInputFormat();
+ try {
+ JobConf jobConf = taskCtx0.jobConf();
- HadoopInputSplit split = info().inputSplit();
+ InputFormat inFormat = jobConf.getInputFormat();
- InputSplit nativeSplit;
+ HadoopInputSplit split = info().inputSplit();
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock block = (HadoopFileBlock)split;
+ InputSplit nativeSplit;
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
- }
- else
- nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock block = (HadoopFileBlock)split;
- assert nativeSplit != null;
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
- Reporter reporter = new HadoopV1Reporter(taskCtx);
+ assert nativeSplit != null;
- HadoopV1OutputCollector collector = null;
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
- try {
- collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
- fileName(), ctx.attemptId());
+ HadoopV1OutputCollector collector = null;
- RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+ try {
+ collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), taskCtx0.attemptId());
- Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
- Object key = reader.createKey();
- Object val = reader.createValue();
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
- assert mapper != null;
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
- try {
try {
- while (reader.next(key, val)) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Map task cancelled.");
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
- mapper.map(key, val, collector, reporter);
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ mapper.close();
}
}
finally {
- mapper.close();
+ collector.closeWriter();
}
+
+ collector.commit();
}
- finally {
- collector.closeWriter();
- }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
- collector.commit();
+ throw new IgniteCheckedException(e);
+ }
}
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
+ finally {
+ HadoopMapperUtils.clearMapperIndex();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..5c1dd15 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -53,49 +54,63 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
HadoopJob job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
-
- HadoopTaskInput input = taskCtx.input();
-
- HadoopV1OutputCollector collector = null;
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+ JobConf jobConf = taskCtx0.jobConf();
- Reducer reducer;
- if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
- jobConf);
- else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
- jobConf);
+ HadoopTaskInput input = taskCtx.input();
- assert reducer != null;
+ HadoopV1OutputCollector collector = null;
try {
+ collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+ Reducer reducer;
+ if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+ jobConf);
+ else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
try {
- while (input.next()) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ if (!reduce)
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ reducer.close();
}
}
finally {
- reducer.close();
+ collector.closeWriter();
}
+
+ collector.commit();
}
- finally {
- collector.closeWriter();
- }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
- collector.commit();
+ throw new IgniteCheckedException(e);
+ }
}
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
+ finally {
+ if (!reduce)
+ HadoopMapperUtils.clearMapperIndex();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..1f4e675 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -154,16 +154,6 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
}
}
- /**
- * Callback invoked from mapper thread when map is finished.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void onMapperFinished() throws IgniteCheckedException {
- if (output instanceof HadoopMapperAwareTaskOutput)
- ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
- }
-
/** {@inheritDoc} */
@Override public OutputCommitter getOutputCommitter() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index eb3b935..1519199 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -56,30 +56,32 @@ public class HadoopV2MapTask extends HadoopV2Task {
HadoopMapperUtils.clearMapperIndex();
try {
- InputSplit nativeSplit = hadoopContext().getInputSplit();
+ HadoopV2Context hadoopCtx = hadoopContext();
+
+ InputSplit nativeSplit = hadoopCtx.getInputSplit();
if (nativeSplit == null)
throw new IgniteCheckedException("Input split cannot be null.");
InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
- hadoopContext().getConfiguration());
+ hadoopCtx.getConfiguration());
- RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+ RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx);
- reader.initialize(nativeSplit, hadoopContext());
+ reader.initialize(nativeSplit, hadoopCtx);
- hadoopContext().reader(reader);
+ hadoopCtx.reader(reader);
HadoopJobInfo jobInfo = taskCtx.job().info();
outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
- Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+ Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration());
try {
- mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+ mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
- hadoopContext().onMapperFinished();
+ taskCtx.onMapperFinished();
}
finally {
closeWriter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
index 930ec1d..09e0634 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
/**
@@ -53,10 +54,17 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
JobContextImpl jobCtx = taskCtx.jobContext();
+ // Set mapper index for combiner tasks
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
+
try {
outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
Reducer reducer;
+
if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(),
jobCtx.getConfiguration());
else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
@@ -64,6 +72,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
try {
reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+
+ if (!reduce)
+ taskCtx.onMapperFinished();
}
finally {
closeWriter();
@@ -84,6 +95,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
throw new IgniteCheckedException(e);
}
finally {
+ if (!reduce)
+ HadoopMapperUtils.clearMapperIndex();
+
if (err != null)
abort(outputFormat);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 5229590..8acc7aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0394865..3646aa7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -182,13 +182,6 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
if (stripeMappers0) {
- if (job.info().hasCombiner()) {
- log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
- job.id() + ']');
-
- stripeMappers0 = false;
- }
-
if (!embedded) {
log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
job.id() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..ef2905b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -48,7 +48,7 @@ public class HadoopDirectDataInput extends InputStream implements DataInput {
/** {@inheritDoc} */
@Override public int read() throws IOException {
- return readByte();
+ return (int)readByte() & 0xFF;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index a57efe6..339bf5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -122,7 +122,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
/**
* Implements actual task running.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException On error.
*/
void call0() throws IgniteCheckedException {
execStartTs = U.currentTimeMillis();
@@ -144,7 +144,15 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
runTask(perfCntr);
if (info.type() == MAP && job.info().hasCombiner()) {
- ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+ // Switch to combiner.
+ HadoopTaskInfo combineTaskInfo = new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(),
+ info.attempt(), null);
+
+ // Mapper and combiner share the same index.
+ if (ctx.taskInfo().hasMapperIndex())
+ combineTaskInfo.mapperIndex(ctx.taskInfo().mapperIndex());
+
+ ctx.taskInfo(combineTaskInfo);
try {
runTask(perfCntr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 89005f6..cd997a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -172,6 +172,8 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
*/
protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
throws Exception {
+ log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer);
+
igfs.delete(new IgfsPath(PATH_OUTPUT), true);
JobConf jobConf = new JobConf();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 8897a38..bce67f6 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -55,14 +55,14 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
return cfg;
}
- /*
+ /**
* @throws Exception If fails.
*/
public void testMultiReducerWholeMapReduceExecution() throws Exception {
checkMultiReducerWholeMapReduceExecution(false);
}
- /*
+ /**
* @throws Exception If fails.
*/
public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
@@ -100,6 +100,8 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
if (striped)
jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");
+ else
+ jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "false");
jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());