You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:18 UTC

[02/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..f896daa
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * Task executor that submits Hadoop tasks to its own {@link GridHadoopExecutorService} and tracks them per job.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Job tracker (taken from the context on kernal start); notified when a task finishes. */
+    private HadoopJobTracker jobTracker;
+
+    /** Tasks submitted via {@code run()} and not yet finished, grouped by job ID. */
+    private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+
+    /** Executor service to run tasks; created on kernal start, so it is {@code null} before that. */
+    private GridHadoopExecutorService exec;
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        jobTracker = ctx.jobTracker();
+
+        exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(),
+            ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (exec != null) {
+            exec.shutdown(3000); // Presumably a 3 sec timeout (stop() pairs 30000 with "30 sec") - confirm.
+
+            if (cancel) { // Cancel whatever is still tracked for every known job.
+                for (GridHadoopJobId jobId : jobs.keySet())
+                    cancelTasks(jobId);
+            }
+        }
+    }
+
+    /** {@inheritDoc} The {@code cancel} flag is not used by this implementation. */
+    @Override public void stop(boolean cancel) {
+        if (exec != null && !exec.shutdown(30000))
+            U.warn(log, "Failed to finish running tasks in 30 sec.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+                ", tasksCnt=" + tasks.size() + ']');
+
+        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id());
+
+        if (executedTasks == null) { // First submission for this job: register an empty task set.
+            executedTasks = new GridConcurrentHashSet<>();
+
+            Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
+
+            assert extractedCol == null; // NOTE(review): get-then-put is not atomic; assumes no racing run() per job - confirm.
+        }
+
+        final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks;
+
+        for (final GridHadoopTaskInfo info : tasks) {
+            assert info != null;
+
+            GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+                ctx.localNodeId()) {
+                @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
+                    if (log.isDebugEnabled())
+                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+                    finalExecutedTasks.remove(this); // Stop tracking the task before notifying the job tracker.
+
+                    jobTracker.onTaskFinished(info, status);
+                }
+
+                @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().input(taskCtx);
+                }
+
+                @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().output(taskCtx);
+                }
+            };
+
+            executedTasks.add(task); // Track the task before handing it to the executor.
+
+            exec.submit(task);
+        }
+    }
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    @Override public void cancelTasks(GridHadoopJobId jobId) {
+        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId);
+
+        if (executedTasks != null) { // Nothing is tracked for an unknown job ID.
+            for (GridHadoopRunnableTask task : executedTasks)
+                task.cancel();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException {
+        if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
+            Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+
+            assert executedTasks == null || executedTasks.isEmpty(); // A completed job must have no tracked tasks left.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..a3d3bf7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+
+import java.util.*;
+
+/**
+ * Common superclass for task executors.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+    /**
+     * Runs tasks.
+     *
+     * @param job Job.
+     * @param tasks Tasks.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException;
+
+    /**
+     * On job state change callback. Implementations may throw {@link IgniteCheckedException}.
+     *
+     * @param meta Job metadata.
+     */
+    public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
deleted file mode 100644
index 72185c0..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutor.java
+++ /dev/null
@@ -1,960 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
-
-/**
- * External process registry. Handles external process lifecycle.
- */
-public class GridHadoopExternalTaskExecutor extends GridHadoopTaskExecutorAdapter {
-    /** Hadoop context. */
-    private GridHadoopContext ctx;
-
-    /** */
-    private String javaCmd;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Node process descriptor. */
-    private GridHadoopProcessDescriptor nodeDesc;
-
-    /** Output base. */
-    private File outputBase;
-
-    /** Path separator. */
-    private String pathSep;
-
-    /** Hadoop external communication. */
-    private GridHadoopExternalCommunication comm;
-
-    /** Starting processes. */
-    private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
-
-    /** Starting processes. */
-    private final ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
-
-    /** Busy lock. */
-    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
-
-    /** Job tracker. */
-    private GridHadoopJobTracker jobTracker;
-
-    /** {@inheritDoc} */
-    @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException {
-        this.ctx = ctx;
-
-        log = ctx.kernalContext().log(GridHadoopExternalTaskExecutor.class);
-
-        outputBase = U.resolveWorkDirectory("hadoop", false);
-
-        pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
-
-        initJavaCommand();
-
-        comm = new GridHadoopExternalCommunication(
-            ctx.localNodeId(),
-            UUID.randomUUID(),
-            ctx.kernalContext().config().getMarshaller(),
-            log,
-            ctx.kernalContext().getSystemExecutorService(),
-            ctx.kernalContext().gridName());
-
-        comm.setListener(new MessageListener());
-
-        comm.start();
-
-        nodeDesc = comm.localProcessDescriptor();
-
-        ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
-            GridHadoopExternalTaskExecutor.class);
-
-        if (nodeDesc.sharedMemoryPort() != -1)
-            ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
-                GridHadoopExternalTaskExecutor.class);
-
-        jobTracker = ctx.jobTracker();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) {
-        busyLock.writeLock();
-
-        try {
-            comm.stop();
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) {
-        final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
-
-        // If we have a local process for this job.
-        if (proc != null) {
-            if (log.isDebugEnabled())
-                log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
-
-            if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
-                        ", proc=" + proc + ']');
-
-                runningProcsByJobId.remove(meta.jobId());
-                runningProcsByProcId.remove(proc.descriptor().processId());
-
-                proc.terminate();
-
-                return;
-            }
-
-            if (proc.initFut.isDone()) {
-                if (!proc.initFut.isFailed())
-                    sendJobInfoUpdate(proc, meta);
-                else if (log.isDebugEnabled())
-                    log.debug("Failed to initialize child process (will skip job state notification) " +
-                        "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
-            }
-            else {
-                proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
-                    @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
-                        try {
-                            f.get();
-
-                            sendJobInfoUpdate(proc, meta);
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to initialize child process (will skip job state notification) " +
-                                    "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
-                        }
-
-                    }
-                });
-            }
-        }
-        else if (ctx.isParticipating(meta)) {
-            GridHadoopJob job;
-
-            try {
-                job = jobTracker.job(meta.jobId(), meta.jobInfo());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to get job: " + meta.jobId(), e);
-
-                return;
-            }
-
-            startProcess(job, meta.mapReducePlan());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
-        if (!busyLock.tryReadLock()) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
-
-            return;
-        }
-
-        try {
-            HadoopProcess proc = runningProcsByJobId.get(job.id());
-
-            GridHadoopTaskType taskType = F.first(tasks).type();
-
-            if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT ||
-                taskType == GridHadoopTaskType.COMMIT) {
-                if (proc == null || proc.terminated()) {
-                    runningProcsByJobId.remove(job.id(), proc);
-
-                    // Start new process for ABORT task since previous processes were killed.
-                    proc = startProcess(job, jobTracker.plan(job.id()));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Starting new process for maintenance task [jobId=" + job.id() +
-                            ", proc=" + proc + ", taskType=" + taskType + ']');
-                }
-            }
-            else
-                assert proc != null : "Missing started process for task execution request: " + job.id() +
-                    ", tasks=" + tasks;
-
-            final HadoopProcess proc0 = proc;
-
-            proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
-                @Override public void apply(
-                    IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
-                    if (!busyLock.tryReadLock())
-                        return;
-
-                    try {
-                        f.get();
-
-                        proc0.addTasks(tasks);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Sending task execution request to child process [jobId=" + job.id() +
-                                ", proc=" + proc0 + ", tasks=" + tasks + ']');
-
-                        sendExecutionRequest(proc0, job, tasks);
-                    }
-                    catch (IgniteCheckedException e) {
-                        notifyTasksFailed(tasks, FAILED, e);
-                    }
-                    finally {
-                        busyLock.readUnlock();
-                    }
-                }
-            });
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelTasks(GridHadoopJobId jobId) {
-        HadoopProcess proc = runningProcsByJobId.get(jobId);
-
-        if (proc != null)
-            proc.terminate();
-    }
-
-    /**
-     * Sends execution request to remote node.
-     *
-     * @param proc Process to send request to.
-     * @param job Job instance.
-     * @param tasks Collection of tasks to execute in started process.
-     */
-    private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks)
-        throws IgniteCheckedException {
-        // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
-        proc.lock();
-
-        try {
-            if (proc.terminated()) {
-                notifyTasksFailed(tasks, CRASHED, null);
-
-                return;
-            }
-
-            GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest();
-
-            req.jobId(job.id());
-            req.jobInfo(job.info());
-            req.tasks(tasks);
-
-            comm.sendMessage(proc.descriptor(), req);
-        }
-        finally {
-            proc.unlock();
-        }
-    }
-
-    /**
-     * @return External task metadata.
-     */
-    private GridHadoopExternalTaskMetadata buildTaskMeta() {
-        GridHadoopExternalTaskMetadata meta = new GridHadoopExternalTaskMetadata();
-
-        meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
-        meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
-            "-DIGNITE_HOME=" + U.getIgniteHome()));
-
-        return meta;
-    }
-
-    /**
-     * @param tasks Tasks to notify about.
-     * @param state Fail state.
-     * @param e Optional error.
-     */
-    private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
-        GridHadoopTaskStatus fail = new GridHadoopTaskStatus(state, e);
-
-        for (GridHadoopTaskInfo task : tasks)
-            jobTracker.onTaskFinished(task, fail);
-    }
-
-    /**
-     * Starts process template that will be ready to execute Hadoop tasks.
-     *
-     * @param job Job instance.
-     * @param plan Map reduce plan.
-     */
-    private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
-        final UUID childProcId = UUID.randomUUID();
-
-        GridHadoopJobId jobId = job.id();
-
-        final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
-
-        final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
-
-        HadoopProcess old = runningProcsByJobId.put(jobId, proc);
-
-        assert old == null;
-
-        old = runningProcsByProcId.put(childProcId, proc);
-
-        assert old == null;
-
-        ctx.kernalContext().closure().runLocalSafe(new Runnable() {
-            @Override public void run() {
-                if (!busyLock.tryReadLock()) {
-                    fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
-
-                    return;
-                }
-
-                try {
-                    GridHadoopExternalTaskMetadata startMeta = buildTaskMeta();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created hadoop child process metadata for job [job=" + job +
-                            ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
-
-                    Process proc = startJavaProcess(childProcId, startMeta, job);
-
-                    BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-                    String line;
-
-                    // Read up all the process output.
-                    while ((line = rdr.readLine()) != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Tracing process output: " + line);
-
-                        if ("Started".equals(line)) {
-                            // Process started successfully, it should not write anything more to the output stream.
-                            if (log.isDebugEnabled())
-                                log.debug("Successfully started child process [childProcId=" + childProcId +
-                                    ", meta=" + job + ']');
-
-                            fut.onProcessStarted(proc);
-
-                            break;
-                        }
-                        else if ("Failed".equals(line)) {
-                            StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
-
-                            while ((line = rdr.readLine()) != null)
-                                sb.append("    ").append(line).append("\n");
-
-                            // Cut last character.
-                            sb.setLength(sb.length() - 1);
-
-                            log.warning(sb.toString());
-
-                            fut.onDone(new IgniteCheckedException(sb.toString()));
-
-                            break;
-                        }
-                    }
-                }
-                catch (Throwable e) {
-                    fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
-                }
-                finally {
-                    busyLock.readUnlock();
-                }
-            }
-        }, true);
-
-        fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
-            @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
-                try {
-                    // Make sure there were no exceptions.
-                    f.get();
-
-                    prepareForJob(proc, job, plan);
-                }
-                catch (IgniteCheckedException ignore) {
-                    // Exception is printed in future's onDone() method.
-                }
-            }
-        });
-
-        return proc;
-    }
-
-    /**
-     * Checks that java local command is available.
-     *
-     * @throws IgniteCheckedException If initialization failed.
-     */
-    private void initJavaCommand() throws IgniteCheckedException {
-        String javaHome = System.getProperty("java.home");
-
-        if (javaHome == null)
-            javaHome = System.getenv("JAVA_HOME");
-
-        if (javaHome == null)
-            throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
-
-        javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
-
-        try {
-            Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
-
-            Collection<String> out = readProcessOutput(proc);
-
-            int res = proc.waitFor();
-
-            if (res != 0)
-                throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
-                    "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
-
-            if (log.isInfoEnabled()) {
-                log.info("Will use java for external task execution: ");
-
-                for (String s : out)
-                    log.info("    " + s);
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to check java for external task execution.", e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
-        }
-    }
-
-    /**
-     * Reads process output line-by-line.
-     *
-     * @param proc Process to read output.
-     * @return Read lines.
-     * @throws IOException If read failed.
-     */
-    private Collection<String> readProcessOutput(Process proc) throws IOException {
-        BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-        Collection<String> res = new ArrayList<>();
-
-        String s;
-
-        while ((s = rdr.readLine()) != null)
-            res.add(s);
-
-        return res;
-    }
-
-    /**
-     * Builds process from metadata.
-     *
-     * @param childProcId Child process ID.
-     * @param startMeta Metadata.
-     * @param job Job.
-     * @return Started process.
-     */
-    private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta,
-        GridHadoopJob job) throws Exception {
-        String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
-
-        if (log.isDebugEnabled())
-            log.debug("Will write process log output to: " + outFldr);
-
-        List<String> cmd = new ArrayList<>();
-
-        File workDir = U.resolveWorkDirectory("", false);
-
-        cmd.add(javaCmd);
-        cmd.addAll(startMeta.jvmOptions());
-        cmd.add("-cp");
-        cmd.add(buildClasspath(startMeta.classpath()));
-        cmd.add(GridHadoopExternalProcessStarter.class.getName());
-        cmd.add("-cpid");
-        cmd.add(String.valueOf(childProcId));
-        cmd.add("-ppid");
-        cmd.add(String.valueOf(nodeDesc.processId()));
-        cmd.add("-nid");
-        cmd.add(String.valueOf(nodeDesc.parentNodeId()));
-        cmd.add("-addr");
-        cmd.add(nodeDesc.address());
-        cmd.add("-tport");
-        cmd.add(String.valueOf(nodeDesc.tcpPort()));
-        cmd.add("-sport");
-        cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
-        cmd.add("-out");
-        cmd.add(outFldr);
-        cmd.add("-wd");
-        cmd.add(workDir.getAbsolutePath());
-
-        return new ProcessBuilder(cmd)
-            .redirectErrorStream(true)
-            .directory(workDir)
-            .start();
-    }
-
-    /**
-     * Gets job work folder.
-     *
-     * @param jobId Job ID.
-     * @return Job work folder.
-     */
-    private String jobWorkFolder(GridHadoopJobId jobId) {
-        return outputBase + File.separator + "Job_" + jobId;
-    }
-
-    /**
-     * @param cp Classpath collection.
-     * @return Classpath string.
-     */
-    private String buildClasspath(Collection<String> cp) {
-        assert !cp.isEmpty();
-
-        StringBuilder sb = new StringBuilder();
-
-        for (String s : cp)
-            sb.append(s).append(pathSep);
-
-        sb.setLength(sb.length() - 1);
-
-        return sb.toString();
-    }
-
-    /**
-     * Sends job info update request to remote process.
-     * <p>
-     * Reducer addresses are attached only when an address is known for every reducer of the plan,
-     * otherwise {@code null} is sent in their place. If sending fails and the process is not yet
-     * marked as terminated, the failure is logged and the process is terminated.
-     *
-     * @param proc Process to send request to.
-     * @param meta Job metadata.
-     */
-    private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
-        Map<Integer, GridHadoopProcessDescriptor> knownAddrs = meta.reducersAddresses();
-
-        int total = meta.mapReducePlan().reducers();
-
-        GridHadoopProcessDescriptor[] addrs;
-
-        if (knownAddrs == null || knownAddrs.size() != total)
-            addrs = null;
-        else {
-            addrs = new GridHadoopProcessDescriptor[total];
-
-            int idx = 0;
-
-            while (idx < total) {
-                GridHadoopProcessDescriptor rdcDesc = knownAddrs.get(idx);
-
-                assert rdcDesc != null : "Missing reducing address [meta=" + meta + ", rdc=" + idx + ']';
-
-                addrs[idx] = rdcDesc;
-
-                idx++;
-            }
-        }
-
-        try {
-            comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
-        }
-        catch (IgniteCheckedException e) {
-            // Nothing to report if the process has already been terminated.
-            if (proc.terminated())
-                return;
-
-            log.error("Failed to send job state update message to remote child process (will kill the process) " +
-                "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
-
-            proc.terminate();
-        }
-    }
-
-    /**
-     * Sends prepare request to remote process. If the request cannot be sent, the failure is logged
-     * and the process is terminated.
-     *
-     * @param proc Process to send request to.
-     * @param job Job.
-     * @param plan Map reduce plan.
-     */
-    private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
-        try {
-            GridHadoopProcessDescriptor target = proc.descriptor();
-
-            GridHadoopPrepareForJobRequest req = new GridHadoopPrepareForJobRequest(job.id(), job.info(),
-                plan.reducers(), plan.reducers(ctx.localNodeId()));
-
-            comm.sendMessage(target, req);
-        }
-        catch (IgniteCheckedException e) {
-            String errMsg = "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
-                ", plan=" + plan + ']';
-
-            U.error(log, errMsg, e);
-
-            proc.terminate();
-        }
-    }
-
-    /**
-     * Processes task finished message: the task is removed from the sending process, if that process
-     * is still registered, and the job tracker is notified about the task status.
-     *
-     * @param desc Remote process descriptor.
-     * @param taskMsg Task finished message.
-     */
-    private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
-        HadoopProcess owner = runningProcsByProcId.get(desc.processId());
-
-        GridHadoopTaskInfo finished = taskMsg.taskInfo();
-
-        if (owner != null)
-            owner.removeTask(finished);
-
-        jobTracker.onTaskFinished(finished, taskMsg.status());
-    }
-
-    /**
-     * Listener for messages and connection-loss notifications related to child processes.
-     * Each callback runs under the busy read lock and returns immediately when that lock cannot be taken.
-     */
-    private class MessageListener implements GridHadoopMessageListener {
-        /**
-         * {@inheritDoc}
-         * <p>
-         * A {@code GridHadoopProcessStartedAck} is forwarded to the init future of the sending process,
-         * a {@code GridHadoopTaskFinishedMessage} is passed to {@code processTaskFinishedMessage()},
-         * and any other message is only logged as a warning.
-         */
-        @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
-            if (!busyLock.tryReadLock())
-                return;
-
-            try {
-                if (msg instanceof GridHadoopProcessStartedAck) {
-                    HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
-                    assert proc != null : "Missing child process for processId: " + desc;
-
-                    GridHadoopProcessFuture fut = proc.initFut;
-
-                    if (fut != null)
-                        fut.onReplyReceived(desc);
-                    // Safety: without an init future there is nothing to notify, so only warn.
-                    else
-                        log.warning("Failed to find process start future (will ignore): " + desc);
-                }
-                else if (msg instanceof GridHadoopTaskFinishedMessage) {
-                    GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
-
-                    processTaskFinishedMessage(desc, taskMsg);
-                }
-                else
-                    log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * A {@code null} descriptor is reported as a failed handshake. For a registered process that still
-         * has tasks, every task is reported to the job tracker as {@code CRASHED} and the process is removed
-         * from the by-job map. A registered process is terminated in either case; the by-process-ID map is
-         * not modified here.
-         */
-        @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
-            if (!busyLock.tryReadLock())
-                return;
-
-            try {
-                if (desc == null) {
-                    U.warn(log, "Handshake failed.");
-
-                    return;
-                }
-
-                // Notify job tracker about failed tasks.
-                HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
-                if (proc != null) {
-                    Collection<GridHadoopTaskInfo> tasks = proc.tasks();
-
-                    if (!F.isEmpty(tasks)) {
-                        log.warning("Lost connection with alive process (will terminate): " + desc);
-
-                        GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED,
-                            new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
-
-                        for (GridHadoopTaskInfo info : tasks)
-                            jobTracker.onTaskFinished(info, status);
-
-                        runningProcsByJobId.remove(proc.jobId(), proc);
-                    }
-
-                    // Safety: terminate even when no tasks were registered for the process.
-                    proc.terminate();
-                }
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-    }
-
-    /**
-     * Hadoop process.
-     * <p>
-     * Extends {@link ReentrantLock} so that the instance itself is the lock taken in {@link #terminate()}.
-     */
-    private static class HadoopProcess extends ReentrantLock {
-        /** Serial version UID. */
-        private static final long serialVersionUID = 0L;
-
-        /** Job ID. */
-        private final GridHadoopJobId jobId;
-
-        /** Process. Stays {@code null} until {@link #onInitialized(Process, GridHadoopProcessDescriptor)} is called. */
-        private Process proc;
-
-        /** Init future. Completes when process is ready to receive messages. */
-        private final GridHadoopProcessFuture initFut;
-
-        /** Process descriptor. Stays {@code null} until the process is initialized. */
-        private GridHadoopProcessDescriptor procDesc;
-
-        /** Reducers planned for this process, {@code null} if none were passed to the constructor. */
-        private Collection<Integer> reducers;
-
-        /** Tasks. */
-        private final Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
-
-        /** Terminated flag. */
-        private volatile boolean terminated;
-
-        /**
-         * @param jobId Job ID.
-         * @param initFut Init future.
-         * @param reducers Reducers planned for this process, may be {@code null} or empty.
-         */
-        private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut,
-            int[] reducers) {
-            this.jobId = jobId;
-            this.initFut = initFut;
-
-            if (!F.isEmpty(reducers)) {
-                this.reducers = new ArrayList<>(reducers.length);
-
-                for (int r : reducers)
-                    this.reducers.add(r);
-            }
-        }
-
-        /**
-         * @return Communication process descriptor.
-         */
-        private GridHadoopProcessDescriptor descriptor() {
-            return procDesc;
-        }
-
-        /**
-         * @return Job ID.
-         */
-        public GridHadoopJobId jobId() {
-            return jobId;
-        }
-
-        /**
-         * Initialized callback.
-         *
-         * @param proc Java process representation.
-         * @param procDesc Process descriptor.
-         */
-        private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
-            this.proc = proc;
-            this.procDesc = procDesc;
-        }
-
-        /**
-         * Terminates process (kills it).
-         * <p>
-         * If initialization is still in progress, the kill is postponed until the init future completes.
-         */
-        private void terminate() {
-            // Guard against concurrent message sending.
-            lock();
-
-            try {
-                terminated = true;
-
-                if (!initFut.isDone())
-                    initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
-                        @Override public void apply(
-                            IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
-                            destroyProcess();
-                        }
-                    });
-                else
-                    destroyProcess();
-            }
-            finally {
-                unlock();
-            }
-        }
-
-        /**
-         * Destroys the underlying Java process, if there is one. The process reference is never set when
-         * initialization failed, so there is nothing to destroy in that case.
-         */
-        private void destroyProcess() {
-            Process proc0 = proc;
-
-            if (proc0 != null)
-                proc0.destroy();
-        }
-
-        /**
-         * @return Terminated flag.
-         */
-        private boolean terminated() {
-            return terminated;
-        }
-
-        /**
-         * Adds tasks to the tasks of this process.
-         *
-         * @param tasks Tasks to add.
-         */
-        private void addTasks(Collection<GridHadoopTaskInfo> tasks) {
-            this.tasks.addAll(tasks);
-        }
-
-        /**
-         * Removes task when it was completed.
-         *
-         * @param task Task to remove.
-         */
-        private void removeTask(GridHadoopTaskInfo task) {
-            tasks.remove(task);
-        }
-
-        /**
-         * @return Collection of tasks.
-         */
-        private Collection<GridHadoopTaskInfo> tasks() {
-            return tasks;
-        }
-
-        /**
-         * @return Planned reducers.
-         */
-        private Collection<Integer> reducers() {
-            return reducers;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(HadoopProcess.class, this);
-        }
-    }
-
-    /**
-     * Future tracking initialization of a child process. It is completed successfully once both the
-     * process start and the reply from the child have been reported to it.
-     * <p>
-     * NOTE(review): {@code onProcessStarted()} and {@code onReplyReceived()} share no lock; if they run
-     * concurrently, both may observe both flags set and call {@code onDone()} - confirm this is harmless.
-     */
-    private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> {
-        /** Serial version UID. */
-        private static final long serialVersionUID = 0L;
-
-        /** Child process ID. */
-        private UUID childProcId;
-
-        /** Job ID. */
-        private GridHadoopJobId jobId;
-
-        /** Process descriptor. */
-        private GridHadoopProcessDescriptor desc;
-
-        /** Running process. */
-        private Process proc;
-
-        /** Process started flag. */
-        private volatile boolean procStarted;
-
-        /** Reply received flag. */
-        private volatile boolean replyReceived;
-
-        /** Logger. */
-        private final IgniteLogger log = GridHadoopExternalTaskExecutor.this.log;
-
-        /**
-         * Empty constructor.
-         */
-        public GridHadoopProcessFuture() {
-            // No-op.
-        }
-
-        /**
-         * @param childProcId Child process ID.
-         * @param jobId Job ID.
-         * @param ctx Kernal context.
-         */
-        private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) {
-            super(ctx);
-
-            this.childProcId = childProcId;
-            this.jobId = jobId;
-        }
-
-        /**
-         * Process started callback. Completes the future if the reply has been received as well.
-         *
-         * @param proc Started process.
-         */
-        public void onProcessStarted(Process proc) {
-            this.proc = proc;
-
-            procStarted = true;
-
-            if (procStarted && replyReceived)
-                onDone(F.t(proc, desc));
-        }
-
-        /**
-         * Reply received callback. Completes the future if the process start has been reported as well.
-         *
-         * @param desc Descriptor of the replying process, its process ID is expected to match the child process ID.
-         */
-        public void onReplyReceived(GridHadoopProcessDescriptor desc) {
-            assert childProcId.equals(desc.processId());
-
-            this.desc = desc;
-
-            replyReceived = true;
-
-            if (procStarted && replyReceived)
-                onDone(F.t(proc, desc));
-        }
-
-        /**
-         * {@inheritDoc}
-         * <p>
-         * Before the future is completed: on success the registered process is initialized with the result
-         * and, if it has planned reducers, the job tracker is notified; on failure the process is removed
-         * from both process maps. The outcome is logged only when this call actually completed the future.
-         */
-        @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res,
-            @Nullable Throwable err) {
-            if (err == null) {
-                HadoopProcess proc = runningProcsByProcId.get(childProcId);
-
-                assert proc != null;
-
-                assert proc.initFut == this;
-
-                proc.onInitialized(res.get1(), res.get2());
-
-                if (!F.isEmpty(proc.reducers()))
-                    jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
-            }
-            else {
-                // Clean up since init failed.
-                runningProcsByJobId.remove(jobId);
-                runningProcsByProcId.remove(childProcId);
-            }
-
-            if (super.onDone(res, err)) {
-                if (err == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Initialized child process for external task execution [jobId=" + jobId +
-                            ", desc=" + desc + ", initTime=" + duration() + ']');
-                }
-                else
-                    U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
-                        ", desc=" + desc + ']', err);
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
new file mode 100644
index 0000000..616d383
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@ -0,0 +1,960 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.*;
+
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+
+/**
+ * External process registry. Handles external process lifecycle.
+ */
+public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Hadoop context. */
+    private HadoopContext ctx;
+
+    /** Path to the java executable used to launch child processes. */
+    private String javaCmd;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Node process descriptor. */
+    private GridHadoopProcessDescriptor nodeDesc;
+
+    /** Output base. */
+    private File outputBase;
+
+    /** Path separator used when building the child process classpath. */
+    private String pathSep;
+
+    /** Hadoop external communication. */
+    private GridHadoopExternalCommunication comm;
+
+    /** Child processes by child process ID. */
+    private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
+
+    /** Child processes by job ID. */
+    private final ConcurrentMap<GridHadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
+
+    /** Busy lock: read-locked while a request is being handled, write-locked on stop. */
+    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Resolves the {@code hadoop} work directory as the output base, verifies the java command,
+     * starts external communication with a message listener attached, and registers the TCP port
+     * (plus the shared memory port, unless it is {@code -1}) of the local process descriptor.
+     */
+    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+        this.ctx = ctx;
+
+        log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+
+        outputBase = U.resolveWorkDirectory("hadoop", false);
+
+        pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
+
+        initJavaCommand();
+
+        comm = new GridHadoopExternalCommunication(
+            ctx.localNodeId(),
+            UUID.randomUUID(),
+            ctx.kernalContext().config().getMarshaller(),
+            log,
+            ctx.kernalContext().getSystemExecutorService(),
+            ctx.kernalContext().gridName());
+
+        comm.setListener(new MessageListener());
+
+        comm.start();
+
+        nodeDesc = comm.localProcessDescriptor();
+
+        ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
+            HadoopExternalTaskExecutor.class);
+
+        if (nodeDesc.sharedMemoryPort() != -1)
+            ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
+                HadoopExternalTaskExecutor.class);
+
+        jobTracker = ctx.jobTracker();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Takes the busy write lock, which is not released by this method, and stops external communication.
+     * A failure to stop communication is logged and not propagated.
+     */
+    @Override public void stop(boolean cancel) {
+        busyLock.writeLock();
+
+        try {
+            comm.stop();
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * If a child process is registered for the job: on {@code PHASE_COMPLETE} the process is unregistered
+     * and terminated, otherwise the job info update is sent once the init future has completed
+     * successfully. If no process is registered and {@code ctx.isParticipating(meta)} holds, a new
+     * process is started for the job.
+     */
+    @Override public void onJobStateChanged(final GridHadoopJobMetadata meta) {
+        final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+
+        // If we have a local process for this job.
+        if (proc != null) {
+            if (log.isDebugEnabled())
+                log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+
+            if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
+                if (log.isDebugEnabled())
+                    log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+                        ", proc=" + proc + ']');
+
+                runningProcsByJobId.remove(meta.jobId());
+
+                // Unregister by value: the descriptor may not be set yet while the process is still
+                // initializing, so the process ID must not be read from it here.
+                runningProcsByProcId.values().remove(proc);
+
+                proc.terminate();
+
+                return;
+            }
+
+            if (proc.initFut.isDone()) {
+                if (!proc.initFut.isFailed())
+                    sendJobInfoUpdate(proc, meta);
+                else if (log.isDebugEnabled())
+                    log.debug("Failed to initialize child process (will skip job state notification) " +
+                        "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+            }
+            else {
+                // Initialization is still in progress: postpone the update until it has finished.
+                proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+                    @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+                        try {
+                            f.get();
+
+                            sendJobInfoUpdate(proc, meta);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to initialize child process (will skip job state notification) " +
+                                    "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+                        }
+
+                    }
+                });
+            }
+        }
+        else if (ctx.isParticipating(meta)) {
+            GridHadoopJob job;
+
+            try {
+                job = jobTracker.job(meta.jobId(), meta.jobInfo());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to get job: " + meta.jobId(), e);
+
+                return;
+            }
+
+            startProcess(job, meta.mapReducePlan());
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The task type is taken from the first task of the collection. For {@code SETUP}, {@code ABORT} and
+     * {@code COMMIT} tasks a new child process is started when none is registered for the job or the
+     * registered one is terminated; for other task types a registered process is expected to exist.
+     * The execution request is sent once the init future of the process completes; if initialization
+     * failed, the tasks are reported to the job tracker as {@code FAILED}. Nothing is done when the
+     * busy read lock cannot be acquired.
+     */
+    @SuppressWarnings("ConstantConditions")
+    @Override public void run(final GridHadoopJob job, final Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (!busyLock.tryReadLock()) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
+
+            return;
+        }
+
+        try {
+            HadoopProcess proc = runningProcsByJobId.get(job.id());
+
+            GridHadoopTaskType taskType = F.first(tasks).type();
+
+            if (taskType == GridHadoopTaskType.SETUP || taskType == GridHadoopTaskType.ABORT ||
+                taskType == GridHadoopTaskType.COMMIT) {
+                if (proc == null || proc.terminated()) {
+                    runningProcsByJobId.remove(job.id(), proc);
+
+                    // Start new process for ABORT task since previous processes were killed.
+                    proc = startProcess(job, jobTracker.plan(job.id()));
+
+                    if (log.isDebugEnabled())
+                        log.debug("Starting new process for maintenance task [jobId=" + job.id() +
+                            ", proc=" + proc + ", taskType=" + taskType + ']');
+                }
+            }
+            else
+                assert proc != null : "Missing started process for task execution request: " + job.id() +
+                    ", tasks=" + tasks;
+
+            final HadoopProcess proc0 = proc;
+
+            proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+                @Override public void apply(
+                    IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+                    if (!busyLock.tryReadLock())
+                        return;
+
+                    try {
+                        f.get();
+
+                        proc0.addTasks(tasks);
+
+                        if (log.isDebugEnabled())
+                            log.debug("Sending task execution request to child process [jobId=" + job.id() +
+                                ", proc=" + proc0 + ", tasks=" + tasks + ']');
+
+                        sendExecutionRequest(proc0, job, tasks);
+                    }
+                    catch (IgniteCheckedException e) {
+                        notifyTasksFailed(tasks, FAILED, e);
+                    }
+                    finally {
+                        busyLock.readUnlock();
+                    }
+                }
+            });
+        }
+        finally {
+            busyLock.readUnlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Cancellation is done by terminating the child process registered for the job, if there is one.
+     */
+    @Override public void cancelTasks(GridHadoopJobId jobId) {
+        HadoopProcess jobProc = runningProcsByJobId.get(jobId);
+
+        if (jobProc == null)
+            return;
+
+        jobProc.terminate();
+    }
+
+    /**
+     * Sends execution request to the child process while holding the process lock.
+     * If the process is already marked as terminated, nothing is sent and the tasks are reported
+     * to the job tracker as {@code CRASHED} instead.
+     *
+     * @param proc Process to send request to.
+     * @param job Job instance.
+     * @param tasks Collection of tasks to execute in started process.
+     * @throws IgniteCheckedException If sending the request failed.
+     */
+    private void sendExecutionRequest(HadoopProcess proc, GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks)
+        throws IgniteCheckedException {
+        // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
+        proc.lock();
+
+        try {
+            if (proc.terminated()) {
+                notifyTasksFailed(tasks, CRASHED, null);
+
+                return;
+            }
+
+            GridHadoopTaskExecutionRequest req = new GridHadoopTaskExecutionRequest();
+
+            req.jobId(job.id());
+            req.jobInfo(job.info());
+            req.tasks(tasks);
+
+            comm.sendMessage(proc.descriptor(), req);
+        }
+        finally {
+            proc.unlock();
+        }
+    }
+
+    /**
+     * Builds metadata for a child process: the classpath of the current JVM split into entries
+     * and a fixed list of JVM options that includes {@code IGNITE_HOME}.
+     *
+     * @return External task metadata.
+     */
+    private GridHadoopExternalTaskMetadata buildTaskMeta() {
+        GridHadoopExternalTaskMetadata taskMeta = new GridHadoopExternalTaskMetadata();
+
+        String[] cpEntries = System.getProperty("java.class.path").split(File.pathSeparator);
+
+        taskMeta.classpath(Arrays.asList(cpEntries));
+
+        String igniteHomeOpt = "-DIGNITE_HOME=" + U.getIgniteHome();
+
+        taskMeta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC",
+            "-XX:+CMSClassUnloadingEnabled", igniteHomeOpt));
+
+        return taskMeta;
+    }
+
+    /**
+     * Reports every given task to the job tracker as finished with the same failure status.
+     *
+     * @param tasks Tasks to notify about.
+     * @param state Fail state.
+     * @param e Optional error.
+     */
+    private void notifyTasksFailed(Iterable<GridHadoopTaskInfo> tasks, GridHadoopTaskState state, Throwable e) {
+        GridHadoopTaskStatus failStatus = new GridHadoopTaskStatus(state, e);
+
+        Iterator<GridHadoopTaskInfo> it = tasks.iterator();
+
+        while (it.hasNext())
+            jobTracker.onTaskFinished(it.next(), failStatus);
+    }
+
+    /**
+     * Starts process template that will be ready to execute Hadoop tasks.
+     * <p>
+     * The process is registered in both process maps right away, while the launch itself is done by a
+     * closure passed to {@code runLocalSafe()}. The closure reads the child output until the child reports
+     * {@code "Started"} or {@code "Failed"}. If the output ends without either marker, the init future is
+     * failed, since otherwise it would never complete and its listeners would wait forever.
+     *
+     * @param job Job instance.
+     * @param plan Map reduce plan.
+     * @return Process registered for the job.
+     */
+    private HadoopProcess startProcess(final GridHadoopJob job, final GridHadoopMapReducePlan plan) {
+        final UUID childProcId = UUID.randomUUID();
+
+        GridHadoopJobId jobId = job.id();
+
+        final GridHadoopProcessFuture fut = new GridHadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
+
+        final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+
+        HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+
+        assert old == null;
+
+        old = runningProcsByProcId.put(childProcId, proc);
+
+        assert old == null;
+
+        ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+            @Override public void run() {
+                if (!busyLock.tryReadLock()) {
+                    fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+
+                    return;
+                }
+
+                try {
+                    GridHadoopExternalTaskMetadata startMeta = buildTaskMeta();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Created hadoop child process metadata for job [job=" + job +
+                            ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+
+                    Process proc = startJavaProcess(childProcId, startMeta, job);
+
+                    BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+
+                    String line;
+
+                    // Set once the child has reported either "Started" or "Failed".
+                    boolean reported = false;
+
+                    // Read up all the process output.
+                    while ((line = rdr.readLine()) != null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Tracing process output: " + line);
+
+                        if ("Started".equals(line)) {
+                            // Process started successfully, it should not write anything more to the output stream.
+                            if (log.isDebugEnabled())
+                                log.debug("Successfully started child process [childProcId=" + childProcId +
+                                    ", meta=" + job + ']');
+
+                            reported = true;
+
+                            fut.onProcessStarted(proc);
+
+                            break;
+                        }
+                        else if ("Failed".equals(line)) {
+                            StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+
+                            while ((line = rdr.readLine()) != null)
+                                sb.append("    ").append(line).append("\n");
+
+                            // Cut last character.
+                            sb.setLength(sb.length() - 1);
+
+                            log.warning(sb.toString());
+
+                            reported = true;
+
+                            fut.onDone(new IgniteCheckedException(sb.toString()));
+
+                            break;
+                        }
+                    }
+
+                    // Output ended without a status marker: fail the future, it would never complete otherwise.
+                    if (!reported)
+                        fut.onDone(new IgniteCheckedException("Child process output ended before start status " +
+                            "was reported: " + job));
+                }
+                catch (Throwable e) {
+                    fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+                }
+                finally {
+                    busyLock.readUnlock();
+                }
+            }
+        }, true);
+
+        fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+            @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+                try {
+                    // Make sure there were no exceptions.
+                    f.get();
+
+                    prepareForJob(proc, job, plan);
+                }
+                catch (IgniteCheckedException ignore) {
+                    // Exception is printed in future's onDone() method.
+                }
+            }
+        });
+
+        return proc;
+    }
+
+    /**
+     * Resolves the java command from the {@code java.home} system property (falling back to the
+     * {@code JAVA_HOME} environment variable) and checks it by running it with {@code -version}.
+     *
+     * @throws IgniteCheckedException If java home cannot be located, the check command fails or exits
+     *      with a nonzero code, or the thread is interrupted while waiting for it.
+     */
+    private void initJavaCommand() throws IgniteCheckedException {
+        String home = System.getProperty("java.home");
+
+        if (home == null) {
+            home = System.getenv("JAVA_HOME");
+
+            if (home == null)
+                throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+        }
+
+        String exeName = U.isWindows() ? "java.exe" : "java";
+
+        javaCmd = home + File.separator + "bin" + File.separator + exeName;
+
+        try {
+            ProcessBuilder bldr = new ProcessBuilder(javaCmd, "-version");
+
+            bldr.redirectErrorStream(true);
+
+            Process verProc = bldr.start();
+
+            Collection<String> verOut = readProcessOutput(verProc);
+
+            int exitCode = verProc.waitFor();
+
+            if (exitCode != 0)
+                throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
+                    "code) [exitCode=" + exitCode + ", javaCmd='" + javaCmd + "', msg=" + F.first(verOut) + ']');
+
+            if (log.isInfoEnabled()) {
+                log.info("Will use java for external task execution: ");
+
+                for (String outLine : verOut)
+                    log.info("    " + outLine);
+            }
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt status before reporting the failure.
+            Thread.currentThread().interrupt();
+
+            throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to check java for external task execution.", e);
+        }
+    }
+
+    /**
+     * Reads process output line-by-line until the end of the stream, then closes the stream.
+     *
+     * @param proc Process to read output.
+     * @return Read lines.
+     * @throws IOException If read failed.
+     */
+    private Collection<String> readProcessOutput(Process proc) throws IOException {
+        Collection<String> res = new ArrayList<>();
+
+        // Closing the reader also closes the underlying process output stream.
+        try (BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+            String s;
+
+            while ((s = rdr.readLine()) != null)
+                res.add(s);
+        }
+
+        return res;
+    }
+
+    /**
+     * Builds process from metadata.
+     * <p>
+     * The command line consists of the java command, the JVM options and classpath from the metadata,
+     * the {@code GridHadoopExternalProcessStarter} class name, and the arguments describing the child
+     * process ID, the parent (node) process descriptor, the output folder and the work directory.
+     * Error output is redirected to standard output, and the process runs in the work directory.
+     *
+     * @param childProcId Child process ID.
+     * @param startMeta Metadata.
+     * @param job Job.
+     * @return Started process.
+     * @throws Exception If the process could not be prepared or started.
+     */
+    private Process startJavaProcess(UUID childProcId, GridHadoopExternalTaskMetadata startMeta,
+        GridHadoopJob job) throws Exception {
+        String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
+
+        if (log.isDebugEnabled())
+            log.debug("Will write process log output to: " + outFldr);
+
+        List<String> cmd = new ArrayList<>();
+
+        File workDir = U.resolveWorkDirectory("", false);
+
+        cmd.add(javaCmd);
+        cmd.addAll(startMeta.jvmOptions());
+        cmd.add("-cp");
+        cmd.add(buildClasspath(startMeta.classpath()));
+        cmd.add(GridHadoopExternalProcessStarter.class.getName());
+        cmd.add("-cpid");
+        cmd.add(String.valueOf(childProcId));
+        cmd.add("-ppid");
+        cmd.add(String.valueOf(nodeDesc.processId()));
+        cmd.add("-nid");
+        cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+        cmd.add("-addr");
+        cmd.add(nodeDesc.address());
+        cmd.add("-tport");
+        cmd.add(String.valueOf(nodeDesc.tcpPort()));
+        cmd.add("-sport");
+        cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+        cmd.add("-out");
+        cmd.add(outFldr);
+        cmd.add("-wd");
+        cmd.add(workDir.getAbsolutePath());
+
+        return new ProcessBuilder(cmd)
+            .redirectErrorStream(true)
+            .directory(workDir)
+            .start();
+    }
+
+    /**
+     * Builds the path of the work folder of the given job.
+     *
+     * @param jobId Job ID.
+     * @return Output base followed by the file separator and {@code "Job_"} concatenated with the job ID.
+     */
+    private String jobWorkFolder(GridHadoopJobId jobId) {
+        StringBuilder path = new StringBuilder();
+
+        path.append(outputBase).append(File.separator).append("Job_").append(jobId);
+
+        return path.toString();
+    }
+
+    /**
+     * Joins classpath entries into a single string using {@code pathSep} as the delimiter.
+     *
+     * @param cp Classpath collection; expected to be non-empty (checked by an assertion).
+     * @return Classpath string with entries delimited by {@code pathSep} and no trailing delimiter.
+     */
+    private String buildClasspath(Collection<String> cp) {
+        assert !cp.isEmpty();
+
+        StringBuilder sb = new StringBuilder();
+
+        // The separator is inserted between entries rather than appended and trimmed afterwards:
+        // trimming exactly one character is only correct for a one-character separator and
+        // fails with a negative length on an empty collection when assertions are disabled.
+        boolean first = true;
+
+        for (String s : cp) {
+            if (first)
+                first = false;
+            else
+                sb.append(pathSep);
+
+            sb.append(s);
+        }
+
+        return sb.toString();
+    }
+
+    /**
+     * Sends a job info update request, carrying the current job phase, to a remote process.
+     * <p>
+     * Reducer addresses are included only when an address is known for every reducer of the plan; otherwise
+     * {@code null} is sent in their place. If sending fails and the process is not terminated yet, the failure
+     * is logged and the process is terminated.
+     *
+     * @param proc Process to send request to.
+     * @param meta Job metadata.
+     */
+    private void sendJobInfoUpdate(HadoopProcess proc, GridHadoopJobMetadata meta) {
+        Map<Integer, GridHadoopProcessDescriptor> known = meta.reducersAddresses();
+
+        int total = meta.mapReducePlan().reducers();
+
+        GridHadoopProcessDescriptor[] addrs;
+
+        if (known == null || known.size() != total)
+            addrs = null;
+        else {
+            addrs = new GridHadoopProcessDescriptor[total];
+
+            for (int rdc = 0; rdc < addrs.length; rdc++) {
+                addrs[rdc] = known.get(rdc);
+
+                assert addrs[rdc] != null : "Missing reducing address [meta=" + meta + ", rdc=" + rdc + ']';
+            }
+        }
+
+        try {
+            comm.sendMessage(proc.descriptor(), new GridHadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+        }
+        catch (IgniteCheckedException e) {
+            // Nothing to report if the process has already been terminated.
+            if (proc.terminated())
+                return;
+
+            log.error("Failed to send job state update message to remote child process (will kill the process) " +
+                "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+
+            proc.terminate();
+        }
+    }
+
+    /**
+     * Sends a prepare-for-job request to a remote process.
+     * <p>
+     * The request carries the job ID, the job info, the reducer count of the plan and the reducers the plan
+     * assigns to the local node. If sending fails, the failure is logged and the process is terminated.
+     *
+     * @param proc Process to send request to.
+     * @param job Job.
+     * @param plan Map reduce plan.
+     */
+    private void prepareForJob(HadoopProcess proc, GridHadoopJob job, GridHadoopMapReducePlan plan) {
+        try {
+            GridHadoopProcessDescriptor dest = proc.descriptor();
+
+            GridHadoopPrepareForJobRequest req = new GridHadoopPrepareForJobRequest(job.id(), job.info(),
+                plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+            comm.sendMessage(dest, req);
+        }
+        catch (IgniteCheckedException e) {
+            String msg = "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+                ", plan=" + plan + ']';
+
+            U.error(log, msg, e);
+
+            proc.terminate();
+        }
+    }
+
+    /**
+     * Handles a task finished message received from a remote process.
+     * <p>
+     * The task is removed from the process registered under the sender's process ID, if there is one, and the
+     * job tracker is notified about the task status in either case.
+     *
+     * @param desc Remote process descriptor.
+     * @param taskMsg Task finished message.
+     */
+    private void processTaskFinishedMessage(GridHadoopProcessDescriptor desc, GridHadoopTaskFinishedMessage taskMsg) {
+        HadoopProcess owner = runningProcsByProcId.get(desc.processId());
+
+        // No registered process for the sender: there is nothing to update locally.
+        if (owner != null)
+            owner.removeTask(taskMsg.taskInfo());
+
+        jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+    }
+
+    /**
+     * Listener for messages and connection-loss events coming from child processes.
+     * <p>
+     * Both callbacks return immediately when the busy lock cannot be read-locked and otherwise release the
+     * read lock in a {@code finally} block.
+     * <p>
+     * {@code onMessageReceived()} handles two message types: {@code GridHadoopProcessStartedAck} is forwarded
+     * to the init future of the process registered under the sender's process ID, and
+     * {@code GridHadoopTaskFinishedMessage} is passed to {@code processTaskFinishedMessage()}. Any other
+     * message is logged as a warning.
+     * <p>
+     * {@code onConnectionLost()} logs a warning and returns for a {@code null} descriptor. Otherwise, if a
+     * process is registered under the descriptor's process ID: when it still has tasks, each task is reported
+     * to the job tracker with {@code CRASHED} status and the process is removed from
+     * {@code runningProcsByJobId}; the process is terminated in either case.
+     * <p>
+     * NOTE(review): the missing-process check for a start ack is an assertion only; with assertions disabled
+     * an ack from an unregistered process would fail with a {@code NullPointerException} - confirm whether
+     * that can happen.
+     */
+    private class MessageListener implements GridHadoopMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
+            if (!busyLock.tryReadLock())
+                return;
+
+            try {
+                if (msg instanceof GridHadoopProcessStartedAck) {
+                    HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+                    assert proc != null : "Missing child process for processId: " + desc;
+
+                    GridHadoopProcessFuture fut = proc.initFut;
+
+                    if (fut != null)
+                        fut.onReplyReceived(desc);
+                    // Safety.
+                    else
+                        log.warning("Failed to find process start future (will ignore): " + desc);
+                }
+                else if (msg instanceof GridHadoopTaskFinishedMessage) {
+                    GridHadoopTaskFinishedMessage taskMsg = (GridHadoopTaskFinishedMessage)msg;
+
+                    processTaskFinishedMessage(desc, taskMsg);
+                }
+                else
+                    log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
+            }
+            finally {
+                busyLock.readUnlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
+            if (!busyLock.tryReadLock())
+                return;
+
+            try {
+                if (desc == null) {
+                    U.warn(log, "Handshake failed.");
+
+                    return;
+                }
+
+                // Notify job tracker about failed tasks.
+                HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+
+                if (proc != null) {
+                    Collection<GridHadoopTaskInfo> tasks = proc.tasks();
+
+                    if (!F.isEmpty(tasks)) {
+                        log.warning("Lost connection with alive process (will terminate): " + desc);
+
+                        GridHadoopTaskStatus status = new GridHadoopTaskStatus(CRASHED,
+                            new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
+
+                        for (GridHadoopTaskInfo info : tasks)
+                            jobTracker.onTaskFinished(info, status);
+
+                        runningProcsByJobId.remove(proc.jobId(), proc);
+                    }
+
+                    // Safety.
+                    proc.terminate();
+                }
+            }
+            finally {
+                busyLock.readUnlock();
+            }
+        }
+    }
+
+    /**
+     * Hadoop process.
+     */
+    private static class HadoopProcess extends ReentrantLock {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Job ID. */
+        private final GridHadoopJobId jobId;
+
+        /** Process. */
+        private Process proc;
+
+        /** Init future. Completes when process is ready to receive messages. */
+        private final GridHadoopProcessFuture initFut;
+
+        /** Process descriptor. */
+        private GridHadoopProcessDescriptor procDesc;
+
+        /** Reducers planned for this process. */
+        private Collection<Integer> reducers;
+
+        /** Tasks. */
+        private final Collection<GridHadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+
+        /** Terminated flag. */
+        private volatile boolean terminated;
+
+        /**
+         * @param jobId Job ID.
+         * @param initFut Init future.
+         */
+        private HadoopProcess(GridHadoopJobId jobId, GridHadoopProcessFuture initFut,
+            int[] reducers) {
+            this.jobId = jobId;
+            this.initFut = initFut;
+
+            if (!F.isEmpty(reducers)) {
+                this.reducers = new ArrayList<>(reducers.length);
+
+                for (int r : reducers)
+                    this.reducers.add(r);
+            }
+        }
+
+        /**
+         * @return Communication process descriptor.
+         */
+        private GridHadoopProcessDescriptor descriptor() {
+            return procDesc;
+        }
+
+        /**
+         * @return Job ID.
+         */
+        public GridHadoopJobId jobId() {
+            return jobId;
+        }
+
+        /**
+         * Initialized callback.
+         *
+         * @param proc Java process representation.
+         * @param procDesc Process descriptor.
+         */
+        private void onInitialized(Process proc, GridHadoopProcessDescriptor procDesc) {
+            this.proc = proc;
+            this.procDesc = procDesc;
+        }
+
+        /**
+         * Terminates process (kills it).
+         */
+        private void terminate() {
+            // Guard against concurrent message sending.
+            lock();
+
+            try {
+                terminated = true;
+
+                if (!initFut.isDone())
+                    initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>>>() {
+                        @Override public void apply(
+                            IgniteInternalFuture<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> f) {
+                            proc.destroy();
+                        }
+                    });
+                else
+                    proc.destroy();
+            }
+            finally {
+                unlock();
+            }
+        }
+
+        /**
+         * @return Terminated flag.
+         */
+        private boolean terminated() {
+            return terminated;
+        }
+
+        /**
+         * Sets process tasks.
+         *
+         * @param tasks Tasks to set.
+         */
+        private void addTasks(Collection<GridHadoopTaskInfo> tasks) {
+            this.tasks.addAll(tasks);
+        }
+
+        /**
+         * Removes task when it was completed.
+         *
+         * @param task Task to remove.
+         */
+        private void removeTask(GridHadoopTaskInfo task) {
+            if (tasks != null)
+                tasks.remove(task);
+        }
+
+        /**
+         * @return Collection of tasks.
+         */
+        private Collection<GridHadoopTaskInfo> tasks() {
+            return tasks;
+        }
+
+        /**
+         * @return Planned reducers.
+         */
+        private Collection<Integer> reducers() {
+            return reducers;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HadoopProcess.class, this);
+        }
+    }
+
+    /**
+     * Future tracking initialization of a child process.
+     * <p>
+     * It is completed with the (process, descriptor) pair by whichever of {@code onProcessStarted()} and
+     * {@code onReplyReceived()} observes that both callbacks have been invoked.
+     * <p>
+     * On successful completion {@code onDone()} hands the process and its descriptor to the
+     * {@code HadoopProcess} registered under the child process ID and, if that process has planned reducers,
+     * notifies the job tracker. On failure it removes the entries for this job and this child process from
+     * {@code runningProcsByJobId} and {@code runningProcsByProcId}.
+     * <p>
+     * NOTE(review): the two callbacks are not synchronized with each other, so both may see both flags set
+     * and call {@code onDone()}; the side effects in {@code onDone()} run before {@code super.onDone()}
+     * decides which call wins - confirm whether a double notification of the job tracker is possible.
+     */
+    private class GridHadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, GridHadoopProcessDescriptor>> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Child process ID. */
+        private UUID childProcId;
+
+        /** Job ID. */
+        private GridHadoopJobId jobId;
+
+        /** Process descriptor. */
+        private GridHadoopProcessDescriptor desc;
+
+        /** Running process. */
+        private Process proc;
+
+        /** Process started flag. */
+        private volatile boolean procStarted;
+
+        /** Reply received flag. */
+        private volatile boolean replyReceived;
+
+        /** Logger. */
+        private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+
+        /**
+         * Empty constructor.
+         */
+        public GridHadoopProcessFuture() {
+            // No-op.
+        }
+
+        /**
+         * @param childProcId Child process ID.
+         * @param jobId Job ID.
+         * @param ctx Kernal context.
+         */
+        private GridHadoopProcessFuture(UUID childProcId, GridHadoopJobId jobId, GridKernalContext ctx) {
+            super(ctx);
+
+            this.childProcId = childProcId;
+            this.jobId = jobId;
+        }
+
+        /**
+         * Process started callback. Completes the future if the reply has already been received.
+         *
+         * @param proc Started process.
+         */
+        public void onProcessStarted(Process proc) {
+            this.proc = proc;
+
+            procStarted = true;
+
+            if (procStarted && replyReceived)
+                onDone(F.t(proc, desc));
+        }
+
+        /**
+         * Reply received callback. Completes the future if the process has already been started.
+         *
+         * @param desc Descriptor of the replying process; its process ID is asserted to match the child process ID.
+         */
+        public void onReplyReceived(GridHadoopProcessDescriptor desc) {
+            assert childProcId.equals(desc.processId());
+
+            this.desc = desc;
+
+            replyReceived = true;
+
+            if (procStarted && replyReceived)
+                onDone(F.t(proc, desc));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable IgniteBiTuple<Process, GridHadoopProcessDescriptor> res,
+            @Nullable Throwable err) {
+            if (err == null) {
+                HadoopProcess proc = runningProcsByProcId.get(childProcId);
+
+                assert proc != null;
+
+                assert proc.initFut == this;
+
+                proc.onInitialized(res.get1(), res.get2());
+
+                if (!F.isEmpty(proc.reducers()))
+                    jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
+            }
+            else {
+                // Clean up since init failed.
+                runningProcsByJobId.remove(jobId);
+                runningProcsByProcId.remove(childProcId);
+            }
+
+            if (super.onDone(res, err)) {
+                if (err == null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Initialized child process for external task execution [jobId=" + jobId +
+                            ", desc=" + desc + ", initTime=" + duration() + ']');
+                }
+                else
+                    U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
+                        ", desc=" + desc + ']', err);
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
index 878b61b..da59483 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1MapTask.java
@@ -86,7 +86,7 @@ public class GridHadoopV1MapTask extends GridHadoopV1Task {
                 try {
                     while (reader.next(key, val)) {
                         if (isCancelled())
-                            throw new GridHadoopTaskCancelledException("Map task cancelled.");
+                            throw new HadoopTaskCancelledException("Map task cancelled.");
 
                         mapper.map(key, val, collector, reporter);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
index 7deea90..3aca637 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
@@ -67,7 +67,7 @@ public class GridHadoopV1ReduceTask extends GridHadoopV1Task {
                 try {
                     while (input.next()) {
                         if (isCancelled())
-                            throw new GridHadoopTaskCancelledException("Reduce task cancelled.");
+                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
 
                         reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
index 257f4ea..0e1fb44 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
@@ -58,7 +58,7 @@ public class GridHadoopV1Splitter {
                     res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
                 }
                 else
-                    res.add(GridHadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
             }
 
             return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
index 86a7264..305bc4e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
@@ -72,7 +72,7 @@ public abstract class GridHadoopV1Task extends GridHadoopTask {
             /** {@inheritDoc} */
             @Override public void collect(Object key, Object val) throws IOException {
                 if (cancelled)
-                    throw new GridHadoopTaskCancelledException("Task cancelled.");
+                    throw new HadoopTaskCancelledException("Task cancelled.");
 
                 super.collect(key, val);
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
index 287b10f..160e34b 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
@@ -92,7 +92,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
             else if (split instanceof GridHadoopExternalSplit)
                 throw new UnsupportedOperationException(); // TODO
             else if (split instanceof GridHadoopSplitWrapper)
-                inputSplit = (InputSplit)GridHadoopUtils.unwrapSplit((GridHadoopSplitWrapper)split);
+                inputSplit = (InputSplit) HadoopUtils.unwrapSplit((GridHadoopSplitWrapper) split);
             else
                 throw new IllegalStateException();
         }
@@ -103,7 +103,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
     /** {@inheritDoc} */
     @Override public boolean nextKeyValue() throws IOException, InterruptedException {
         if (cancelled)
-            throw new GridHadoopTaskCancelledException("Task cancelled.");
+            throw new HadoopTaskCancelledException("Task cancelled.");
 
         return reader.nextKeyValue();
     }
@@ -125,7 +125,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
     @SuppressWarnings("unchecked")
     @Override public void write(Object key, Object val) throws IOException, InterruptedException {
         if (cancelled)
-            throw new GridHadoopTaskCancelledException("Task cancelled.");
+            throw new HadoopTaskCancelledException("Task cancelled.");
 
         if (writer != null)
             writer.write(key, val);
@@ -191,7 +191,7 @@ public class GridHadoopV2Context extends JobContextImpl implements MapContext, R
     /** {@inheritDoc} */
     @Override public boolean nextKey() throws IOException, InterruptedException {
         if (cancelled)
-            throw new GridHadoopTaskCancelledException("Task cancelled.");
+            throw new HadoopTaskCancelledException("Task cancelled.");
 
         return input.next();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
index 7c36948..5f1af22 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Job.java
@@ -39,7 +39,7 @@ import java.util.*;
 import java.util.Queue;
 import java.util.concurrent.*;
 
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Hadoop job implementation for v2 API.
@@ -81,7 +81,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
      * @param jobInfo Job info.
      * @param log Logger.
      */
-    public GridHadoopV2Job(GridHadoopJobId jobId, final GridHadoopDefaultJobInfo jobInfo, IgniteLogger log) {
+    public GridHadoopV2Job(GridHadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) {
         assert jobId != null;
         assert jobInfo != null;
 
@@ -90,7 +90,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
 
         hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
 
-        GridHadoopClassLoader clsLdr = (GridHadoopClassLoader)getClass().getClassLoader();
+        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
 
         // Before create JobConf instance we should set new context class loader.
         Thread.currentThread().setContextClassLoader(clsLdr);
@@ -196,7 +196,7 @@ public class GridHadoopV2Job implements GridHadoopJob {
         try {
             if (cls == null) {
                 // If there is no pooled class, then load new one.
-                GridHadoopClassLoader ldr = new GridHadoopClassLoader(rsrcMgr.classPath());
+                HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath());
 
                 cls = ldr.loadClass(GridHadoopV2TaskContext.class.getName());
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
index e8ce70b..68338a6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Splitter.java
@@ -59,7 +59,7 @@ public class GridHadoopV2Splitter {
                     res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
                 }
                 else
-                    res.add(GridHadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
+                    res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
 
                 id++;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ace354c6/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
index 82be91f..3e88362 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2TaskContext.java
@@ -40,7 +40,7 @@ import java.io.*;
 import java.util.*;
 
 import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
 
 /**
  * Context for task execution.
@@ -186,7 +186,7 @@ public class GridHadoopV2TaskContext extends GridHadoopTaskContext {
             }
 
             if (cancelled)
-                throw new GridHadoopTaskCancelledException("Task cancelled.");
+                throw new HadoopTaskCancelledException("Task cancelled.");
 
             try {
                 task.run(this);