Posted to commits@ignite.apache.org by pt...@apache.org on 2017/01/23 10:12:46 UTC

[1/4] ignite git commit: IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 38cb67d45 -> 82857d0cb


IGNITE-4507: Hadoop: added direct output support for combiner. This closes #1434.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d596f02b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d596f02b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d596f02b

Branch: refs/heads/ignite-2.0
Commit: d596f02b1c64c789f91dea57d349510101d3e201
Parents: d6d42c2
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri Jan 20 17:33:34 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Jan 20 17:33:34 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    | 10 +++
 .../hadoop/impl/v1/HadoopV1MapTask.java         | 89 +++++++++++---------
 .../hadoop/impl/v1/HadoopV1ReduceTask.java      | 69 +++++++++------
 .../hadoop/impl/v2/HadoopV2Context.java         | 10 ---
 .../hadoop/impl/v2/HadoopV2MapTask.java         | 18 ++--
 .../hadoop/impl/v2/HadoopV2ReduceTask.java      | 14 +++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |  1 +
 .../hadoop/shuffle/HadoopShuffleJob.java        |  7 --
 .../shuffle/direct/HadoopDirectDataInput.java   |  2 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java | 12 ++-
 .../impl/HadoopAbstractMapReduceTest.java       |  2 +
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |  6 +-
 12 files changed, 145 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..d6e9394 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -207,4 +207,14 @@ public abstract class HadoopTaskContext {
      * @throws IgniteCheckedException On any error in callable.
      */
     public abstract <T> T runAsJobOwner(Callable<T> c) throws IgniteCheckedException;
+
+    /**
+     * Callback invoked from mapper thread when map is finished.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onMapperFinished() throws IgniteCheckedException {
+        if (output instanceof HadoopMapperAwareTaskOutput)
+            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+    }
 }
\ No newline at end of file
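
For context: the new hook fires on the mapper thread once map() has processed all input, and forwards the signal to the task output when that output is mapper-aware. Below is a standalone Java sketch of the receiving side, assuming a simple buffered output; only the onMapperFinished() callback name comes from the patch, everything else is illustrative.

    import java.util.ArrayDeque;
    import java.util.Queue;

    /**
     * Illustrative sketch of a mapper-aware output: data is buffered while
     * the mapper runs and flushed in one shot when the mapper reports
     * completion. Transport details and the remaining task-output methods
     * a real implementation would need are omitted.
     */
    class DirectOutputSketch {
        /** Chunks buffered while the mapper is still running. */
        private final Queue<byte[]> pending = new ArrayDeque<>();

        /** Invoked once the mapper thread has produced all of its output. */
        public void onMapperFinished() {
            while (!pending.isEmpty())
                flush(pending.poll());
        }

        /** Hypothetical flush of one buffered chunk to the shuffle layer. */
        private void flush(byte[] chunk) {
            // Intentionally left empty in this sketch.
        }
    }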

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..2aa4292 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -45,7 +46,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
     /**
      * Constructor.
      *
-     * @param taskInfo 
+     * @param taskInfo Task info.
      */
     public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
         super(taskInfo);
@@ -56,67 +57,79 @@ public class HadoopV1MapTask extends HadoopV1Task {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
+        if (taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
-        InputFormat inFormat = jobConf.getInputFormat();
+        try {
+            JobConf jobConf = taskCtx0.jobConf();
 
-        HadoopInputSplit split = info().inputSplit();
+            InputFormat inFormat = jobConf.getInputFormat();
 
-        InputSplit nativeSplit;
+            HadoopInputSplit split = info().inputSplit();
 
-        if (split instanceof HadoopFileBlock) {
-            HadoopFileBlock block = (HadoopFileBlock)split;
+            InputSplit nativeSplit;
 
-            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
-        }
-        else
-            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+            if (split instanceof HadoopFileBlock) {
+                HadoopFileBlock block = (HadoopFileBlock)split;
 
-        assert nativeSplit != null;
+                nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+            }
+            else
+                nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
 
-        Reporter reporter = new HadoopV1Reporter(taskCtx);
+            assert nativeSplit != null;
 
-        HadoopV1OutputCollector collector = null;
+            Reporter reporter = new HadoopV1Reporter(taskCtx);
 
-        try {
-            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                fileName(), ctx.attemptId());
+            HadoopV1OutputCollector collector = null;
 
-            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+            try {
+                collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+                    fileName(), taskCtx0.attemptId());
 
-            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+                RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
 
-            Object key = reader.createKey();
-            Object val = reader.createValue();
+                Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
 
-            assert mapper != null;
+                Object key = reader.createKey();
+                Object val = reader.createValue();
+
+                assert mapper != null;
 
-            try {
                 try {
-                    while (reader.next(key, val)) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Map task cancelled.");
+                    try {
+                        while (reader.next(key, val)) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Map task cancelled.");
+
+                            mapper.map(key, val, collector, reporter);
+                        }
 
-                        mapper.map(key, val, collector, reporter);
+                        taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        mapper.close();
                     }
                 }
                 finally {
-                    mapper.close();
+                    collector.closeWriter();
                 }
+
+                collector.commit();
             }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
         }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            HadoopMapperUtils.clearMapperIndex();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..5c1dd15 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -53,49 +54,63 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
         HadoopJob job = taskCtx.job();
 
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+        HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
 
-        JobConf jobConf = ctx.jobConf();
-
-        HadoopTaskInput input = taskCtx.input();
-
-        HadoopV1OutputCollector collector = null;
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
 
         try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+            JobConf jobConf = taskCtx0.jobConf();
 
-            Reducer reducer;
-            if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                jobConf);
-            else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                jobConf);
+            HadoopTaskInput input = taskCtx.input();
 
-            assert reducer != null;
+            HadoopV1OutputCollector collector = null;
 
             try {
+                collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+
+                Reducer reducer;
+                if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+                    jobConf);
+                else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+                    jobConf);
+
+                assert reducer != null;
+
                 try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
+                    try {
+                        while (input.next()) {
+                            if (isCancelled())
+                                throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+                            reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        }
 
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                        if (!reduce)
+                            taskCtx.onMapperFinished();
+                    }
+                    finally {
+                        reducer.close();
                     }
                 }
                 finally {
-                    reducer.close();
+                    collector.closeWriter();
                 }
+
+                collector.commit();
             }
-            finally {
-                collector.closeWriter();
-            }
+            catch (Exception e) {
+                if (collector != null)
+                    collector.abort();
 
-            collector.commit();
+                throw new IgniteCheckedException(e);
+            }
         }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
+        finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..1f4e675 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -154,16 +154,6 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
         }
     }
 
-    /**
-     * Callback invoked from mapper thread when map is finished.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onMapperFinished() throws IgniteCheckedException {
-        if (output instanceof HadoopMapperAwareTaskOutput)
-            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
-    }
-
     /** {@inheritDoc} */
     @Override public OutputCommitter getOutputCommitter() {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index eb3b935..1519199 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -56,30 +56,32 @@ public class HadoopV2MapTask extends HadoopV2Task {
             HadoopMapperUtils.clearMapperIndex();
 
         try {
-            InputSplit nativeSplit = hadoopContext().getInputSplit();
+            HadoopV2Context hadoopCtx = hadoopContext();
+
+            InputSplit nativeSplit = hadoopCtx.getInputSplit();
 
             if (nativeSplit == null)
                 throw new IgniteCheckedException("Input split cannot be null.");
 
             InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
-                hadoopContext().getConfiguration());
+                hadoopCtx.getConfiguration());
 
-            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopCtx);
 
-            reader.initialize(nativeSplit, hadoopContext());
+            reader.initialize(nativeSplit, hadoopCtx);
 
-            hadoopContext().reader(reader);
+            hadoopCtx.reader(reader);
 
             HadoopJobInfo jobInfo = taskCtx.job().info();
 
             outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
 
-            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopCtx.getConfiguration());
 
             try {
-                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+                mapper.run(new WrappedMapper().getMapContext(hadoopCtx));
 
-                hadoopContext().onMapperFinished();
+                taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
index 930ec1d..09e0634 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 
 /**
@@ -53,10 +54,17 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
 
         JobContextImpl jobCtx = taskCtx.jobContext();
 
+        // Set mapper index for combiner tasks.
+        if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
+
         try {
             outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
 
             Reducer reducer;
+
             if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(),
                 jobCtx.getConfiguration());
             else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
@@ -64,6 +72,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
 
             try {
                 reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+
+                if (!reduce)
+                    taskCtx.onMapperFinished();
             }
             finally {
                 closeWriter();
@@ -84,6 +95,9 @@ public class HadoopV2ReduceTask extends HadoopV2Task {
             throw new IgniteCheckedException(e);
         }
         finally {
+            if (!reduce)
+                HadoopMapperUtils.clearMapperIndex();
+
             if (err != null)
                 abort(outputFormat);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 5229590..8acc7aa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0394865..3646aa7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -182,13 +182,6 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
 
         if (stripeMappers0) {
-            if (job.info().hasCombiner()) {
-                log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
-                    job.id() + ']');
-
-                stripeMappers0 = false;
-            }
-
             if (!embedded) {
                 log.info("Striped mapper output is disabled because it cannot be used in external mode [jobId=" +
                     job.id() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
index e3a713a..ef2905b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -48,7 +48,7 @@ public class HadoopDirectDataInput extends InputStream implements DataInput {
 
     /** {@inheritDoc} */
     @Override public int read() throws IOException {
-        return readByte();
+        return (int)readByte() & 0xFF;
     }
 
     /** {@inheritDoc} */
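
The one-line change above fixes a real bug: java.io.InputStream.read() must return the next byte as an int in the range 0 to 255, or -1 only at end of stream. Returning readByte() unmasked sign-extends the byte, so the value 0xFF comes back as -1 and is indistinguishable from EOF. A minimal self-contained demonstration:

    /** Shows why the & 0xFF mask in read() matters. */
    public class SignExtensionDemo {
        public static void main(String[] args) {
            byte b = (byte)0xFF;       // a byte with the high bit set

            int wrong = b;             // sign-extended to -1: looks like EOF
            int right = b & 0xFF;      // masked to 255: a valid byte value

            System.out.println(wrong); // prints -1
            System.out.println(right); // prints 255
        }
    }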

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
index a57efe6..339bf5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -122,7 +122,7 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
 
     /**
      * Implements actual task running.
-     * @throws IgniteCheckedException
+     * @throws IgniteCheckedException On error.
      */
     void call0() throws IgniteCheckedException {
         execStartTs = U.currentTimeMillis();
@@ -144,7 +144,15 @@ public abstract class HadoopRunnableTask implements Callable<Void> {
             runTask(perfCntr);
 
             if (info.type() == MAP && job.info().hasCombiner()) {
-                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+                // Switch to combiner.
+                HadoopTaskInfo combineTaskInfo = new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(),
+                    info.attempt(), null);
+
+                // Mapper and combiner share the same index.
+                if (ctx.taskInfo().hasMapperIndex())
+                    combineTaskInfo.mapperIndex(ctx.taskInfo().mapperIndex());
+
+                ctx.taskInfo(combineTaskInfo);
 
                 try {
                     runTask(perfCntr);
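
The index handed over here is what HadoopMapperUtils tracks per thread: the map task sets it before running and clears it afterwards, and because the combiner now runs on the same thread with the same index, the direct output can route combined data to the mapper's stripe. A minimal sketch of that pattern, assuming a plain ThreadLocal (the real class may differ in details such as the "not set" sentinel):

    /** Sketch of a per-thread mapper index registry (assumptions noted above). */
    public class HadoopMapperUtilsSketch {
        /** Index of the mapper running on the current thread, if any. */
        private static final ThreadLocal<Integer> MAP_IDX = new ThreadLocal<>();

        /** @return Mapper index for this thread, or -1 if none was set. */
        public static int mapperIndex() {
            Integer idx = MAP_IDX.get();

            return idx != null ? idx : -1;
        }

        /** Binds a mapper index to the current task thread. */
        public static void mapperIndex(int idx) {
            MAP_IDX.set(idx);
        }

        /** Clears the binding so a pooled thread cannot leak it to the next task. */
        public static void clearMapperIndex() {
            MAP_IDX.remove();
        }
    }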

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
index 89005f6..cd997a4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
@@ -172,6 +172,8 @@ public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
      */
     protected final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
         throws Exception {
+        log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner + ", useNewReducer=" + useNewReducer);
+
         igfs.delete(new IgfsPath(PATH_OUTPUT), true);
 
         JobConf jobConf = new JobConf();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d596f02b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index 8897a38..bce67f6 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -55,14 +55,14 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
         return cfg;
     }
 
-    /*
+    /**
      * @throws Exception If fails.
      */
     public void testMultiReducerWholeMapReduceExecution() throws Exception {
         checkMultiReducerWholeMapReduceExecution(false);
     }
 
-    /*
+    /**
      * @throws Exception If fails.
      */
     public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
@@ -100,6 +100,8 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
 
             if (striped)
                 jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");
+            else
+                jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "false");
 
             jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
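
As the test now exercises both settings explicitly, the feature is toggled per job through HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT. A minimal usage sketch mirroring the test code above (job setup and submission elided):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;

    public class StripedOutputConfig {
        /** Builds a job configuration with striped mapper output enabled. */
        public static JobConf stripedJobConf() {
            JobConf jobConf = new JobConf();

            // Opt in to striped (direct) mapper output; after IGNITE-4507
            // this also works for jobs that configure a combiner.
            jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");

            return jobConf;
        }
    }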
 


[2/4] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by pt...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b7997fca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b7997fca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b7997fca

Branch: refs/heads/ignite-2.0
Commit: b7997fcaaaf038149fc03344692055097d2ac872
Parents: d596f02 f41eb8d
Author: devozerov <vo...@gridgain.com>
Authored: Fri Jan 20 17:33:54 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Jan 20 17:33:54 2017 +0300

----------------------------------------------------------------------
 .../tcp/ipfinder/s3/TcpDiscoveryS3IpFinder.java |  37 ++-
 .../TcpDiscoveryS3IpFinderAbstractSelfTest.java |  84 +++++
 ...3IpFinderAwsCredentialsProviderSelfTest.java |  46 +++
 ...scoveryS3IpFinderAwsCredentialsSelfTest.java |  45 +++
 .../s3/TcpDiscoveryS3IpFinderSelfTest.java      |  79 -----
 .../ignite/testsuites/IgniteS3TestSuite.java    |  26 +-
 .../store/jdbc/CacheAbstractJdbcStore.java      |  12 +-
 .../store/jdbc/JdbcTypesDefaultTransformer.java |  19 ++
 .../cache/store/jdbc/JdbcTypesTransformer.java  |  17 +
 .../cache/query/GridCacheQueryAdapter.java      |   8 +
 .../internal/processors/odbc/IgniteTypes.java   |  69 ++++
 .../internal/processors/odbc/OdbcTypes.java     | 131 ++++++++
 .../internal/processors/odbc/OdbcUtils.java     |  85 +++++
 .../processors/odbc/escape/OdbcEscapeUtils.java |  52 ++-
 .../platform/cache/PlatformCache.java           |  11 +-
 .../utils/PlatformConfigurationUtils.java       | 128 ++++++-
 .../CacheJdbcPojoStoreAbstractSelfTest.java     |  23 +-
 .../store/jdbc/CacheJdbcPojoStoreTest.java      |   3 +
 ...eJdbcStoreAbstractMultithreadedSelfTest.java |  17 +-
 .../ignite/cache/store/jdbc/model/Gender.java   |  41 +++
 .../ignite/cache/store/jdbc/model/Person.java   |  31 +-
 .../IgniteCacheQueryCacheDestroySelfTest.java   | 142 ++++++++
 .../odbc/OdbcEscapeSequenceSelfTest.java        | 131 ++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 .../cpp/binary/src/impl/binary/binary_utils.cpp |   6 +-
 .../cpp/common/include/ignite/common/utils.h    |   8 +
 .../cpp/common/os/linux/src/common/utils.cpp    |  22 +-
 .../cpp/common/os/win/src/common/utils.cpp      |  14 +-
 modules/platforms/cpp/odbc-test/Makefile.am     |   1 +
 .../odbc-test/include/sql_test_suite_fixture.h  |  13 +
 .../cpp/odbc-test/project/vs/odbc-test.vcxproj  |   1 +
 .../project/vs/odbc-test.vcxproj.filters        |   3 +
 .../cpp/odbc-test/src/api_robustness_test.cpp   |   2 +-
 .../src/sql_aggregate_functions_test.cpp        |   4 +-
 .../src/sql_esc_convert_function_test.cpp       | 160 +++++++++
 .../odbc-test/src/sql_test_suite_fixture.cpp    |  52 ++-
 .../cpp/odbc-test/src/sql_types_test.cpp        | 131 +++++++-
 .../odbc/src/app/application_data_buffer.cpp    |  58 +++-
 .../platforms/cpp/odbc/src/app/parameter.cpp    |   4 +-
 .../cpp/odbc/src/config/connection_info.cpp     | 260 ++++++++++++++-
 .../Apache.Ignite.Core.Tests.csproj             |   4 +
 .../Binary/BinaryBuilderSelfTest.cs             | 159 ++++++---
 .../BinaryBuilderSelfTestArrayIdentity.cs       |  34 ++
 .../Binary/BinaryEqualityComparerTest.cs        | 279 ++++++++++++++++
 .../Binary/IO/BinaryStreamsTest.cs              |  19 ++
 .../Cache/CacheConfigurationTest.cs             |   5 +-
 .../Cache/Query/CacheDmlQueriesTest.cs          | 296 +++++++++++++++++
 .../Cache/Store/CacheParallelLoadStoreTest.cs   |   9 +-
 .../Cache/Store/CacheStoreSessionTest.cs        |  22 +-
 .../Cache/Store/CacheStoreTest.cs               | 333 ++++++++++++-------
 .../Cache/Store/CacheTestStore.cs               |  14 +
 .../Cache/Store/NamedNodeCacheStoreTest.cs      |  34 ++
 .../IgniteConfigurationSerializerTest.cs        |  46 ++-
 .../IgniteConfigurationTest.cs                  |  28 ++
 .../Apache.Ignite.Core.csproj                   |   5 +
 .../Binary/BinaryArrayEqualityComparer.cs       | 149 +++++++++
 .../Binary/BinaryConfiguration.cs               |  24 ++
 .../Binary/BinaryTypeConfiguration.cs           |  14 +
 .../Cache/Configuration/QueryEntity.cs          |  33 +-
 .../Cache/Configuration/QueryField.cs           |   6 +
 .../Apache.Ignite.Core/IgniteConfiguration.cs   |  85 ++++-
 .../IgniteConfigurationSection.xsd              |  19 ++
 .../Apache.Ignite.Core/Impl/Binary/Binary.cs    |  28 +-
 .../Binary/BinaryEqualityComparerSerializer.cs  |  99 ++++++
 .../Impl/Binary/BinaryFieldEqualityComparer.cs  | 138 ++++++++
 .../Impl/Binary/BinaryFullTypeDescriptor.cs     |  21 +-
 .../Impl/Binary/BinaryObject.cs                 |  31 +-
 .../Impl/Binary/BinaryObjectBuilder.cs          |  62 +++-
 .../Impl/Binary/BinaryObjectHeader.cs           |  21 +-
 .../Impl/Binary/BinaryObjectSchemaHolder.cs     |  22 ++
 .../Binary/BinarySurrogateTypeDescriptor.cs     |   6 +
 .../Impl/Binary/BinarySystemHandlers.cs         |   6 +-
 .../Impl/Binary/BinaryWriter.cs                 |  11 +-
 .../Impl/Binary/DateTimeHolder.cs               |  35 +-
 .../Impl/Binary/IBinaryEqualityComparer.cs      |  53 +++
 .../Impl/Binary/IBinaryTypeDescriptor.cs        |   5 +
 .../Impl/Binary/Io/BinaryHeapStream.cs          |   9 +
 .../Impl/Binary/Io/BinaryStreamBase.cs          |  13 +
 .../Impl/Binary/Io/IBinaryStream.cs             |  11 +-
 .../Impl/Binary/Io/IBinaryStreamProcessor.cs    |  36 ++
 .../Impl/Binary/Marshaller.cs                   |  22 +-
 .../Impl/Binary/SerializableObjectHolder.cs     |  16 +
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |  14 +-
 .../Common/IgniteConfigurationXmlSerializer.cs  |   5 +-
 .../Impl/Memory/PlatformMemoryStream.cs         |  16 +
 .../Apache.Ignite.Examples.csproj               |   1 +
 .../Datagrid/QueryDmlExample.cs                 | 162 +++++++++
 .../src/test/config/jdbc-pojo-store-builtin.xml |   8 +
 .../src/test/config/jdbc-pojo-store-obj.xml     |   8 +
 89 files changed, 4009 insertions(+), 445 deletions(-)
----------------------------------------------------------------------



[4/4] ignite git commit: Merge branch 'master' into ignite-2.0

Posted by pt...@apache.org.
Merge branch 'master' into ignite-2.0

# Conflicts:
#	modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
#	modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/82857d0c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/82857d0c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/82857d0c

Branch: refs/heads/ignite-2.0
Commit: 82857d0cb6e2984a5359b822a9c903874414aa67
Parents: 38cb67d d5b2748
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jan 23 13:12:12 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jan 23 13:12:12 2017 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopTaskContext.java    |  10 ++
 .../hadoop/impl/v1/HadoopV1MapTask.java         |  89 +++++++------
 .../hadoop/impl/v1/HadoopV1ReduceTask.java      |  69 ++++++----
 .../hadoop/impl/v2/HadoopV2Context.java         |  10 --
 .../hadoop/impl/v2/HadoopV2MapTask.java         |  18 +--
 .../hadoop/impl/v2/HadoopV2ReduceTask.java      |  14 ++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |   1 +
 .../hadoop/shuffle/HadoopShuffleJob.java        |   7 -
 .../shuffle/direct/HadoopDirectDataInput.java   |   2 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |  12 +-
 .../impl/HadoopAbstractMapReduceTest.java       |   2 +
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |   6 +-
 .../Impl/Binary/BinaryReader.cs                 | 131 ++++++++++---------
 13 files changed, 213 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index cde6da6,2aa4292..2172ff2
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@@ -28,8 -28,9 +28,9 @@@ import org.apache.hadoop.mapred.Reporte
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 -import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 +import org.apache.ignite.hadoop.HadoopInputSplit;
 +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -54,52 -55,64 +55,64 @@@ public class HadoopV1MapTask extends Ha
      /** {@inheritDoc} */
      @SuppressWarnings("unchecked")
      @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
 -        HadoopJob job = taskCtx.job();
 +        HadoopJobEx job = taskCtx.job();
  
-         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+         HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
  
-         JobConf jobConf = ctx.jobConf();
+         if (taskCtx.taskInfo().hasMapperIndex())
+             HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+         else
+             HadoopMapperUtils.clearMapperIndex();
  
-         InputFormat inFormat = jobConf.getInputFormat();
+         try {
+             JobConf jobConf = taskCtx0.jobConf();
  
-         HadoopInputSplit split = info().inputSplit();
+             InputFormat inFormat = jobConf.getInputFormat();
  
-         InputSplit nativeSplit;
+             HadoopInputSplit split = info().inputSplit();
  
-         if (split instanceof HadoopFileBlock) {
-             HadoopFileBlock block = (HadoopFileBlock)split;
+             InputSplit nativeSplit;
  
-             nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
-         }
-         else
-             nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+             if (split instanceof HadoopFileBlock) {
+                 HadoopFileBlock block = (HadoopFileBlock)split;
  
-         assert nativeSplit != null;
+                 nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+             }
+             else
+                 nativeSplit = (InputSplit)taskCtx0.getNativeSplit(split);
  
-         Reporter reporter = new HadoopV1Reporter(taskCtx);
+             assert nativeSplit != null;
  
-         HadoopV1OutputCollector collector = null;
+             Reporter reporter = new HadoopV1Reporter(taskCtx);
  
-         try {
-             collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
-                 fileName(), ctx.attemptId());
+             HadoopV1OutputCollector collector = null;
  
-             RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+             try {
+                 collector = collector(jobConf, taskCtx0, !job.info().hasCombiner() && !job.info().hasReducer(),
+                     fileName(), taskCtx0.attemptId());
  
-             Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+                 RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
  
-             Object key = reader.createKey();
-             Object val = reader.createValue();
+                 Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
  
-             assert mapper != null;
+                 Object key = reader.createKey();
+                 Object val = reader.createValue();
+ 
+                 assert mapper != null;
  
-             try {
                  try {
-                     while (reader.next(key, val)) {
-                         if (isCancelled())
-                             throw new HadoopTaskCancelledException("Map task cancelled.");
+                     try {
+                         while (reader.next(key, val)) {
+                             if (isCancelled())
+                                 throw new HadoopTaskCancelledException("Map task cancelled.");
+ 
+                             mapper.map(key, val, collector, reporter);
+                         }
  
-                         mapper.map(key, val, collector, reporter);
+                         taskCtx.onMapperFinished();
+                     }
+                     finally {
+                         mapper.close();
                      }
                  }
                  finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 6b90653,5c1dd15..3ad583f
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@@ -22,7 -22,8 +22,8 @@@ import org.apache.hadoop.mapred.Reducer
  import org.apache.hadoop.mapred.Reporter;
  import org.apache.hadoop.util.ReflectionUtils;
  import org.apache.ignite.IgniteCheckedException;
 -import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
  import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@@ -51,34 -52,47 +52,47 @@@ public class HadoopV1ReduceTask extend
      /** {@inheritDoc} */
      @SuppressWarnings("unchecked")
      @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
 -        HadoopJob job = taskCtx.job();
 +        HadoopJobEx job = taskCtx.job();
  
-         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+         HadoopV2TaskContext taskCtx0 = (HadoopV2TaskContext)taskCtx;
  
-         JobConf jobConf = ctx.jobConf();
- 
-         HadoopTaskInput input = taskCtx.input();
- 
-         HadoopV1OutputCollector collector = null;
+         if (!reduce && taskCtx.taskInfo().hasMapperIndex())
+             HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+         else
+             HadoopMapperUtils.clearMapperIndex();
  
          try {
-             collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+             JobConf jobConf = taskCtx0.jobConf();
  
-             Reducer reducer;
-             if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
-                 jobConf);
-             else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
-                 jobConf);
+             HadoopTaskInput input = taskCtx.input();
  
-             assert reducer != null;
+             HadoopV1OutputCollector collector = null;
  
              try {
+                 collector = collector(jobConf, taskCtx0, reduce || !job.info().hasReducer(), fileName(), taskCtx0.attemptId());
+ 
+                 Reducer reducer;
+                 if (reduce) reducer = ReflectionUtils.newInstance(jobConf.getReducerClass(),
+                     jobConf);
+                 else reducer = ReflectionUtils.newInstance(jobConf.getCombinerClass(),
+                     jobConf);
+ 
+                 assert reducer != null;
+ 
                  try {
-                     while (input.next()) {
-                         if (isCancelled())
-                             throw new HadoopTaskCancelledException("Reduce task cancelled.");
+                     try {
+                         while (input.next()) {
+                             if (isCancelled())
+                                 throw new HadoopTaskCancelledException("Reduce task cancelled.");
+ 
+                             reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                         }
  
-                         reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                         if (!reduce)
+                             taskCtx.onMapperFinished();
+                     }
+                     finally {
+                         reducer.close();
                      }
                  }
                  finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index c698ee3,8acc7aa..a9c0bb7
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@@ -44,10 -44,11 +44,11 @@@ import org.apache.ignite.hadoop.io.Part
  import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
  import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
  import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
 -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 -import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 +import org.apache.ignite.hadoop.HadoopInputSplit;
 +import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
  import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
  import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
+ import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
  import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
  import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
  import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 25925fc,3646aa7..c3e7018
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@@ -179,16 -179,9 +179,9 @@@ public class HadoopShuffleJob<T> implem
          this.embedded = embedded;
  
          // No stripes for combiner.
 -        boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
 +        boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, true);
  
          if (stripeMappers0) {
-             if (job.info().hasCombiner()) {
-                 log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
-                     job.id() + ']');
- 
-                 stripeMappers0 = false;
-             }
- 
              if (!embedded) {
                  log.info("Striped mapper output is disabled because it cannot be used in external mode [jobId=" +
                      job.id() + ']');

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/82857d0c/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopAbstractMapReduceTest.java
----------------------------------------------------------------------


[3/4] ignite git commit: IGNITE-4588 .NET: Simplify BinaryReader frame handling

Posted by pt...@apache.org.
IGNITE-4588 .NET: Simplify BinaryReader frame handling


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5b27488
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5b27488
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5b27488

Branch: refs/heads/ignite-2.0
Commit: d5b274883b2047069457af60ad313ad9096f5641
Parents: b7997fc
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon Jan 23 13:08:15 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon Jan 23 13:08:15 2017 +0300

----------------------------------------------------------------------
 .../Impl/Binary/BinaryReader.cs                 | 131 ++++++++++---------
 1 file changed, 68 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d5b27488/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
index d9facc3..70417f7 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReader.cs
@@ -41,29 +41,14 @@ namespace Apache.Ignite.Core.Impl.Binary
         /** Handles. */
         private BinaryReaderHandleDictionary _hnds;
 
-        /** Current position. */
-        private int _curPos;
-
-        /** Current raw flag. */
-        private bool _curRaw;
-
         /** Detach flag. */
         private bool _detach;
 
         /** Binary read mode. */
         private BinaryMode _mode;
 
-        /** Current type structure tracker. */
-        private BinaryStructureTracker _curStruct;
-
-        /** Current schema. */
-        private int[] _curSchema;
-
-        /** Current schema with positions. */
-        private Dictionary<int, int> _curSchemaMap;
-
-        /** Current header. */
-        private BinaryObjectHeader _curHdr;
+        /** Current frame. */
+        private Frame _frame;
 
         /// <summary>
         /// Constructor.
@@ -81,7 +66,7 @@ namespace Apache.Ignite.Core.Impl.Binary
             _marsh = marsh;
             _mode = mode;
             _builder = builder;
-            _curPos = stream.Position;
+            _frame.Pos = stream.Position;
 
             Stream = stream;
         }
@@ -438,7 +423,7 @@ namespace Apache.Ignite.Core.Impl.Binary
         /** <inheritdoc /> */
         public T ReadObject<T>(string fieldName)
         {
-            if (_curRaw)
+            if (_frame.Raw)
                 throw new BinaryObjectException("Cannot read named fields after raw data is read.");
 
             if (SeekField(fieldName))
@@ -712,34 +697,22 @@ namespace Apache.Ignite.Core.Impl.Binary
                     }
 
                     // Preserve old frame.
-                    var oldHdr = _curHdr;
-                    int oldPos = _curPos;
-                    var oldStruct = _curStruct;
-                    bool oldRaw = _curRaw;
-                    var oldSchema = _curSchema;
-                    var oldSchemaMap = _curSchemaMap;
+                    var oldFrame = _frame;
 
                     // Set new frame.
-                    _curHdr = hdr;
-                    _curPos = pos;
+                    _frame.Hdr = hdr;
+                    _frame.Pos = pos;
                     SetCurSchema(desc);
-                    _curStruct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure);
-                    _curRaw = false;
+                    _frame.Struct = new BinaryStructureTracker(desc, desc.ReaderTypeStructure);
+                    _frame.Raw = false;
 
                     // Read object.
-                    Stream.Seek(pos + BinaryObjectHeader.Size, SeekOrigin.Begin);
-
                     var obj = desc.Serializer.ReadBinary<T>(this, desc.Type, pos);
 
-                    _curStruct.UpdateReaderStructure();
+                    _frame.Struct.UpdateReaderStructure();
 
                     // Restore old frame.
-                    _curHdr = oldHdr;
-                    _curPos = oldPos;
-                    _curStruct = oldStruct;
-                    _curRaw = oldRaw;
-                    _curSchema = oldSchema;
-                    _curSchemaMap = oldSchemaMap;
+                    _frame = oldFrame;
 
                     return obj;
                 }
@@ -756,15 +729,15 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private void SetCurSchema(IBinaryTypeDescriptor desc)
         {
-            if (_curHdr.HasSchema)
+            if (_frame.Hdr.HasSchema)
             {
-                _curSchema = desc.Schema.Get(_curHdr.SchemaId);
+                _frame.Schema = desc.Schema.Get(_frame.Hdr.SchemaId);
 
-                if (_curSchema == null)
+                if (_frame.Schema == null)
                 {
-                    _curSchema = ReadSchema();
+                    _frame.Schema = ReadSchema(desc.TypeId);
 
-                    desc.Schema.Add(_curHdr.SchemaId, _curSchema);
+                    desc.Schema.Add(_frame.Hdr.SchemaId, _frame.Schema);
                 }
             }
         }
@@ -772,25 +745,31 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// <summary>
         /// Reads the schema.
         /// </summary>
-        private int[] ReadSchema()
+        private int[] ReadSchema(int typeId)
         {
-            if (_curHdr.IsCompactFooter)
+            if (_frame.Hdr.IsCompactFooter)
             {
                 // Get schema from Java
-                var schema = Marshaller.Ignite.BinaryProcessor.GetSchema(_curHdr.TypeId, _curHdr.SchemaId);
+                var ignite = Marshaller.Ignite;
+
+                var schema = ignite == null 
+                    ? null 
+                    : ignite.BinaryProcessor.GetSchema(_frame.Hdr.TypeId, _frame.Hdr.SchemaId);
 
                 if (schema == null)
                     throw new BinaryObjectException("Cannot find schema for object with compact footer [" +
-                        "typeId=" + _curHdr.TypeId + ", schemaId=" + _curHdr.SchemaId + ']');
+                        "typeId=" + typeId + ", schemaId=" + _frame.Hdr.SchemaId + ']');
 
                 return schema;
             }
 
-            Stream.Seek(_curPos + _curHdr.SchemaOffset, SeekOrigin.Begin);
+            var pos = Stream.Position;
+
+            Stream.Seek(_frame.Pos + _frame.Hdr.SchemaOffset, SeekOrigin.Begin);
 
-            var count = _curHdr.SchemaFieldCount;
+            var count = _frame.Hdr.SchemaFieldCount;
 
-            var offsetSize = _curHdr.SchemaFieldOffsetSize;
+            var offsetSize = _frame.Hdr.SchemaFieldOffsetSize;
 
             var res = new int[count];
 
@@ -800,6 +779,8 @@ namespace Apache.Ignite.Core.Impl.Binary
                 Stream.Seek(offsetSize, SeekOrigin.Current);
             }
 
+            Stream.Seek(pos, SeekOrigin.Begin);
+
             return res;
         }
         /// <summary>
@@ -867,11 +848,11 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private void MarkRaw()
         {
-            if (!_curRaw)
+            if (!_frame.Raw)
             {
-                _curRaw = true;
+                _frame.Raw = true;
 
-                Stream.Seek(_curPos + _curHdr.GetRawOffset(Stream, _curPos), SeekOrigin.Begin);
+                Stream.Seek(_frame.Pos + _frame.Hdr.GetRawOffset(Stream, _frame.Pos), SeekOrigin.Begin);
             }
         }
 
@@ -880,29 +861,29 @@ namespace Apache.Ignite.Core.Impl.Binary
         /// </summary>
         private bool SeekField(string fieldName)
         {
-            if (_curRaw)
+            if (_frame.Raw)
                 throw new BinaryObjectException("Cannot read named fields after raw data is read.");
 
-            if (!_curHdr.HasSchema)
+            if (!_frame.Hdr.HasSchema)
                 return false;
 
-            var actionId = _curStruct.CurStructAction;
+            var actionId = _frame.Struct.CurStructAction;
 
-            var fieldId = _curStruct.GetFieldId(fieldName);
+            var fieldId = _frame.Struct.GetFieldId(fieldName);
 
-            if (_curSchema == null || actionId >= _curSchema.Length || fieldId != _curSchema[actionId])
+            if (_frame.Schema == null || actionId >= _frame.Schema.Length || fieldId != _frame.Schema[actionId])
             {
-                _curSchemaMap = _curSchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _curPos, _curHdr,
-                                    () => _curSchema).ToDictionary();
+                _frame.SchemaMap = _frame.SchemaMap ?? BinaryObjectSchemaSerializer.ReadSchema(Stream, _frame.Pos,
+                    _frame.Hdr, () => _frame.Schema).ToDictionary();
 
-                _curSchema = null; // read order is different, ignore schema for future reads
+                _frame.Schema = null; // read order is different, ignore schema for future reads
 
                 int pos;
 
-                if (!_curSchemaMap.TryGetValue(fieldId, out pos))
+                if (!_frame.SchemaMap.TryGetValue(fieldId, out pos))
                     return false;
 
-                Stream.Seek(pos + _curPos, SeekOrigin.Begin);
+                Stream.Seek(pos + _frame.Pos, SeekOrigin.Begin);
             }
 
             return true;
@@ -982,5 +963,29 @@ namespace Apache.Ignite.Core.Impl.Binary
 
             return TypeCaster<T>.Cast(new BinaryEnum(enumType, enumValue, reader.Marshaller));
         }
+
+        /// <summary>
+        /// Stores current reader stack frame.
+        /// </summary>
+        private struct Frame
+        {
+            /** Current position. */
+            public int Pos;
+
+            /** Current raw flag. */
+            public bool Raw;
+
+            /** Current type structure tracker. */
+            public BinaryStructureTracker Struct;
+
+            /** Current schema. */
+            public int[] Schema;
+
+            /** Current schema with positions. */
+            public Dictionary<int, int> SchemaMap;
+
+            /** Current header. */
+            public BinaryObjectHeader Hdr;
+        }
     }
 }
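
The refactoring in this commit replaces six parallel "_cur*" fields with one Frame value type, so entering and leaving a nested object's scope becomes a single assignment instead of six. A rough Java rendition of the same idea (illustrative only; C# structs copy by value on assignment, so this Java version swaps in a fresh holder rather than mutating the saved one):

    /** Bundles per-object read state so save/restore is one assignment. */
    final class Frame {
        int pos;
        boolean raw;
        int[] schema;
        // Tracker, schema map and header would live here as well.
    }

    class ReaderSketch {
        /** Frame of the object currently being read. */
        private Frame frame = new Frame();

        /** Runs body with a fresh frame, restoring the old one afterwards. */
        <T> T readNested(java.util.function.Supplier<T> body, Frame newFrame) {
            Frame old = frame; // preserve the whole frame in one step

            frame = newFrame;  // enter the nested object's frame

            try {
                return body.get();
            }
            finally {
                frame = old;   // restore everything at once
            }
        }
    }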