You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/01/25 10:16:15 UTC
ignite git commit: IGNITE-2343: Hadoop: InputSplit is now a singleton;
this resolves the problem. This closes #399.
Repository: ignite
Updated Branches:
refs/heads/master 5f4a1130a -> 1f5b2021c
IGNITE-2343: Hadoop: InputSplit is now a singleton; this resolves the problem. This closes #399.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1f5b2021
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1f5b2021
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1f5b2021
Branch: refs/heads/master
Commit: 1f5b2021c5218a3b448a9445445d81c2363911a4
Parents: 5f4a113
Author: iveselovskiy <iv...@gridgain.com>
Authored: Mon Jan 25 12:15:56 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jan 25 12:15:56 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/v2/HadoopV2Context.java | 2 ++
.../processors/hadoop/v2/HadoopV2MapTask.java | 23 +++++---------------
2 files changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
index 5108e2d..2ff2945 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
@@ -101,11 +101,13 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
}
else
+ {
try {
inputSplit = (InputSplit) ((HadoopV2TaskContext)ctx).getNativeSplit(split);
} catch (IgniteCheckedException e) {
throw new IllegalStateException(e);
}
+ }
}
return inputSplit;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f5b2021/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
index 989260c..fafa79b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
@@ -17,20 +17,16 @@
package org.apache.ignite.internal.processors.hadoop.v2;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -48,26 +44,17 @@ public class HadoopV2MapTask extends HadoopV2Task {
/** {@inheritDoc} */
@SuppressWarnings({"ConstantConditions", "unchecked"})
@Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
- HadoopInputSplit split = info().inputSplit();
-
- InputSplit nativeSplit;
-
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock block = (HadoopFileBlock)split;
-
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null);
- }
- else
- nativeSplit = (InputSplit)taskCtx.getNativeSplit(split);
-
- assert nativeSplit != null;
-
OutputFormat outputFormat = null;
Exception err = null;
JobContextImpl jobCtx = taskCtx.jobContext();
try {
+ InputSplit nativeSplit = hadoopContext().getInputSplit();
+
+ if (nativeSplit == null)
+ throw new IgniteCheckedException("Input split cannot be null.");
+
InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
hadoopContext().getConfiguration());