You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2017/01/23 10:12:46 UTC
[1/4] ignite git commit: IGNITE-4507: Hadoop: added direct output
support for combiner. This closes #1434.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 38cb67d45 -> 82857d0cb
IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d596f02b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d596f02b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d596f02b
Branch: refs/heads/ignite-2.0
Commit: d596f02b1c64c789f91dea57d349510101d3e201
Parents: d6d42c2
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri Jan 20 17:33:34 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Jan 20 17:33:34 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 10 +++
.../hadoop/impl/v1/HadoopV1MapTask.java | 89 +++++++++++---------
.../hadoop/impl/v1/HadoopV1ReduceTask.java | 69 +++++++++------
.../hadoop/impl/v2/HadoopV2Context.java | 10 ---
.../hadoop/impl/v2/HadoopV2MapTask.java | 18 ++--
.../hadoop/impl/v2/HadoopV2ReduceTask.java | 14 +++
.../hadoop/impl/v2/HadoopV2TaskContext.java | 1 +
.../hadoop/shuffle/HadoopShuffleJob.java | 7 --
.../shuffle/direct/HadoopDirectDataInput.java | 2 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 12 ++-
.../impl/HadoopAbstractMapReduceTest.java | 2 +
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 6 +-
12 files changed, 145 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..d6e9394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -207,4 +207,14 @@ public abstract class HadoopTaskContext {
* @throws IgniteCheckedException On any error in callable.
*/
public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
+
+ /**
+ * Callback invoked from mapper thread when map is finished.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onMapperFinished() throws IgniteCheckedException {
+ if (output instanceof HadoopMapperAwareTaskOutput)
+ ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..2aa4292 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
/**
* Constructor.
*
- * @param taskInfo
+ * @param taskInfo Taks info.
*/
public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
super(taskInfo);
@@ -56,67 +57,79 @@ public class HadoopV1MapTask extends HadoopV1Task {
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
HadoopJob job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
+ if (taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
- InputFormat inFormat = jobConf.getInputFormat();
+ try {
+ JobConf jobConf = taskCtx0.jobConf();
- HadoopInputSplit split = info().inputSplit();
+ InputFormat inFormat = jobConf.getInputFormat();
- InputSplit nativeSplit;
+ HadoopInputSplit split = info().inputSplit();
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock block = (HadoopFileBlock)split;
+ InputSplit nativeSplit;
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
- }
- else
- nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock block = (HadoopFileBlock)split;
- assert nativeSplit != null;
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
- Reporter reporter = new HadoopV1Reporter(taskCtx);
+ assert nativeSplit != null;
- HadoopV1OutputCollector collector = null;
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
- try {
- collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
- fileName(), ctx.attemptId());
+ HadoopV1OutputCollector collector = null;
- RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+ try {
+ collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), taskCtx0.attemptId());
- Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
- Object key = reader.createKey();
- Object val = reader.createValue();
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
- assert mapper != null;
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
- try {
try {
- while (reader.next(key, val)) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Map task cancelled.");
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
- mapper.map(key, val, collector, reporter);
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ mapper.close();
}
}
finally {
- mapper.close();
+ collector.closeWriter();
}
+
+ collector.commit();
}
- finally {
- collector.closeWriter();
- }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
- collector.commit();
+ throw new IgniteCheckedException(e);
+ }
}
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
+ finally {
+ HadoopMapperUtils.clearMapperIndex();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..5c1dd15 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -53,49 +54,63 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
HadoopJob job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
-
- HadoopTaskInput input = taskCtx.input();
-
- HadoopV1OutputCollector collector = null;
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+ JobConf jobConf = taskCtx0.jobConf();
- Reducer reducer;
- if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
- jobConf);
- else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
- jobConf);
+ HadoopTaskInput input = taskCtx.input();
- assert reducer != null;
+ HadoopV1OutputCollector collector = null;
try {
+ collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+ Reducer reducer;
+ if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+ jobConf);
+ else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
try {
- while (input.next()) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ if (!reduce)
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ reducer.close();
}
}
finally {
- reducer.close();
+ collector.closeWriter();
}
+
+ collector.commit();
}
- finally {
- collector.closeWriter();
- }
+ catch (Exception e) {
+ if (collector != null)
+ collector.abort();
- collector.commit();
+ throw new IgniteCheckedException(e);
+ }
}
- catch (Exception e) {
- if (collector != null)
- collector.abort();
-
- throw new IgniteCheckedException(e);
+ finally {
+ if (!reduce)
+ HadoopMapperUtils.clearMapperIndex();
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..1f4e675 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -154,16 +154,6 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
}
}
- /**
- * Callback invoked from mapper thread when map is finished.
- *
- * @throws IgniteCheckedException If failed.
- */
- public void onMapperFinished() throws IgniteCheckedException {
- if (output instanceof HadoopMapperAwareTaskOutput)
- ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
- }
-
/** {@inheritDoc} */
@Override public OutputCommitter getOutputCommitter() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index eb3b935..1519199 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -56,30 +56,32 @@ public class HadoopV2MapTask extends HadoopV2Task {
HadoopMapperUtils.clearMapperIndex();
try {
- InputSplit nativeSplit = hadoopContext().getInputSplit();
+ HadoopV2Context hadoopCtx = hadoopContext();
+
+ InputSplit nativeSplit = hadoopCtx.getInputSplit();
if (nativeSplit == null)
throw new IgniteCheckedException("Input split cannot be null.");
InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
- hadoopContext().getConfiguration());
+ hadoopCtx.getConfiguration());
- RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+ RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx);
- reader.initialize(nativeSplit, hadoopContext());
+ reader.initialize(nativeSplit, hadoopCtx);
- hadoopContext().reader(reader);
+ hadoopCtx.reader(reader);
HadoopJobInfo jobInfo = taskCtx.job().info();
outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
- Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+ Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration());
try {
- mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+ mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
- hadoopContext().onMapperFinished();
+ taskCtx.onMapperFinished();
}
finally {
closeWriter();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
index 930ec1d..09e0634 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
/**
@@ -53,10 +54,17 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
JobContextImpl jobCtx = taskCtx.jobContext();
+ // Set mapper index for combiner tasks
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
+
try {
outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
Reducer reducer;
+
if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(),
jobCtx.getConfiguration());
else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
@@ -64,6 +72,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
try {
reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+
+ if (!reduce)
+ taskCtx.onMapperFinished();
}
finally {
closeWriter();
@@ -84,6 +95,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
throw new IgniteCheckedException(e);
}
finally {
+ if (!reduce)
+ HadoopMapperUtils.clearMapperIndex();
+
if (err != null)
abort(outputFormat);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 5229590..8acc7aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0394865..3646aa7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -182,13 +182,6 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
if (stripeMappers0) {
- if (job.info().hasCombiner()) {
- log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
- job.id() + ']');
-
- stripeMappers0 = false;
- }
-
if (!embedded) {
log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
job.id() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..ef2905b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -48,7 +48,7 @@ public class HadoopDirectDataInput extends InputStream implements DataInput {
/** {@inheritDoc} */
@Override public int read() throws IOException {
- return readByte();
+ return (int)readByte() & 0xFF;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index a57efe6..339bf5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -122,7 +122,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
/**
* Implements actual task running.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException On error.
*/
void call0() throws IgniteCheckedException {
execStartTs = U.currentTimeMillis();
@@ -144,7 +144,15 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
runTask(perfCntr);
if (info.type() == MAP && job.info().hasCombiner()) {
- ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+ // Switch to combiner.
+ HadoopTaskInfo combineTaskInfo = new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(),
+ info.attempt(), null);
+
+ // Mapper and combiner share the same index.
+ if (ctx.taskInfo().hasMapperIndex())
+ combineTaskInfo.mapperIndex(ctx.taskInfo().mapperIndex());
+
+ ctx.taskInfo(combineTaskInfo);
try {
runTask(perfCntr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 89005f6..cd997a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -172,6 +172,8 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
*/
protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
throws Exception {
+ log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer);
+
igfs.delete(new IgfsPath(PATH_OUTPUT), true);
JobConf jobConf = new JobConf();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 8897a38..bce67f6 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -55,14 +55,14 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
return cfg;
}
- /*
+ /**
* @throws Exception If fails.
*/
public void testMultiReducerWholeMapReduceExecution() throws Exception {
checkMultiReducerWholeMapReduceExecution(false);
}
- /*
+ /**
* @throws Exception If fails.
*/
public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
@@ -100,6 +100,8 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
if (striped)
jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");
+ else
+ jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "false");
jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
[2/4] ignite git commit: Merge remote-tracking branch 'origin/master'
Posted by pt...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7997fca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7997fca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7997fca
Branch: refs/heads/ignite-2.0
Commit: b7997fcaaaf038149fc03344692055097d2ac872
Parents: d596f02 f41eb8d
Author: devozerov <vo...@gridgain.com>
Authored: Fri Jan 20 17:33:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Jan 20 17:33:54 2017 +0300
----------------------------------------------------------------------
.../tcp/ipfinder/s3/TcpDiscoveryS3IpFinder.java | 37 ++-
.../TcpDiscoveryS3IpFinderAbstractSelfTest.java | 84 +++++
...3IpFinderAwsCredentialsProviderSelfTest.java | 46 +++
...scoveryS3IpFinderAwsCredentialsSelfTest.java | 45 +++
.../s3/TcpDiscoveryS3IpFinderSelfTest.java | 79 -----
.../ignite/testsuites/IgniteS3TestSuite.java | 26 +-
.../store/jdbc/CacheAbstractJdbcStore.java | 12 +-
.../store/jdbc/JdbcTypesDefaultTransformer.java | 19 ++
.../cache/store/jdbc/JdbcTypesTransformer.java | 17 +
.../cache/query/GridCacheQueryAdapter.java | 8 +
.../internal/processors/odbc/IgniteTypes.java | 69 ++++
.../internal/processors/odbc/OdbcTypes.java | 131 ++++++++
.../internal/processors/odbc/OdbcUtils.java | 85 +++++
.../processors/odbc/escape/OdbcEscapeUtils.java | 52 ++-
.../platform/cache/PlatformCache.java | 11 +-
.../utils/PlatformConfigurationUtils.java | 128 ++++++-
.../CacheJdbcPojoStoreAbstractSelfTest.java | 23 +-
.../store/jdbc/CacheJdbcPojoStoreTest.java | 3 +
...eJdbcStoreAbstractMultithreadedSelfTest.java | 17 +-
.../ignite/cache/store/jdbc/model/Gender.java | 41 +++
.../ignite/cache/store/jdbc/model/Person.java | 31 +-
.../IgniteCacheQueryCacheDestroySelfTest.java | 142 ++++++++
.../odbc/OdbcEscapeSequenceSelfTest.java | 131 ++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../cpp/binary/src/impl/binary/binary_utils.cpp | 6 +-
.../cpp/common/include/ignite/common/utils.h | 8 +
.../cpp/common/os/linux/src/common/utils.cpp | 22 +-
.../cpp/common/os/win/src/common/utils.cpp | 14 +-
modules/platforms/cpp/odbc-test/Makefile.am | 1 +
.../odbc-test/include/sql_test_suite_fixture.h | 13 +
.../cpp/odbc-test/project/vs/odbc-test.vcxproj | 1 +
.../project/vs/odbc-test.vcxproj.filters | 3 +
.../cpp/odbc-test/src/api_robustness_test.cpp | 2 +-
.../src/sql_aggregate_functions_test.cpp | 4 +-
.../src/sql_esc_convert_function_test.cpp | 160 +++++++++
.../odbc-test/src/sql_test_suite_fixture.cpp | 52 ++-
.../cpp/odbc-test/src/sql_types_test.cpp | 131 +++++++-
.../odbc/src/app/application_data_buffer.cpp | 58 +++-
.../platforms/cpp/odbc/src/app/parameter.cpp | 4 +-
.../cpp/odbc/src/config/connection_info.cpp | 260 ++++++++++++++-
.../Apache.Ignite.Core.Tests.csproj | 4 +
.../Binary/BinaryBuilderSelfTest.cs | 159 ++++++---
.../BinaryBuilderSelfTestArrayIdentity.cs | 34 ++
.../Binary/BinaryEqualityComparerTest.cs | 279 ++++++++++++++++
.../Binary/IO/BinaryStreamsTest.cs | 19 ++
.../Cache/CacheConfigurationTest.cs | 5 +-
.../Cache/Query/CacheDmlQueriesTest.cs | 296 +++++++++++++++++
.../Cache/Store/CacheParallelLoadStoreTest.cs | 9 +-
.../Cache/Store/CacheStoreSessionTest.cs | 22 +-
.../Cache/Store/CacheStoreTest.cs | 333 ++++++++++++-------
.../Cache/Store/CacheTestStore.cs | 14 +
.../Cache/Store/NamedNodeCacheStoreTest.cs | 34 ++
.../IgniteConfigurationSerializerTest.cs | 46 ++-
.../IgniteConfigurationTest.cs | 28 ++
.../Apache.Ignite.Core.csproj | 5 +
.../Binary/BinaryArrayEqualityComparer.cs | 149 +++++++++
.../Binary/BinaryConfiguration.cs | 24 ++
.../Binary/BinaryTypeConfiguration.cs | 14 +
.../Cache/Configuration/QueryEntity.cs | 33 +-
.../Cache/Configuration/QueryField.cs | 6 +
.../Apache.Ignite.Core/IgniteConfiguration.cs | 85 ++++-
.../IgniteConfigurationSection.xsd | 19 ++
.../Apache.Ignite.Core/Impl/Binary/Binary.cs | 28 +-
.../Binary/BinaryEqualityComparerSerializer.cs | 99 ++++++
.../Impl/Binary/BinaryFieldEqualityComparer.cs | 138 ++++++++
.../Impl/Binary/BinaryFullTypeDescriptor.cs | 21 +-
.../Impl/Binary/BinaryObject.cs | 31 +-
.../Impl/Binary/BinaryObjectBuilder.cs | 62 +++-
.../Impl/Binary/BinaryObjectHeader.cs | 21 +-
.../Impl/Binary/BinaryObjectSchemaHolder.cs | 22 ++
.../Binary/BinarySurrogateTypeDescriptor.cs | 6 +
.../Impl/Binary/BinarySystemHandlers.cs | 6 +-
.../Impl/Binary/BinaryWriter.cs | 11 +-
.../Impl/Binary/DateTimeHolder.cs | 35 +-
.../Impl/Binary/IBinaryEqualityComparer.cs | 53 +++
.../Impl/Binary/IBinaryTypeDescriptor.cs | 5 +
.../Impl/Binary/Io/BinaryHeapStream.cs | 9 +
.../Impl/Binary/Io/BinaryStreamBase.cs | 13 +
.../Impl/Binary/Io/IBinaryStream.cs | 11 +-
.../Impl/Binary/Io/IBinaryStreamProcessor.cs | 36 ++
.../Impl/Binary/Marshaller.cs | 22 +-
.../Impl/Binary/SerializableObjectHolder.cs | 16 +
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 14 +-
.../Common/IgniteConfigurationXmlSerializer.cs | 5 +-
.../Impl/Memory/PlatformMemoryStream.cs | 16 +
.../Apache.Ignite.Examples.csproj | 1 +
.../Datagrid/QueryDmlExample.cs | 162 +++++++++
.../src/test/config/jdbc-pojo-store-builtin.xml | 8 +
.../src/test/config/jdbc-pojo-store-obj.xml | 8 +
89 files changed, 4009 insertions(+), 445 deletions(-)
----------------------------------------------------------------------
[4/4] ignite git commit: Merge branch 'master' into ignite-2.0
Posted by pt...@apache.org.
Merge branch 'master' into ignite-2.0
# Conflicts:
# modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
# modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82857d0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82857d0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82857d0c
Branch: refs/heads/ignite-2.0
Commit: 82857d0cb6e2984a5359b822a9c903874414aa67
Parents: 38cb67d d5b2748
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jan 23 13:12:12 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jan 23 13:12:12 2017 +0300
----------------------------------------------------------------------
.../processors/hadoop/HadoopTaskContext.java | 10 ++
.../hadoop/impl/v1/HadoopV1MapTask.java | 89 +++++++------
.../hadoop/impl/v1/HadoopV1ReduceTask.java | 69 ++++++----
.../hadoop/impl/v2/HadoopV2Context.java | 10 --
.../hadoop/impl/v2/HadoopV2MapTask.java | 18 +--
.../hadoop/impl/v2/HadoopV2ReduceTask.java | 14 ++
.../hadoop/impl/v2/HadoopV2TaskContext.java | 1 +
.../hadoop/shuffle/HadoopShuffleJob.java | 7 -
.../shuffle/direct/HadoopDirectDataInput.java | 2 +-
.../hadoop/taskexecutor/HadoopRunnableTask.java | 12 +-
.../impl/HadoopAbstractMapReduceTest.java | 2 +
.../impl/HadoopMapReduceEmbeddedSelfTest.java | 6 +-
.../Impl/Binary/BinaryReader.cs | 131 ++++++++++---------
13 files changed, 213 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index cde6da6,2aa4292..2172ff2
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@@ -28,8 -28,9 +28,9 @@@ import org.apache.hadoop.mapred.Reporte
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -54,52 -55,64 +55,64 @@@ public class HadoopV1MapTask extends Ha
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
+ if (taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
- InputFormat inFormat = jobConf.getInputFormat();
+ try {
+ JobConf jobConf = taskCtx0.jobConf();
- HadoopInputSplit split = info().inputSplit();
+ InputFormat inFormat = jobConf.getInputFormat();
- InputSplit nativeSplit;
+ HadoopInputSplit split = info().inputSplit();
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock block = (HadoopFileBlock)split;
+ InputSplit nativeSplit;
- nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
- }
- else
- nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+ if (split instanceof HadoopFileBlock) {
+ HadoopFileBlock block = (HadoopFileBlock)split;
- assert nativeSplit != null;
+ nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+ }
+ else
+ nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
- Reporter reporter = new HadoopV1Reporter(taskCtx);
+ assert nativeSplit != null;
- HadoopV1OutputCollector collector = null;
+ Reporter reporter = new HadoopV1Reporter(taskCtx);
- try {
- collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
- fileName(), ctx.attemptId());
+ HadoopV1OutputCollector collector = null;
- RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+ try {
+ collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+ fileName(), taskCtx0.attemptId());
- Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+ RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
- Object key = reader.createKey();
- Object val = reader.createValue();
+ Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
- assert mapper != null;
+ Object key = reader.createKey();
+ Object val = reader.createValue();
+
+ assert mapper != null;
- try {
try {
- while (reader.next(key, val)) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Map task cancelled.");
+ try {
+ while (reader.next(key, val)) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Map task cancelled.");
+
+ mapper.map(key, val, collector, reporter);
+ }
- mapper.map(key, val, collector, reporter);
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ mapper.close();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 6b90653,5c1dd15..3ad583f
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@@ -22,7 -22,8 +22,8 @@@ import org.apache.hadoop.mapred.Reducer
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -51,34 -52,47 +52,47 @@@ public class HadoopV1ReduceTask extend
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
- HadoopJob job = taskCtx.job();
+ HadoopJobEx job = taskCtx.job();
- HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+ HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
- JobConf jobConf = ctx.jobConf();
-
- HadoopTaskInput input = taskCtx.input();
-
- HadoopV1OutputCollector collector = null;
+ if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+ HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+ else
+ HadoopMapperUtils.clearMapperIndex();
try {
- collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+ JobConf jobConf = taskCtx0.jobConf();
- Reducer reducer;
- if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
- jobConf);
- else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
- jobConf);
+ HadoopTaskInput input = taskCtx.input();
- assert reducer != null;
+ HadoopV1OutputCollector collector = null;
try {
+ collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+ Reducer reducer;
+ if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+ jobConf);
+ else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+ jobConf);
+
+ assert reducer != null;
+
try {
- while (input.next()) {
- if (isCancelled())
- throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ try {
+ while (input.next()) {
+ if (isCancelled())
+ throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+ reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ }
- reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+ if (!reduce)
+ taskCtx.onMapperFinished();
+ }
+ finally {
+ reducer.close();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index c698ee3,8acc7aa..a9c0bb7
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@@ -44,10 -44,11 +44,11 @@@ import org.apache.ignite.hadoop.io.Part
import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 25925fc,3646aa7..c3e7018
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@@ -179,16 -179,9 +179,9 @@@ public class HadoopShuffleJob<T> implem
this.embedded = embedded;
// No stripes for combiner.
- boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
+ boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, true);
if (stripeMappers0) {
- if (job.info().hasCombiner()) {
- log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
- job.id() + ']');
-
- stripeMappers0 = false;
- }
-
if (!embedded) {
log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
job.id() + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
[3/4] ignite git commit: IGNITE-4588 .NET: Simplify BinaryReader
frame handling
Posted by pt...@apache.org.
IGNITE-4588 .NET: Simplify BinaryReader frame handling
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5b27488
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5b27488
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5b27488
Branch: refs/heads/ignite-2.0
Commit: d5b274883b2047069457af60ad313ad9096f5641
Parents: b7997fc
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jan 23 13:08:15 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jan 23 13:08:15 2017 +0300
----------------------------------------------------------------------
.../Impl/Binary/BinaryReader.cs | 131 ++++++++++---------
1 file changed, 68 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b27488/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
index d9facc3..70417f7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
@@ -41,29 +41,14 @@ namespace Apache.Ignite.Core.Impl.Binary
/** Handles. */
private BinaryReaderHandleDictionary _hnds;
- /** Current position. */
- private int _curPos;
-
- /** Current raw flag. */
- private bool _curRaw;
-
/** Detach flag. */
private bool _detach;
/** Binary read mode. */
private BinaryMode _mode;
- /** Current type structure tracker. */
- private BinaryStructureTracker _curStruct;
-
- /** Current schema. */
- private int[] _curSchema;
-
- /** Current schema with positions. */
- private Dictionary<int, int> _curSchemaMap;
-
- /** Current header. */
- private BinaryObjectHeader _curHdr;
+ /** Current frame. */
+ private Frame _frame;
/// <summary>
/// Constructor.
@@ -81,7 +66,7 @@ namespace Apache.Ignite.Core.Impl.Binary
_marsh = marsh;
_mode = mode;
_builder = builder;
- _curPos = stream.Position;
+ _frame.Pos = stream.Position;
Stream = stream;
}
@@ -438,7 +423,7 @@ namespace Apache.Ignite.Core.Impl.Binary
/** <inheritdoc /> */
public T ReadObject<T>(string fieldName)
{
- if (_curRaw)
+ if (_frame.Raw)
throw new BinaryObjectException("Cannot read named fields after raw data is read.");
if (SeekField(fieldName))
@@ -712,34 +697,22 @@ namespace Apache.Ignite.Core.Impl.Binary
}
// Preserve old frame.
- var oldHdr = _curHdr;
- int oldPos = _curPos;
- var oldStruct = _curStruct;
- bool oldRaw = _curRaw;
- var oldSchema = _curSchema;
- var oldSchemaMap = _curSchemaMap;
+ var oldFrame = _frame;
// Set new frame.
- _curHdr = hdr;
- _curPos = pos;
+ _frame.Hdr = hdr;
+ _frame.Pos = pos;
SetCurSchema(desc);
- _curStruct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure);
- _curRaw = false;
+ _frame.Struct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure);
+ _frame.Raw = false;
// Read object.
- Stream.Seek(pos + BinaryObjectHeader.Size, SeekOrigin.Begin);
-
var obj = desc.Serializer.ReadBinary<T>(this, desc.Type, pos);
- _curStruct.UpdateReaderStructure();
+ _frame.Struct.UpdateReaderStructure();
// Restore old frame.
- _curHdr = oldHdr;
- _curPos = oldPos;
- _curStruct = oldStruct;
- _curRaw = oldRaw;
- _curSchema = oldSchema;
- _curSchemaMap = oldSchemaMap;
+ _frame = oldFrame;
return obj;
}
@@ -756,15 +729,15 @@ namespace Apache.Ignite.Core.Impl.Binary
/// </summary>
private void SetCurSchema(IBinaryTypeDescriptor desc)
{
- if (_curHdr.HasSchema)
+ if (_frame.Hdr.HasSchema)
{
- _curSchema = desc.Schema.Get(_curHdr.SchemaId);
+ _frame.Schema = desc.Schema.Get(_frame.Hdr.SchemaId);
- if (_curSchema == null)
+ if (_frame.Schema == null)
{
- _curSchema = ReadSchema();
+ _frame.Schema = ReadSchema(desc.TypeId);
- desc.Schema.Add(_curHdr.SchemaId, _curSchema);
+ desc.Schema.Add(_frame.Hdr.SchemaId, _frame.Schema);
}
}
}
@@ -772,25 +745,31 @@ namespace Apache.Ignite.Core.Impl.Binary
/// <summary>
/// Reads the schema.
/// </summary>
- private int[] ReadSchema()
+ private int[] ReadSchema(int typeId)
{
- if (_curHdr.IsCompactFooter)
+ if (_frame.Hdr.IsCompactFooter)
{
// Get schema from Java
- var schema = Marshaller.Ignite.BinaryProcessor.GetSchema(_curHdr.TypeId, _curHdr.SchemaId);
+ var ignite = Marshaller.Ignite;
+
+ var schema = ignite == null
+ ? null
+ : ignite.BinaryProcessor.GetSchema(_frame.Hdr.TypeId, _frame.Hdr.SchemaId);
if (schema == null)
throw new BinaryObjectException("Cannot find schema for object with compact footer [" +
- "typeId=" + _curHdr.TypeId + ", schemaId=" + _curHdr.SchemaId + ']');
+ "typeId=" + typeId + ", schemaId=" + _frame.Hdr.SchemaId + ']');
return schema;
}
- Stream.Seek(_curPos + _curHdr.SchemaOffset, SeekOrigin.Begin);
+ var pos = Stream.Position;
+
+ Stream.Seek(_frame.Pos + _frame.Hdr.SchemaOffset, SeekOrigin.Begin);
- var count = _curHdr.SchemaFieldCount;
+ var count = _frame.Hdr.SchemaFieldCount;
- var offsetSize = _curHdr.SchemaFieldOffsetSize;
+ var offsetSize = _frame.Hdr.SchemaFieldOffsetSize;
var res = new int[count];
@@ -800,6 +779,8 @@ namespace Apache.Ignite.Core.Impl.Binary
Stream.Seek(offsetSize, SeekOrigin.Current);
}
+ Stream.Seek(pos, SeekOrigin.Begin);
+
return res;
}
/// <summary>
@@ -867,11 +848,11 @@ namespace Apache.Ignite.Core.Impl.Binary
/// </summary>
private void MarkRaw()
{
- if (!_curRaw)
+ if (!_frame.Raw)
{
- _curRaw = true;
+ _frame.Raw = true;
- Stream.Seek(_curPos + _curHdr.GetRawOffset(Stream, _curPos), SeekOrigin.Begin);
+ Stream.Seek(_frame.Pos + _frame.Hdr.GetRawOffset(Stream, _frame.Pos), SeekOrigin.Begin);
}
}
@@ -880,29 +861,29 @@ namespace Apache.Ignite.Core.Impl.Binary
/// </summary>
private bool SeekField(string fieldName)
{
- if (_curRaw)
+ if (_frame.Raw)
throw new BinaryObjectException("Cannot read named fields after raw data is read.");
- if (!_curHdr.HasSchema)
+ if (!_frame.Hdr.HasSchema)
return false;
- var actionId = _curStruct.CurStructAction;
+ var actionId = _frame.Struct.CurStructAction;
- var fieldId = _curStruct.GetFieldId(fieldName);
+ var fieldId = _frame.Struct.GetFieldId(fieldName);
- if (_curSchema == null || actionId >= _curSchema.Length || fieldId != _curSchema[actionId])
+ if (_frame.Schema == null || actionId >= _frame.Schema.Length || fieldId != _frame.Schema[actionId])
{
- _curSchemaMap = _curSchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _curPos, _curHdr,
- () => _curSchema).ToDictionary();
+ _frame.SchemaMap = _frame.SchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _frame.Pos,
+ _frame.Hdr, () => _frame.Schema).ToDictionary();
- _curSchema = null; // read order is different, ignore schema for future reads
+ _frame.Schema = null; // read order is different, ignore schema for future reads
int pos;
- if (!_curSchemaMap.TryGetValue(fieldId, out pos))
+ if (!_frame.SchemaMap.TryGetValue(fieldId, out pos))
return false;
- Stream.Seek(pos + _curPos, SeekOrigin.Begin);
+ Stream.Seek(pos + _frame.Pos, SeekOrigin.Begin);
}
return true;
@@ -982,5 +963,29 @@ namespace Apache.Ignite.Core.Impl.Binary
return TypeCaster<T>.Cast(new BinaryEnum(enumType, enumValue, reader.Marshaller));
}
+
+ /// <summary>
+ /// Stores current reader stack frame.
+ /// </summary>
+ private struct Frame
+ {
+ /** Current position. */
+ public int Pos;
+
+ /** Current raw flag. */
+ public bool Raw;
+
+ /** Current type structure tracker. */
+ public BinaryStructureTracker Struct;
+
+ /** Current schema. */
+ public int[] Schema;
+
+ /** Current schema with positions. */
+ public Dictionary<int, int> SchemaMap;
+
+ /** Current header. */
+ public BinaryObjectHeader Hdr;
+ }
}
}