You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/01/16 13:36:34 UTC

[2/2] ignite git commit: IGNITE-4428: Hadoop: moved HadoopMapReducePlanner and dependent classes to public space. This closes #1389. This closes #1394.

IGNITE-4428: Hadoop: moved HadoopMapReducePlanner and dependent classes to public space. This closes #1389. This closes #1394.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d14e0727
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d14e0727
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d14e0727

Branch: refs/heads/ignite-2.0
Commit: d14e0727b3dd61ab5ec2957133d77dbc25e9ba68
Parents: 77ca2e6
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Mon Jan 16 16:36:25 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Jan 16 16:36:25 2017 +0300

----------------------------------------------------------------------
 .../configuration/HadoopConfiguration.java      |   2 +-
 .../apache/ignite/hadoop/HadoopInputSplit.java  |  54 +++++++
 .../org/apache/ignite/hadoop/HadoopJob.java     |  74 ++++++++++
 .../ignite/hadoop/HadoopMapReducePlan.java      |  80 +++++++++++
 .../ignite/hadoop/HadoopMapReducePlanner.java   |  40 ++++++
 .../processors/hadoop/HadoopDefaultJobInfo.java |   4 +-
 .../processors/hadoop/HadoopFileBlock.java      |   1 +
 .../processors/hadoop/HadoopInputSplit.java     |  54 -------
 .../internal/processors/hadoop/HadoopJob.java   | 107 --------------
 .../internal/processors/hadoop/HadoopJobEx.java | 140 +++++++++++++++++++
 .../processors/hadoop/HadoopJobInfo.java        |  54 +++----
 .../processors/hadoop/HadoopMapReducePlan.java  |  80 -----------
 .../hadoop/HadoopMapReducePlanner.java          |  40 ------
 .../processors/hadoop/HadoopTaskContext.java    |   6 +-
 .../processors/hadoop/HadoopTaskInfo.java       |   1 +
 .../hadoop/counter/HadoopCounterWriter.java     |   4 +-
 .../resources/META-INF/classnames.properties    |   4 +-
 .../fs/IgniteHadoopFileSystemCounterWriter.java |   6 +-
 .../IgniteHadoopWeightedMapReducePlanner.java   |  10 +-
 .../planner/HadoopAbstractMapReducePlanner.java | 118 ++++++++++++++++
 .../planner/HadoopTestRoundRobinMrPlanner.java  |  75 ++++++++++
 .../processors/hadoop/HadoopCommonUtils.java    |   1 +
 .../processors/hadoop/HadoopContext.java        |   2 +
 .../processors/hadoop/HadoopExternalSplit.java  |   1 +
 .../processors/hadoop/HadoopSplitWrapper.java   |   1 +
 .../HadoopFileSystemCounterWriterDelegate.java  |   4 +-
 ...doopFileSystemCounterWriterDelegateImpl.java |   4 +-
 .../hadoop/impl/v1/HadoopV1MapTask.java         |   6 +-
 .../hadoop/impl/v1/HadoopV1ReduceTask.java      |   4 +-
 .../hadoop/impl/v1/HadoopV1Splitter.java        |   2 +-
 .../hadoop/impl/v2/HadoopV2Context.java         |   2 +-
 .../processors/hadoop/impl/v2/HadoopV2Job.java  |  15 +-
 .../hadoop/impl/v2/HadoopV2Splitter.java        |   2 +-
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |   6 +-
 .../hadoop/jobtracker/HadoopJobMetadata.java    |   4 +-
 .../hadoop/jobtracker/HadoopJobTracker.java     |  32 ++---
 .../planner/HadoopAbstractMapReducePlanner.java | 116 ---------------
 .../planner/HadoopDefaultMapReducePlan.java     |   4 +-
 .../hadoop/shuffle/HadoopShuffle.java           |   4 +-
 .../hadoop/shuffle/HadoopShuffleJob.java        |   7 +-
 .../HadoopEmbeddedTaskExecutor.java             |   8 +-
 .../hadoop/taskexecutor/HadoopRunnableTask.java |   6 +-
 .../taskexecutor/HadoopTaskExecutorAdapter.java |   8 +-
 .../external/HadoopExternalTaskExecutor.java    |  16 +--
 .../child/HadoopChildProcessRunner.java         |   4 +-
 .../resources/META-INF/classnames.properties    |   4 +-
 .../hadoop/impl/HadoopCommandLineTest.java      |   4 +-
 .../hadoop/impl/HadoopJobTrackerSelfTest.java   |   1 +
 .../hadoop/impl/HadoopPlannerMockJob.java       |  10 +-
 .../hadoop/impl/HadoopTasksAllVersionsTest.java |  16 +--
 .../hadoop/impl/HadoopTasksV1Test.java          |   4 +-
 .../hadoop/impl/HadoopTasksV2Test.java          |   4 +-
 .../impl/HadoopTestRoundRobinMrPlanner.java     |  75 ----------
 .../hadoop/impl/HadoopTestTaskContext.java      |   6 +-
 .../hadoop/impl/HadoopV2JobSelfTest.java        |   6 +-
 .../HadoopWeightedMapReducePlannerTest.java     |   6 +-
 .../collections/HadoopAbstractMapTest.java      |   4 +-
 57 files changed, 738 insertions(+), 615 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
index 84014d6..6443a67 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/HadoopConfiguration.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.configuration;
 
 import org.apache.ignite.lifecycle.LifecycleBean;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
new file mode 100644
index 0000000..4138e64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopInputSplit.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.io.Externalizable;
+
+/**
+ * Abstract fragment of an input data source.
+ */
+public abstract class HadoopInputSplit implements Externalizable {
+    /** */
+    protected String[] hosts;
+
+    /**
+     * Array of hosts where this input split resides.
+     *
+     * @return Hosts.
+     */
+    public String[] hosts() {
+        assert hosts != null;
+
+        return hosts;
+    }
+
+    /**
+     * This method must be implemented for purpose of internal implementation.
+     *
+     * @param obj Another object.
+     * @return {@code true} If objects are equal.
+     */
+    @Override public abstract boolean equals(Object obj);
+
+    /**
+     * This method must be implemented for purpose of internal implementation.
+     *
+     * @return Hash code of the object.
+     */
+    @Override public abstract int hashCode();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
new file mode 100644
index 0000000..8ee0330
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopJob.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.hadoop;
+
+import java.util.Collection;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Compact job description.
+ */
+public interface HadoopJob {
+    /**
+     * Gets collection of input splits for this job.
+     *
+     * @return Input splits.
+     */
+    public Collection<HadoopInputSplit> input();
+
+    /**
+     * Gets optional configuration property for the job.
+     *
+     * @param name Property name.
+     * @return Value or {@code null} if none.
+     */
+    @Nullable String property(String name);
+
+    /**
+     * Checks whether job has combiner.
+     *
+     * @return {@code true} If job has combiner.
+     */
+    boolean hasCombiner();
+
+    /**
+     * Checks whether job has reducer.
+     * Actual number of reducers will be in {@link HadoopMapReducePlan#reducers()}.
+     *
+     * @return Number of reducer.
+     */
+    boolean hasReducer();
+
+    /**
+     * @return Number of reducers configured for job.
+     */
+    int reducers();
+
+    /**
+     * Gets job name.
+     *
+     * @return Job name.
+     */
+    String jobName();
+
+    /**
+     * Gets user name.
+     *
+     * @return User name.
+     */
+    String user();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
new file mode 100644
index 0000000..f77fb64
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlan.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map-reduce job execution plan.
+ */
+public interface HadoopMapReducePlan extends Serializable {
+    /**
+     * Gets collection of file blocks for which mappers should be executed.
+     *
+     * @param nodeId Node ID to check.
+     * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
+     */
+    @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId);
+
+    /**
+     * Gets reducer IDs that should be started on given node.
+     *
+     * @param nodeId Node ID to check.
+     * @return Array of reducer IDs.
+     */
+    @Nullable public int[] reducers(UUID nodeId);
+
+    /**
+     * Gets collection of all node IDs involved in map part of job execution.
+     *
+     * @return Collection of node IDs.
+     */
+    public Collection<UUID> mapperNodeIds();
+
+    /**
+     * Gets collection of all node IDs involved in reduce part of job execution.
+     *
+     * @return Collection of node IDs.
+     */
+    public Collection<UUID> reducerNodeIds();
+
+    /**
+     * Gets overall number of mappers for the job.
+     *
+     * @return Number of mappers.
+     */
+    public int mappers();
+
+    /**
+     * Gets overall number of reducers for the job.
+     *
+     * @return Number of reducers.
+     */
+    public int reducers();
+
+    /**
+     * Gets node ID for reducer.
+     *
+     * @param reducer Reducer.
+     * @return Node ID.
+     */
+    public UUID nodeForReducer(int reducer);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
new file mode 100644
index 0000000..8d77b70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/hadoop/HadoopMapReducePlanner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Map-reduce execution planner.
+ */
+public interface HadoopMapReducePlanner {
+    /**
+     * Prepares map-reduce execution plan for the given job and topology.
+     *
+     * @param job Job.
+     * @param top Topology.
+     * @param oldPlan Old plan in case of partial failure.
+     * @return Map reduce plan.
+     * @throws IgniteCheckedException If an error occurs.
+     */
+    public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
index ae17ac8..ab38e4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java
@@ -82,12 +82,12 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log,
+    @Override public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls, HadoopJobId jobId, IgniteLogger log,
         @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException {
         assert jobCls != null;
 
         try {
-            Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class,
+            Constructor<? extends HadoopJobEx> constructor = jobCls.getConstructor(HadoopJobId.class,
                 HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class);
 
             return constructor.newInstance(jobId, this, log, libNames, helper);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
index bc665eb..351abce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopFileBlock.java
@@ -22,6 +22,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.net.URI;
 import java.util.Arrays;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
deleted file mode 100644
index 998cb61..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopInputSplit.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Externalizable;
-
-/**
- * Abstract fragment of an input data source.
- */
-public abstract class HadoopInputSplit implements Externalizable {
-    /** */
-    protected String[] hosts;
-
-    /**
-     * Array of hosts where this input split resides.
-     *
-     * @return Hosts.
-     */
-    public String[] hosts() {
-        assert hosts != null;
-
-        return hosts;
-    }
-
-    /**
-     * This method must be implemented for purpose of internal implementation.
-     *
-     * @param obj Another object.
-     * @return {@code true} If objects are equal.
-     */
-    @Override public abstract boolean equals(Object obj);
-
-    /**
-     * This method must be implemented for purpose of internal implementation.
-     *
-     * @return Hash code of the object.
-     */
-    @Override public abstract int hashCode();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
deleted file mode 100644
index a77c744..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJob.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.Collection;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-
-/**
- * Hadoop job.
- */
-public interface HadoopJob {
-    /**
-     * Gets job ID.
-     *
-     * @return Job ID.
-     */
-    public HadoopJobId id();
-
-    /**
-     * Gets job information.
-     *
-     * @return Job information.
-     */
-    public HadoopJobInfo info();
-
-    /**
-     * Gets collection of input splits for this job.
-     *
-     * @return Input splits.
-     */
-    public Collection<HadoopInputSplit> input() throws IgniteCheckedException;
-
-    /**
-     * Returns context for task execution.
-     *
-     * @param info Task info.
-     * @return Task Context.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException;
-
-    /**
-     * Does all the needed initialization for the job. Will be called on each node where tasks for this job must
-     * be executed.
-     * <p>
-     * If job is running in external mode this method will be called on instance in Ignite node with parameter
-     * {@code false} and on instance in external process with parameter {@code true}.
-     *
-     * @param external If {@code true} then this job instance resides in external process.
-     * @param locNodeId Local node ID.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException;
-
-    /**
-     * Release all the resources.
-     * <p>
-     * If job is running in external mode this method will be called on instance in Ignite node with parameter
-     * {@code false} and on instance in external process with parameter {@code true}.
-     *
-     * @param external If {@code true} then this job instance resides in external process.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void dispose(boolean external) throws IgniteCheckedException;
-
-    /**
-     * Prepare local environment for the task.
-     *
-     * @param info Task info.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
-
-    /**
-     * Cleans up local environment of the task.
-     *
-     * @param info Task info.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
-
-    /**
-     * Cleans up the job staging directory.
-     */
-    public void cleanupStagingDirectory();
-
-    /**
-     * @return Ignite work directory.
-     */
-    public String igniteWorkDirectory();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
new file mode 100644
index 0000000..ba78af9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobEx.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopJob;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop job.
+ */
+public abstract class HadoopJobEx implements HadoopJob {
+    /**
+     * Gets job ID.
+     *
+     * @return Job ID.
+     */
+    abstract public HadoopJobId id();
+
+    /**
+     * Gets job information.
+     *
+     * @return Job information.
+     */
+    abstract public HadoopJobInfo info();
+
+    /**
+     * Gets collection of input splits for this job.
+     *
+     * @return Input splits.
+     */
+    abstract public Collection<HadoopInputSplit> input();
+
+    /**
+     * Returns context for task execution.
+     *
+     * @param info Task info.
+     * @return Task Context.
+     * @throws IgniteCheckedException If failed.
+     */
+    abstract public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException;
+
+    /**
+     * Does all the needed initialization for the job. Will be called on each node where tasks for this job must
+     * be executed.
+     * <p>
+     * If job is running in external mode this method will be called on instance in Ignite node with parameter
+     * {@code false} and on instance in external process with parameter {@code true}.
+     *
+     * @param external If {@code true} then this job instance resides in external process.
+     * @param locNodeId Local node ID.
+     * @throws IgniteCheckedException If failed.
+     */
+    abstract public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException;
+
+    /**
+     * Release all the resources.
+     * <p>
+     * If job is running in external mode this method will be called on instance in Ignite node with parameter
+     * {@code false} and on instance in external process with parameter {@code true}.
+     *
+     * @param external If {@code true} then this job instance resides in external process.
+     * @throws IgniteCheckedException If failed.
+     */
+    abstract public void dispose(boolean external) throws IgniteCheckedException;
+
+    /**
+     * Prepare local environment for the task.
+     *
+     * @param info Task info.
+     * @throws IgniteCheckedException If failed.
+     */
+    abstract public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
+
+    /**
+     * Cleans up local environment of the task.
+     *
+     * @param info Task info.
+     * @throws IgniteCheckedException If failed.
+     */
+    abstract public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException;
+
+    /**
+     * Cleans up the job staging directory.
+     */
+    abstract public void cleanupStagingDirectory();
+
+    /**
+     * @return Ignite work directory.
+     */
+    abstract public String igniteWorkDirectory();
+
+    /** {@inheritDoc} */
+    @Nullable @Override public String property(String name) {
+        return info().property(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasCombiner() {
+        return info().hasCombiner();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasReducer() {
+        return info().hasReducer();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int reducers() {
+        return info().reducers();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String jobName() {
+        return info().jobName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String user() {
+        return info().user();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
index 853c63d..4cc8f80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java
@@ -17,29 +17,29 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
-import java.io.Serializable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Compact job description.
+ * Extended job description.
  */
-public interface HadoopJobInfo extends Serializable {
+public interface HadoopJobInfo {
     /**
      * Gets optional configuration property for the job.
      *
      * @param name Property name.
      * @return Value or {@code null} if none.
      */
-    @Nullable public String property(String name);
+    @Nullable String property(String name);
 
     /**
      * Checks whether job has combiner.
      *
      * @return {@code true} If job has combiner.
      */
-    public boolean hasCombiner();
+    boolean hasCombiner();
 
     /**
      * Checks whether job has reducer.
@@ -47,42 +47,42 @@ public interface HadoopJobInfo extends Serializable {
      *
      * @return Number of reducer.
      */
-    public boolean hasReducer();
-
-    /**
-     * Creates new job instance for the given ID.
-     * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJob} is for one job execution.
-     * This method will be called once for the same ID on one node, though it can be called on the same host
-     * multiple times from different processes (in case of multiple nodes on the same host or external execution).
-     *
-     * @param jobCls The job class.
-     * @param jobId Job ID.
-     * @param log Logger.
-     * @param libNames Optional additional native library names.
-     * @param helper HadoopHelper.
-     * @return Job.
-     * @throws IgniteCheckedException If failed.
-     */
-    public HadoopJob createJob(Class<? extends HadoopJob> jobCls,
-        HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper)
-            throws IgniteCheckedException;
+    boolean hasReducer();
 
     /**
      * @return Number of reducers configured for job.
      */
-    public int reducers();
+    int reducers();
 
     /**
      * Gets job name.
      *
      * @return Job name.
      */
-    public String jobName();
+    String jobName();
 
     /**
      * Gets user name.
      *
      * @return User name.
      */
-    public String user();
+    String user();
+
+    /**
+     * Creates new job instance for the given ID.
+     * {@link HadoopJobInfo} is reusable for multiple jobs while {@link HadoopJobEx} is for one job execution.
+     * This method will be called once for the same ID on one node, though it can be called on the same host
+     * multiple times from different processes (in case of multiple nodes on the same host or external execution).
+     *
+     * @param jobCls The job class.
+     * @param jobId Job ID.
+     * @param log Logger.
+     * @param libNames Optional additional native library names.
+     * @param helper HadoopHelper.
+     * @return Job.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopJobEx createJob(Class<? extends HadoopJobEx> jobCls,
+        HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper)
+            throws IgniteCheckedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
deleted file mode 100644
index aadc2bf..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlan.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.UUID;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce job execution plan.
- */
-public interface HadoopMapReducePlan extends Serializable {
-    /**
-     * Gets collection of file blocks for which mappers should be executed.
-     *
-     * @param nodeId Node ID to check.
-     * @return Collection of file blocks or {@code null} if no mappers should be executed on given node.
-     */
-    @Nullable public Collection<HadoopInputSplit> mappers(UUID nodeId);
-
-    /**
-     * Gets reducer IDs that should be started on given node.
-     *
-     * @param nodeId Node ID to check.
-     * @return Array of reducer IDs.
-     */
-    @Nullable public int[] reducers(UUID nodeId);
-
-    /**
-     * Gets collection of all node IDs involved in map part of job execution.
-     *
-     * @return Collection of node IDs.
-     */
-    public Collection<UUID> mapperNodeIds();
-
-    /**
-     * Gets collection of all node IDs involved in reduce part of job execution.
-     *
-     * @return Collection of node IDs.
-     */
-    public Collection<UUID> reducerNodeIds();
-
-    /**
-     * Gets overall number of mappers for the job.
-     *
-     * @return Number of mappers.
-     */
-    public int mappers();
-
-    /**
-     * Gets overall number of reducers for the job.
-     *
-     * @return Number of reducers.
-     */
-    public int reducers();
-
-    /**
-     * Gets node ID for reducer.
-     *
-     * @param reducer Reducer.
-     * @return Node ID.
-     */
-    public UUID nodeForReducer(int reducer);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
deleted file mode 100644
index 0009c4a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReducePlanner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Map-reduce execution planner.
- */
-public interface HadoopMapReducePlanner {
-    /**
-     * Prepares map-reduce execution plan for the given job and topology.
-     *
-     * @param job Job.
-     * @param top Topology.
-     * @param oldPlan Old plan in case of partial failure.
-     * @return Map reduce plan.
-     * @throws IgniteCheckedException If an error occurs.
-     */
-    public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
-        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index dddd017..194c1dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawCompar
  */
 public abstract class HadoopTaskContext {
     /** */
-    protected final HadoopJob job;
+    protected final HadoopJobEx job;
 
     /** */
     private HadoopTaskInput input;
@@ -44,7 +44,7 @@ public abstract class HadoopTaskContext {
      * @param taskInfo Task info.
      * @param job Job.
      */
-    protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJob job) {
+    protected HadoopTaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job) {
         this.taskInfo = taskInfo;
         this.job = job;
     }
@@ -88,7 +88,7 @@ public abstract class HadoopTaskContext {
     /**
      * @return Job.
      */
-    public HadoopJob job() {
+    public HadoopJobEx job() {
         return job;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
index 3509367..eb3113c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
index 6c033b2..93a69db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterWriter.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.hadoop.counter;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 
 /**
  * The object that writes some system counters to some storage for each running job. This operation is a part of
@@ -32,5 +32,5 @@ public interface HadoopCounterWriter {
      * @param cntrs Counters.
      * @throws IgniteCheckedException If failed.
      */
-    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException;
+    public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 212e94a..02bad40 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1110,13 +1110,13 @@ org.apache.ignite.internal.processors.dr.GridDrType
 org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater
 org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo
 org.apache.ignite.internal.processors.hadoop.HadoopFileBlock
-org.apache.ignite.internal.processors.hadoop.HadoopInputSplit
+org.apache.ignite.hadoop.HadoopInputSplit
 org.apache.ignite.internal.processors.hadoop.HadoopJobId
 org.apache.ignite.internal.processors.hadoop.HadoopJobInfo
 org.apache.ignite.internal.processors.hadoop.HadoopJobPhase
 org.apache.ignite.internal.processors.hadoop.HadoopJobProperty
 org.apache.ignite.internal.processors.hadoop.HadoopJobStatus
-org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan
+org.apache.ignite.hadoop.HadoopMapReducePlan
 org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo
 org.apache.ignite.internal.processors.hadoop.HadoopTaskType
 org.apache.ignite.internal.processors.hadoop.message.HadoopMessage

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
index f1c1b16..1128fa4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.hadoop.fs;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
@@ -41,7 +41,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
     private volatile HadoopFileSystemCounterWriterDelegate delegate;
 
     /** {@inheritDoc} */
-    @Override public void write(HadoopJob job, HadoopCounters cntrs)
+    @Override public void write(HadoopJobEx job, HadoopCounters cntrs)
         throws IgniteCheckedException {
         delegate(job).write(job, cntrs);
     }
@@ -52,7 +52,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter
      * @param job Job.
      * @return Delegate.
      */
-    private HadoopFileSystemCounterWriterDelegate delegate(HadoopJob job) {
+    private HadoopFileSystemCounterWriterDelegate delegate(HadoopJobEx job) {
         HadoopFileSystemCounterWriterDelegate delegate0 = delegate;
 
         if (delegate0 == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index 178cdb5..bb0b47f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -21,16 +21,16 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopJob;
 import org.apache.ignite.igfs.IgfsBlockLocation;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.hadoop.planner.HadoopAbstractMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
 import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
@@ -117,7 +117,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
     @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
         @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
         List<HadoopInputSplit> splits = HadoopCommonUtils.sortInputSplits(job.input());
-        int reducerCnt = job.info().reducers();
+        int reducerCnt = job.reducers();
 
         if (reducerCnt < 0)
             throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + reducerCnt);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
new file mode 100644
index 0000000..dd01f11
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.planner;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+
+/**
+ * Base class for map-reduce planners.
+ */
+public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
+    /** Injected grid. */
+    @IgniteInstanceResource
+    protected Ignite ignite;
+
+    /** Logger. */
+    @SuppressWarnings("UnusedDeclaration")
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /**
+     * Create plan topology.
+     *
+     * @param nodes Topology nodes.
+     * @return Plan topology.
+     */
+    protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
+        Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
+
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
+        Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
+
+        for (ClusterNode node : nodes) {
+            String macs = node.attribute(ATTR_MACS);
+
+            HadoopMapReducePlanGroup grp = macsMap.get(macs);
+
+            if (grp == null) {
+                grp = new HadoopMapReducePlanGroup(node, macs);
+
+                macsMap.put(macs, grp);
+            }
+            else
+                grp.add(node);
+
+            idToGrp.put(node.id(), grp);
+
+            for (String host : node.addresses()) {
+                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
+
+                if (hostGrp == null)
+                    hostToGrp.put(host, grp);
+                else
+                    assert hostGrp == grp;
+            }
+        }
+
+        return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
+    }
+
+
+    /**
+     * Groups nodes by host names.
+     *
+     * @param top Topology to group.
+     * @return Map.
+     */
+    protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
+        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+        for (ClusterNode node : top) {
+            for (String host : node.hostNames()) {
+                Collection<UUID> nodeIds = grouped.get(host);
+
+                if (nodeIds == null) {
+                    // Expecting 1-2 nodes per host.
+                    nodeIds = new ArrayList<>(2);
+
+                    grouped.put(host, nodeIds);
+                }
+
+                nodeIds.add(node.id());
+            }
+        }
+
+        return grouped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
new file mode 100644
index 0000000..d9de0c1
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/planner/HadoopTestRoundRobinMrPlanner.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.planner;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Round-robin mr planner.
+ */
+public class HadoopTestRoundRobinMrPlanner implements HadoopMapReducePlanner {
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        if (top.isEmpty())
+            throw new IllegalArgumentException("Topology is empty");
+
+        // Has at least one element.
+        Iterator<ClusterNode> it = top.iterator();
+
+        Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
+
+        for (HadoopInputSplit block : job.input()) {
+            ClusterNode node = it.next();
+
+            Collection<HadoopInputSplit> nodeBlocks = mappers.get(node.id());
+
+            if (nodeBlocks == null) {
+                nodeBlocks = new ArrayList<>();
+
+                mappers.put(node.id(), nodeBlocks);
+            }
+
+            nodeBlocks.add(block);
+
+            if (!it.hasNext())
+                it = top.iterator();
+        }
+
+        int[] rdc = new int[job.reducers()];
+
+        for (int i = 0; i < rdc.length; i++)
+            rdc[i] = i;
+
+        return new HadoopDefaultMapReducePlan(mappers, Collections.singletonMap(it.next().id(), rdc));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
index 37af147..7e74d82 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCommonUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
index 4326ad2..f125485 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopContext.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
 import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
index a9b4532..7db535a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopExternalSplit.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.hadoop;
 
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import java.io.Externalizable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
index fb6d0f3..fe5d434 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSplitWrapper.java
@@ -22,6 +22,7 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Arrays;
 
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
index 541cf80..0d82b5b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/delegate/HadoopFileSystemCounterWriterDelegate.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.hadoop.delegate;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 
 /**
@@ -32,5 +32,5 @@ public interface HadoopFileSystemCounterWriterDelegate {
      * @param cntrs Counters.
      * @throws IgniteCheckedException If failed.
      */
-    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException;
+    public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
index d4c10da..6b36d26 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopFileSystemCounterWriterDelegateImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
 import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
@@ -60,7 +60,7 @@ public class HadoopFileSystemCounterWriterDelegateImpl implements HadoopFileSyst
     }
 
     /** {@inheritDoc} */
-    public void write(HadoopJob job, HadoopCounters cntrs) throws IgniteCheckedException {
+    public void write(HadoopJobEx job, HadoopCounters cntrs) throws IgniteCheckedException {
         Configuration hadoopCfg = HadoopUtils.safeCreateConfiguration();
 
         final HadoopJobInfo jobInfo = job.info();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
index 65ff280..cde6da6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1MapTask.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -54,7 +54,7 @@ public class HadoopV1MapTask extends HadoopV1Task {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopJob job = taskCtx.job();
+        HadoopJobEx job = taskCtx.job();
 
         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
index 92c024e..6b90653 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1ReduceTask.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -51,7 +51,7 @@ public class HadoopV1ReduceTask extends HadoopV1Task {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-        HadoopJob job = taskCtx.job();
+        HadoopJobEx job = taskCtx.job();
 
         HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
index 11a3598..26325b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v1/HadoopV1Splitter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index eec0636..11f2ecc 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
index a24e581..5d3f22d 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
@@ -37,8 +38,8 @@ import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
@@ -85,7 +86,7 @@ import static org.apache.ignite.internal.processors.hadoop.impl.fs.HadoopFileSys
 /**
  * Hadoop job implementation for v2 API.
  */
-public class HadoopV2Job implements HadoopJob {
+public class HadoopV2Job extends HadoopJobEx {
     /** */
     private final JobConf jobConf;
 
@@ -139,6 +140,7 @@ public class HadoopV2Job implements HadoopJob {
      * @param jobInfo Job info.
      * @param log Logger.
      * @param libNames Optional additional native library names.
+     * @param helper Hadoop helper.
      */
     public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log,
         @Nullable String[] libNames, HadoopHelper helper) {
@@ -182,7 +184,7 @@ public class HadoopV2Job implements HadoopJob {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException {
+    @Override public Collection<HadoopInputSplit> input() {
         ClassLoader oldLdr = HadoopCommonUtils.setContextClassLoader(jobConf.getClassLoader());
 
         try {
@@ -239,6 +241,9 @@ public class HadoopV2Job implements HadoopJob {
                     throw transformException(e);
             }
         }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
         finally {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }
@@ -274,7 +279,7 @@ public class HadoopV2Job implements HadoopJob {
                 fullCtxClsQueue.add(cls);
             }
 
-            Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class,
+            Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJobEx.class,
                 HadoopJobId.class, UUID.class, DataInput.class);
 
             if (jobConfData == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
index 667ef1e..c878515 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.jetbrains.annotations.Nullable;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index d328550..8b8a728 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -45,8 +45,8 @@ import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
@@ -165,7 +165,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
      * @param locNodeId Local node ID.
      * @param jobConfDataInput DataInput for read JobConf.
      */
-    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId,
+    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJobEx job, HadoopJobId jobId,
         @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
         super(taskInfo, job);
         this.locNodeId = locNodeId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
index 090b336..1035701 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -24,11 +24,11 @@ import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
 import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 9542372..9284c02 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -49,14 +49,14 @@ import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
 import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobEx;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.hadoop.HadoopMapReducePlanner;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounterWriter;
@@ -114,7 +114,7 @@ public class HadoopJobTracker extends HadoopComponent {
     private HadoopMapReducePlanner mrPlanner;
 
     /** All the known jobs. */
-    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobEx>> jobs = new ConcurrentHashMap8<>();
 
     /** Locally active jobs. */
     private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
@@ -129,8 +129,8 @@ public class HadoopJobTracker extends HadoopComponent {
     /** Component busy lock. */
     private GridSpinReadWriteLock busyLock;
 
-    /** Class to create HadoopJob instances from. */
-    private Class<? extends HadoopJob> jobCls;
+    /** Class to create HadoopJobEx instances from. */
+    private Class<? extends HadoopJobEx> jobCls;
 
     /** Closure to check result of async transform of system cache. */
     private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
@@ -158,7 +158,7 @@ public class HadoopJobTracker extends HadoopComponent {
         HadoopClassLoader ldr = ctx.kernalContext().hadoopHelper().commonClassLoader();
 
         try {
-            jobCls = (Class<HadoopJob>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);
+            jobCls = (Class<HadoopJobEx>)ldr.loadClass(HadoopCommonUtils.JOB_CLS_NAME);
         }
         catch (Exception ioe) {
             throw new IgniteCheckedException("Failed to load job class [class=" +
@@ -310,7 +310,7 @@ public class HadoopJobTracker extends HadoopComponent {
             if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
                 throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
 
-            HadoopJob job = job(jobId, info);
+            HadoopJobEx job = job(jobId, info);
 
             HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
 
@@ -692,7 +692,7 @@ public class HadoopJobTracker extends HadoopComponent {
                 try {
                     if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
                         // Failover setup task.
-                        HadoopJob job = job(jobId, meta.jobInfo());
+                        HadoopJobEx job = job(jobId, meta.jobInfo());
 
                         Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
 
@@ -818,7 +818,7 @@ public class HadoopJobTracker extends HadoopComponent {
         throws IgniteCheckedException {
         JobLocalState state = activeJobs.get(jobId);
 
-        HadoopJob job = job(jobId, meta.jobInfo());
+        HadoopJobEx job = job(jobId, meta.jobInfo());
 
         HadoopMapReducePlan plan = meta.mapReducePlan();
 
@@ -1048,7 +1048,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param job Job instance.
      * @return Collection of task infos.
      */
-    private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
+    private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJobEx job) {
         UUID locNodeId = ctx.localNodeId();
         HadoopJobId jobId = job.id();
 
@@ -1097,15 +1097,15 @@ public class HadoopJobTracker extends HadoopComponent {
      * @return Job.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
-        GridFutureAdapter<HadoopJob> fut = jobs.get(jobId);
+    @Nullable public HadoopJobEx job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
+        GridFutureAdapter<HadoopJobEx> fut = jobs.get(jobId);
 
-        if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJob>())) != null)
+        if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJobEx>())) != null)
             return fut.get();
 
         fut = jobs.get(jobId);
 
-        HadoopJob job = null;
+        HadoopJobEx job = null;
 
         try {
             if (jobInfo == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d14e0727/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
deleted file mode 100644
index f01f72b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
-
-/**
- * Base class for map-reduce planners.
- */
-public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
-    /** Injected grid. */
-    @IgniteInstanceResource
-    protected Ignite ignite;
-
-    /** Logger. */
-    @SuppressWarnings("UnusedDeclaration")
-    @LoggerResource
-    protected IgniteLogger log;
-
-    /**
-     * Create plan topology.
-     *
-     * @param nodes Topology nodes.
-     * @return Plan topology.
-     */
-    protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
-        Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
-
-        Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
-        Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
-
-        for (ClusterNode node : nodes) {
-            String macs = node.attribute(ATTR_MACS);
-
-            HadoopMapReducePlanGroup grp = macsMap.get(macs);
-
-            if (grp == null) {
-                grp = new HadoopMapReducePlanGroup(node, macs);
-
-                macsMap.put(macs, grp);
-            }
-            else
-                grp.add(node);
-
-            idToGrp.put(node.id(), grp);
-
-            for (String host : node.addresses()) {
-                HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
-
-                if (hostGrp == null)
-                    hostToGrp.put(host, grp);
-                else
-                    assert hostGrp == grp;
-            }
-        }
-
-        return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
-    }
-
-
-    /**
-     * Groups nodes by host names.
-     *
-     * @param top Topology to group.
-     * @return Map.
-     */
-    protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
-        Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
-        for (ClusterNode node : top) {
-            for (String host : node.hostNames()) {
-                Collection<UUID> nodeIds = grouped.get(host);
-
-                if (nodeIds == null) {
-                    // Expecting 1-2 nodes per host.
-                    nodeIds = new ArrayList<>(2);
-
-                    grouped.put(host, nodeIds);
-                }
-
-                nodeIds.add(node.id());
-            }
-        }
-
-        return grouped;
-    }
-}