Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/05 10:05:50 UTC

[50/58] [abbrv] incubator-ignite git commit: Merge branch 'sprint-2' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-futures-cleanup-1

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
index 0000000,04a96de..4b749f3
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
@@@ -1,0 -1,960 +1,958 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+ import org.apache.ignite.internal.processors.hadoop.message.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.spi.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.locks.*;
+ 
+ import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
+ 
+ /**
+  * External process registry. Handles external process lifecycle.
+  */
+ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
+     /** Hadoop context. */
+     private HadoopContext ctx;
+ 
+     /** Path to the java executable used to start child processes. */
+     private String javaCmd;
+ 
+     /** Logger. */
+     private IgniteLogger log;
+ 
+     /** Node process descriptor. */
+     private HadoopProcessDescriptor nodeDesc;
+ 
+     /** Output base. */
+     private File outputBase;
+ 
+     /** Path separator. */
+     private String pathSep;
+ 
+     /** Hadoop external communication. */
+     private HadoopExternalCommunication comm;
+ 
+     /** Running processes mapped by process ID. */
+     private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
+ 
+     /** Running processes mapped by job ID. */
+     private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
+ 
+     /** Busy lock. */
+     private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
+ 
+     /** Job tracker. */
+     private HadoopJobTracker jobTracker;
+ 
+     /** {@inheritDoc} */
+     @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+         this.ctx = ctx;
+ 
+         log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
+ 
+         outputBase = U.resolveWorkDirectory("hadoop", false);
+ 
+         pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
+ 
+         initJavaCommand();
+ 
+         comm = new HadoopExternalCommunication(
+             ctx.localNodeId(),
+             UUID.randomUUID(),
+             ctx.kernalContext().config().getMarshaller(),
+             log,
+             ctx.kernalContext().getSystemExecutorService(),
+             ctx.kernalContext().gridName());
+ 
+         comm.setListener(new MessageListener());
+ 
+         comm.start();
+ 
+         nodeDesc = comm.localProcessDescriptor();
+ 
+         ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
+             HadoopExternalTaskExecutor.class);
+ 
+         if (nodeDesc.sharedMemoryPort() != -1)
+             ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
+                 HadoopExternalTaskExecutor.class);
+ 
+         jobTracker = ctx.jobTracker();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void stop(boolean cancel) {
+         busyLock.writeLock();
+ 
+         try {
+             comm.stop();
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
+         final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
+ 
+         // If we have a local process for this job.
+         if (proc != null) {
+             if (log.isDebugEnabled())
+                 log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
+ 
+             if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+                 if (log.isDebugEnabled())
+                     log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
+                         ", proc=" + proc + ']');
+ 
+                 runningProcsByJobId.remove(meta.jobId());
+                 runningProcsByProcId.remove(proc.descriptor().processId());
+ 
+                 proc.terminate();
+ 
+                 return;
+             }
+ 
+             if (proc.initFut.isDone()) {
+                 if (!proc.initFut.isFailed())
+                     sendJobInfoUpdate(proc, meta);
+                 else if (log.isDebugEnabled())
+                     log.debug("Failed to initialize child process (will skip job state notification) " +
+                         "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
+             }
+             else {
+                 proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                     @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                         try {
+                             f.get();
+ 
+                             sendJobInfoUpdate(proc, meta);
+                         }
+                         catch (IgniteCheckedException e) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Failed to initialize child process (will skip job state notification) " +
+                                     "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
+                         }
+ 
+                     }
+                 });
+             }
+         }
+         else if (ctx.isParticipating(meta)) {
+             HadoopJob job;
+ 
+             try {
+                 job = jobTracker.job(meta.jobId(), meta.jobInfo());
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to get job: " + meta.jobId(), e);
+ 
+                 return;
+             }
+ 
+             startProcess(job, meta.mapReducePlan());
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("ConstantConditions")
+     @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock()) {
+             if (log.isDebugEnabled())
+                 log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
+ 
+             return;
+         }
+ 
+         try {
+             HadoopProcess proc = runningProcsByJobId.get(job.id());
+ 
+             HadoopTaskType taskType = F.first(tasks).type();
+ 
+             if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
+                 taskType == HadoopTaskType.COMMIT) {
+                 if (proc == null || proc.terminated()) {
+                     runningProcsByJobId.remove(job.id(), proc);
+ 
+                     // Start a new process for the maintenance task since there is no live process for this job.
+                     proc = startProcess(job, jobTracker.plan(job.id()));
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Starting new process for maintenance task [jobId=" + job.id() +
+                             ", proc=" + proc + ", taskType=" + taskType + ']');
+                 }
+             }
+             else
+                 assert proc != null : "Missing started process for task execution request: " + job.id() +
+                     ", tasks=" + tasks;
+ 
+             final HadoopProcess proc0 = proc;
+ 
+             proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                 @Override public void apply(
+                     IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                     if (!busyLock.tryReadLock())
+                         return;
+ 
+                     try {
+                         f.get();
+ 
+                         proc0.addTasks(tasks);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Sending task execution request to child process [jobId=" + job.id() +
+                                 ", proc=" + proc0 + ", tasks=" + tasks + ']');
+ 
+                         sendExecutionRequest(proc0, job, tasks);
+                     }
+                     catch (IgniteCheckedException e) {
+                         notifyTasksFailed(tasks, FAILED, e);
+                     }
+                     finally {
+                         busyLock.readUnlock();
+                     }
+                 }
+             });
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void cancelTasks(HadoopJobId jobId) {
+         HadoopProcess proc = runningProcsByJobId.get(jobId);
+ 
+         if (proc != null)
+             proc.terminate();
+     }
+ 
+     /**
+      * Sends execution request to remote node.
+      *
+      * @param proc Process to send request to.
+      * @param job Job instance.
+      * @param tasks Collection of tasks to execute in started process.
+      */
+     private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks)
+         throws IgniteCheckedException {
+         // Must synchronize: the process may crash concurrently, in which case onConnectionLost() will be invoked.
+         proc.lock();
+ 
+         try {
+             if (proc.terminated()) {
+                 notifyTasksFailed(tasks, CRASHED, null);
+ 
+                 return;
+             }
+ 
+             HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
+ 
+             req.jobId(job.id());
+             req.jobInfo(job.info());
+             req.tasks(tasks);
+ 
+             comm.sendMessage(proc.descriptor(), req);
+         }
+         finally {
+             proc.unlock();
+         }
+     }
+ 
+     /**
+      * @return External task metadata.
+      */
+     private HadoopExternalTaskMetadata buildTaskMeta() {
+         HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
+ 
+         meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
+         meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
+             "-DIGNITE_HOME=" + U.getIgniteHome()));
+ 
+         return meta;
+     }
+ 
+     /**
+      * @param tasks Tasks to notify about.
+      * @param state Fail state.
+      * @param e Optional error.
+      */
+     private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
+         HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
+ 
+         for (HadoopTaskInfo task : tasks)
+             jobTracker.onTaskFinished(task, fail);
+     }
+ 
+     /**
+      * Starts process template that will be ready to execute Hadoop tasks.
+      *
+      * @param job Job instance.
+      * @param plan Map reduce plan.
+      * @return Started process.
+      */
+     private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
+         final UUID childProcId = UUID.randomUUID();
+ 
+         HadoopJobId jobId = job.id();
+ 
+         final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId, ctx.kernalContext());
+ 
+         final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
+ 
+         HadoopProcess old = runningProcsByJobId.put(jobId, proc);
+ 
+         assert old == null;
+ 
+         old = runningProcsByProcId.put(childProcId, proc);
+ 
+         assert old == null;
+ 
+         ctx.kernalContext().closure().runLocalSafe(new Runnable() {
+             @Override public void run() {
+                 if (!busyLock.tryReadLock()) {
+                     fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
+ 
+                     return;
+                 }
+ 
+                 try {
+                     HadoopExternalTaskMetadata startMeta = buildTaskMeta();
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Created hadoop child process metadata for job [job=" + job +
+                             ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
+ 
+                     Process proc = startJavaProcess(childProcId, startMeta, job);
+ 
+                     BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+ 
+                     String line;
+ 
+                     // Read up all the process output.
+                     while ((line = rdr.readLine()) != null) {
+                         if (log.isDebugEnabled())
+                             log.debug("Tracing process output: " + line);
+ 
+                         if ("Started".equals(line)) {
+                             // Process started successfully, it should not write anything more to the output stream.
+                             if (log.isDebugEnabled())
+                                 log.debug("Successfully started child process [childProcId=" + childProcId +
+                                     ", meta=" + job + ']');
+ 
+                             fut.onProcessStarted(proc);
+ 
+                             break;
+                         }
+                         else if ("Failed".equals(line)) {
+                             StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
+ 
+                             while ((line = rdr.readLine()) != null)
+                                 sb.append("    ").append(line).append("\n");
+ 
+                             // Cut last character.
+                             sb.setLength(sb.length() - 1);
+ 
+                             log.warning(sb.toString());
+ 
+                             fut.onDone(new IgniteCheckedException(sb.toString()));
+ 
+                             break;
+                         }
+                     }
+                 }
+                 catch (Throwable e) {
+                     fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
+                 }
+                 finally {
+                     busyLock.readUnlock();
+                 }
+             }
+         }, true);
+ 
+         fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+             @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                 try {
+                     // Make sure there were no exceptions.
+                     f.get();
+ 
+                     prepareForJob(proc, job, plan);
+                 }
+                 catch (IgniteCheckedException ignore) {
+                     // Exception is printed in future's onDone() method.
+                 }
+             }
+         });
+ 
+         return proc;
+     }
+ 
+     /**
+      * Checks that the local java command is available.
+      *
+      * @throws IgniteCheckedException If initialization failed.
+      */
+     private void initJavaCommand() throws IgniteCheckedException {
+         String javaHome = System.getProperty("java.home");
+ 
+         if (javaHome == null)
+             javaHome = System.getenv("JAVA_HOME");
+ 
+         if (javaHome == null)
+             throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
+ 
+         javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
+ 
+         try {
+             Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
+ 
+             Collection<String> out = readProcessOutput(proc);
+ 
+             int res = proc.waitFor();
+ 
+             if (res != 0)
+                 throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
+                     "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
+ 
+             if (log.isInfoEnabled()) {
+                 log.info("Will use java for external task execution: ");
+ 
+                 for (String s : out)
+                     log.info("    " + s);
+             }
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to check java for external task execution.", e);
+         }
+         catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+ 
+             throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
+         }
+     }
+ 
+     /**
+      * Reads process output line-by-line.
+      *
+      * @param proc Process to read output.
+      * @return Read lines.
+      * @throws IOException If read failed.
+      */
+     private Collection<String> readProcessOutput(Process proc) throws IOException {
+         BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
+ 
+         Collection<String> res = new ArrayList<>();
+ 
+         String s;
+ 
+         while ((s = rdr.readLine()) != null)
+             res.add(s);
+ 
+         return res;
+     }
+ 
+     /**
+      * Builds and starts a child Java process from metadata.
+      *
+      * @param childProcId Child process ID.
+      * @param startMeta Metadata.
+      * @param job Job.
+      * @return Started process.
+      */
+     private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
+         HadoopJob job) throws Exception {
+         String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Will write process log output to: " + outFldr);
+ 
+         List<String> cmd = new ArrayList<>();
+ 
+         File workDir = U.resolveWorkDirectory("", false);
+ 
+         cmd.add(javaCmd);
+         cmd.addAll(startMeta.jvmOptions());
+         cmd.add("-cp");
+         cmd.add(buildClasspath(startMeta.classpath()));
+         cmd.add(HadoopExternalProcessStarter.class.getName());
+         cmd.add("-cpid");
+         cmd.add(String.valueOf(childProcId));
+         cmd.add("-ppid");
+         cmd.add(String.valueOf(nodeDesc.processId()));
+         cmd.add("-nid");
+         cmd.add(String.valueOf(nodeDesc.parentNodeId()));
+         cmd.add("-addr");
+         cmd.add(nodeDesc.address());
+         cmd.add("-tport");
+         cmd.add(String.valueOf(nodeDesc.tcpPort()));
+         cmd.add("-sport");
+         cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
+         cmd.add("-out");
+         cmd.add(outFldr);
+         cmd.add("-wd");
+         cmd.add(workDir.getAbsolutePath());
+ 
+         return new ProcessBuilder(cmd)
+             .redirectErrorStream(true)
+             .directory(workDir)
+             .start();
+     }
+ 
+     /**
+      * Gets job work folder.
+      *
+      * @param jobId Job ID.
+      * @return Job work folder.
+      */
+     private String jobWorkFolder(HadoopJobId jobId) {
+         return outputBase + File.separator + "Job_" + jobId;
+     }
+ 
+     /**
+      * @param cp Classpath collection.
+      * @return Classpath string.
+      */
+     private String buildClasspath(Collection<String> cp) {
+         assert !cp.isEmpty();
+ 
+         StringBuilder sb = new StringBuilder();
+ 
+         for (String s : cp)
+             sb.append(s).append(pathSep);
+ 
+         sb.setLength(sb.length() - 1);
+ 
+         return sb.toString();
+     }
+ 
+     /**
+      * Sends job info update request to remote process.
+      *
+      * @param proc Process to send request to.
+      * @param meta Job metadata.
+      */
+     private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
+         Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
+ 
+         int rdcNum = meta.mapReducePlan().reducers();
+ 
+         HadoopProcessDescriptor[] addrs = null;
+ 
+         if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
+             addrs = new HadoopProcessDescriptor[rdcNum];
+ 
+             for (int i = 0; i < rdcNum; i++) {
+                 HadoopProcessDescriptor desc = rdcAddrs.get(i);
+ 
+                 assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
+ 
+                 addrs[i] = desc;
+             }
+         }
+ 
+         try {
+             comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
+         }
+         catch (IgniteCheckedException e) {
+             if (!proc.terminated()) {
+                 log.error("Failed to send job state update message to remote child process (will kill the process) " +
+                     "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
+ 
+                 proc.terminate();
+             }
+         }
+     }
+ 
+     /**
+      * Sends prepare request to remote process.
+      *
+      * @param proc Process to send request to.
+      * @param job Job.
+      * @param plan Map reduce plan.
+      */
+     private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
+         try {
+             comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
+                 plan.reducers(), plan.reducers(ctx.localNodeId())));
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
+                 ", plan=" + plan + ']', e);
+ 
+             proc.terminate();
+         }
+     }
+ 
+     /**
+      * Processes task finished message.
+      *
+      * @param desc Remote process descriptor.
+      * @param taskMsg Task finished message.
+      */
+     private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
+         HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+ 
+         if (proc != null)
+             proc.removeTask(taskMsg.taskInfo());
+ 
+         jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
+     }
+ 
+     /**
+      * Listener for messages received from child processes.
+      */
+     private class MessageListener implements HadoopMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
+             if (!busyLock.tryReadLock())
+                 return;
+ 
+             try {
+                 if (msg instanceof HadoopProcessStartedAck) {
+                     HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+ 
+                     assert proc != null : "Missing child process for processId: " + desc;
+ 
+                     HadoopProcessFuture fut = proc.initFut;
+ 
+                     if (fut != null)
+                         fut.onReplyReceived(desc);
+                     // Safety.
+                     else
+                         log.warning("Failed to find process start future (will ignore): " + desc);
+                 }
+                 else if (msg instanceof HadoopTaskFinishedMessage) {
+                     HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
+ 
+                     processTaskFinishedMessage(desc, taskMsg);
+                 }
+                 else
+                     log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
+             }
+             finally {
+                 busyLock.readUnlock();
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+             if (!busyLock.tryReadLock())
+                 return;
+ 
+             try {
+                 if (desc == null) {
+                     U.warn(log, "Handshake failed.");
+ 
+                     return;
+                 }
+ 
+                 // Notify job tracker about failed tasks.
+                 HadoopProcess proc = runningProcsByProcId.get(desc.processId());
+ 
+                 if (proc != null) {
+                     Collection<HadoopTaskInfo> tasks = proc.tasks();
+ 
+                     if (!F.isEmpty(tasks)) {
+                         log.warning("Lost connection with alive process (will terminate): " + desc);
+ 
+                         HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
+                             new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
+ 
+                         for (HadoopTaskInfo info : tasks)
+                             jobTracker.onTaskFinished(info, status);
+ 
+                         runningProcsByJobId.remove(proc.jobId(), proc);
+                     }
+ 
+                     // Safety.
+                     proc.terminate();
+                 }
+             }
+             finally {
+                 busyLock.readUnlock();
+             }
+         }
+     }
+ 
+     /**
+      * Hadoop process.
+      */
+     private static class HadoopProcess extends ReentrantLock {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Job ID. */
+         private final HadoopJobId jobId;
+ 
+         /** Process. */
+         private Process proc;
+ 
+         /** Init future. Completes when process is ready to receive messages. */
+         private final HadoopProcessFuture initFut;
+ 
+         /** Process descriptor. */
+         private HadoopProcessDescriptor procDesc;
+ 
+         /** Reducers planned for this process. */
+         private Collection<Integer> reducers;
+ 
+         /** Tasks. */
+         private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
+ 
+         /** Terminated flag. */
+         private volatile boolean terminated;
+ 
+         /**
+          * @param jobId Job ID.
+          * @param initFut Init future.
+          */
+         private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
+             int[] reducers) {
+             this.jobId = jobId;
+             this.initFut = initFut;
+ 
+             if (!F.isEmpty(reducers)) {
+                 this.reducers = new ArrayList<>(reducers.length);
+ 
+                 for (int r : reducers)
+                     this.reducers.add(r);
+             }
+         }
+ 
+         /**
+          * @return Communication process descriptor.
+          */
+         private HadoopProcessDescriptor descriptor() {
+             return procDesc;
+         }
+ 
+         /**
+          * @return Job ID.
+          */
+         public HadoopJobId jobId() {
+             return jobId;
+         }
+ 
+         /**
+          * Initialized callback.
+          *
+          * @param proc Java process representation.
+          * @param procDesc Process descriptor.
+          */
+         private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
+             this.proc = proc;
+             this.procDesc = procDesc;
+         }
+ 
+         /**
+          * Terminates process (kills it).
+          */
+         private void terminate() {
+             // Guard against concurrent message sending.
+             lock();
+ 
+             try {
+                 terminated = true;
+ 
+                 if (!initFut.isDone())
+                     initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
+                         @Override public void apply(
+                             IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
+                             proc.destroy();
+                         }
+                     });
+                 else
+                     proc.destroy();
+             }
+             finally {
+                 unlock();
+             }
+         }
+ 
+         /**
+          * @return Terminated flag.
+          */
+         private boolean terminated() {
+             return terminated;
+         }
+ 
+         /**
+          * Adds tasks to this process.
+          *
+          * @param tasks Tasks to set.
+          */
+         private void addTasks(Collection<HadoopTaskInfo> tasks) {
+             this.tasks.addAll(tasks);
+         }
+ 
+         /**
+          * Removes a task once it has completed.
+          *
+          * @param task Task to remove.
+          */
+         private void removeTask(HadoopTaskInfo task) {
+             if (tasks != null)
+                 tasks.remove(task);
+         }
+ 
+         /**
+          * @return Collection of tasks.
+          */
+         private Collection<HadoopTaskInfo> tasks() {
+             return tasks;
+         }
+ 
+         /**
+          * @return Planned reducers.
+          */
+         private Collection<Integer> reducers() {
+             return reducers;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public String toString() {
+             return S.toString(HadoopProcess.class, this);
+         }
+     }
+ 
+     /**
+      * Child process initialization future. Completes when the process has started and the handshake reply has been received.
+      */
+     private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Child process ID. */
+         private UUID childProcId;
+ 
+         /** Job ID. */
+         private HadoopJobId jobId;
+ 
+         /** Process descriptor. */
+         private HadoopProcessDescriptor desc;
+ 
+         /** Running process. */
+         private Process proc;
+ 
+         /** Process started flag. */
+         private volatile boolean procStarted;
+ 
+         /** Reply received flag. */
+         private volatile boolean replyReceived;
+ 
+         /** Logger. */
+         private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
+ 
+         /**
+          * Empty constructor.
+          */
+         public HadoopProcessFuture() {
+             // No-op.
+         }
+ 
+         /**
+          * @param ctx Kernal context.
+          */
 -        private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) {
 -            super(ctx);
 -
++        private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) {
+             this.childProcId = childProcId;
+             this.jobId = jobId;
+         }
+ 
+         /**
+          * Process started callback.
+          */
+         public void onProcessStarted(Process proc) {
+             this.proc = proc;
+ 
+             procStarted = true;
+ 
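+             // Complete the future only when the process has started and the handshake reply has been received.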
+             if (procStarted && replyReceived)
+                 onDone(F.t(proc, desc));
+         }
+ 
+         /**
+          * Reply received callback.
+          */
+         public void onReplyReceived(HadoopProcessDescriptor desc) {
+             assert childProcId.equals(desc.processId());
+ 
+             this.desc = desc;
+ 
+             replyReceived = true;
+ 
+             if (procStarted && replyReceived)
+                 onDone(F.t(proc, desc));
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
+             @Nullable Throwable err) {
+             if (err == null) {
+                 HadoopProcess proc = runningProcsByProcId.get(childProcId);
+ 
+                 assert proc != null;
+ 
+                 assert proc.initFut == this;
+ 
+                 proc.onInitialized(res.get1(), res.get2());
+ 
+                 if (!F.isEmpty(proc.reducers()))
+                     jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
+             }
+             else {
+                 // Clean up since init failed.
+                 runningProcsByJobId.remove(jobId);
+                 runningProcsByProcId.remove(childProcId);
+             }
+ 
+             if (super.onDone(res, err)) {
+                 if (err == null) {
+                     if (log.isDebugEnabled())
+                         log.debug("Initialized child process for external task execution [jobId=" + jobId +
+                             ", desc=" + desc + ", initTime=" + duration() + ']');
+                 }
+                 else
+                     U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
+                         ", desc=" + desc + ']', err);
+ 
+                 return true;
+             }
+ 
+             return false;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 0000000,e95b8cb..831885f
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@@ -1,0 -1,440 +1,440 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.message.*;
+ import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.offheap.unsafe.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ 
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+ 
+ /**
+  * Runner for the Hadoop child process. Executes tasks and communicates with the parent node.
+  */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ public class HadoopChildProcessRunner {
+     /** Node process descriptor. */
+     private HadoopProcessDescriptor nodeDesc;
+ 
+     /** Message processing executor service. */
+     private ExecutorService msgExecSvc;
+ 
+     /** Task executor service. */
+     private HadoopExecutorService execSvc;
+ 
+     /** */
+     protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
+ 
+     /** External communication. */
+     private HadoopExternalCommunication comm;
+ 
+     /** Logger. */
+     private IgniteLogger log;
+ 
+     /** Init guard. */
+     private final AtomicBoolean initGuard = new AtomicBoolean();
+ 
+     /** Start time. */
+     private long startTime;
+ 
+     /** Init future. */
 -    private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
++    private final GridFutureAdapter<?> initFut = new GridFutureAdapter<>();
+ 
+     /** Job instance. */
+     private HadoopJob job;
+ 
+     /** Number of uncompleted tasks. */
+     private final AtomicInteger pendingTasks = new AtomicInteger();
+ 
+     /** Shuffle job. */
+     private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
+ 
+     /** Concurrent mappers. */
+     private int concMappers;
+ 
+     /** Concurrent reducers. */
+     private int concReducers;
+ 
+     /**
+      * Starts child process runner.
+      */
+     public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
+         ExecutorService msgExecSvc, IgniteLogger parentLog)
+         throws IgniteCheckedException {
+         this.comm = comm;
+         this.nodeDesc = nodeDesc;
+         this.msgExecSvc = msgExecSvc;
+ 
+         comm.setListener(new MessageListener());
+         log = parentLog.getLogger(HadoopChildProcessRunner.class);
+ 
+         startTime = U.currentTimeMillis();
+ 
+         // Let the parent node know that this process has started.
+         comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
+     }
+ 
+     /**
+      * Initializes process for task execution.
+      *
+      * @param req Initialization request.
+      */
+     private void prepareProcess(HadoopPrepareForJobRequest req) {
+         if (initGuard.compareAndSet(false, true)) {
+             try {
+                 if (log.isDebugEnabled())
+                     log.debug("Initializing external hadoop task: " + req);
+ 
+                 assert job == null;
+ 
+                 job = req.jobInfo().createJob(req.jobId(), log);
+ 
+                 job.initialize(true, nodeDesc.processId());
+ 
+                 shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+                     req.totalReducerCount(), req.localReducers());
+ 
+                 initializeExecutors(req);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("External process initialized [initWaitTime=" +
+                         (U.currentTimeMillis() - startTime) + ']');
+ 
+                 initFut.onDone(null, null);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to initialize process: " + req, e);
+ 
+                 initFut.onDone(e);
+             }
+         }
+         else
+             log.warning("Duplicate initialize process request received (will ignore): " + req);
+     }
+ 
+     /**
+      * Runs the tasks from the execution request once initialization completes.
+      *
+      * @param req Task execution request.
+      */
+     private void runTasks(final HadoopTaskExecutionRequest req) {
+         if (!initFut.isDone() && log.isDebugEnabled())
+             log.debug("Will wait for process initialization future completion: " + req);
+ 
+         initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+             @Override public void apply(IgniteInternalFuture<?> f) {
+                 try {
+                     // Make sure init was successful.
+                     f.get();
+ 
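+                     // All previously submitted tasks must have completed before a new batch arrives.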
+                     boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
+ 
+                     assert set;
+ 
+                     HadoopTaskInfo info = F.first(req.tasks());
+ 
+                     assert info != null;
+ 
+                     int size = info.type() == MAP ? concMappers : concReducers;
+ 
+ //                    execSvc.setCorePoolSize(size);
+ //                    execSvc.setMaximumPoolSize(size);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Set executor service size for task type [type=" + info.type() +
+                             ", size=" + size + ']');
+ 
+                     for (HadoopTaskInfo taskInfo : req.tasks()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Submitted task for external execution: " + taskInfo);
+ 
+                         execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
+                             @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                                 onTaskFinished0(this, status);
+                             }
+ 
+                             @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx)
+                                 throws IgniteCheckedException {
+                                 return shuffleJob.input(ctx);
+                             }
+ 
+                             @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx)
+                                 throws IgniteCheckedException {
+                                 return shuffleJob.output(ctx);
+                             }
+                         });
+                     }
+                 }
+                 catch (IgniteCheckedException e) {
+                     for (HadoopTaskInfo info : req.tasks())
+                         notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Creates executor services.
+      *
+      * @param req Init child process request.
+      */
+     private void initializeExecutors(HadoopPrepareForJobRequest req) {
+         int cpus = Runtime.getRuntime().availableProcessors();
+ //
+ //        concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
+ //        concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
+ 
+         execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
+     }
+ 
+     /**
+      * Updates the reducer address map so that the shuffle can proceed with sending messages to reducers.
+      *
+      * @param req Update request.
+      */
+     private void updateTasks(final HadoopJobInfoUpdateRequest req) {
+         initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+             @Override public void apply(IgniteInternalFuture<?> gridFut) {
+                 assert initGuard.get();
+ 
+                 assert req.jobId().equals(job.id());
+ 
+                 if (req.reducersAddresses() != null) {
+                     if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
+                         shuffleJob.startSending("external",
+                             new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
+                                 @Override public void applyx(HadoopProcessDescriptor dest,
+                                     HadoopShuffleMessage msg) throws IgniteCheckedException {
+                                     comm.sendMessage(dest, msg);
+                                 }
+                             });
+                     }
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Stops all executors and running tasks.
+      */
+     private void shutdown() {
+         if (execSvc != null)
+             execSvc.shutdown(5000);
+ 
+         if (msgExecSvc != null)
+             msgExecSvc.shutdownNow();
+ 
+         try {
+             job.dispose(true);
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to dispose job.", e);
+         }
+     }
+ 
+     /**
+      * Notifies node about task finish.
+      *
+      * @param run Finished task runnable.
+      * @param status Task status.
+      */
+     private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) {
+         HadoopTaskInfo info = run.taskInfo();
+ 
+         int pendingTasks0 = pendingTasks.decrementAndGet();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Hadoop task execution finished [info=" + info
+                 + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
+                 ", pendingTasks=" + pendingTasks0 +
+                 ", err=" + status.failCause() + ']');
+ 
+         assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
+ 
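+         // Flush shuffle output after the last pending map task completes so reducers receive all data before the final notification.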
+         boolean flush = pendingTasks0 == 0 && info.type() == MAP;
+ 
+         notifyTaskFinished(info, status, flush);
+     }
+ 
+     /**
+      * Notifies the parent node that a task has finished.
+      *
+      * @param taskInfo Finished task info.
+      * @param status Task status.
+      * @param flush Whether to flush shuffle messages before sending the notification.
+      */
+     private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status,
+         boolean flush) {
+ 
+         final HadoopTaskState state = status.state();
+         final Throwable err = status.failCause();
+ 
+         if (!flush) {
+             try {
+                 if (log.isDebugEnabled())
+                     log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
+                         ", err=" + err + ']');
+ 
+                 comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
+             }
+             catch (IgniteCheckedException e) {
+                 log.error("Failed to send message to parent node (will terminate child process).", e);
+ 
+                 shutdown();
+ 
+                 terminate();
+             }
+         }
+         else {
+             if (log.isDebugEnabled())
+                 log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
+                     taskInfo + ", state=" + state + ", err=" + err + ']');
+ 
+             final long start = U.currentTimeMillis();
+ 
+             try {
+                 shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                     @Override public void apply(IgniteInternalFuture<?> f) {
+                         long end = U.currentTimeMillis();
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
+                                 ", flushTime=" + (end - start) + ']');
+ 
+                         try {
+                             // Check for errors on shuffle.
+                             f.get();
+ 
+                             notifyTaskFinished(taskInfo, status, false);
+                         }
+                         catch (IgniteCheckedException e) {
+                             log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                                 ", state=" + state + ", err=" + err + ']', e);
+ 
+                             notifyTaskFinished(taskInfo,
+                                 new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                         }
+                     }
+                 });
+             }
+             catch (IgniteCheckedException e) {
+                 log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                     ", state=" + state + ", err=" + err + ']', e);
+ 
+                 notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+             }
+         }
+     }
+ 
+     /**
+      * Checks if message was received from parent node and prints warning if not.
+      *
+      * @param desc Sender process descriptor.
+      * @param msg Received message.
+      * @return {@code True} if received from parent node.
+      */
+     private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
+         if (!nodeDesc.processId().equals(desc.processId())) {
+             log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
+                 ", msg=" + msg + ']');
+ 
+             return false;
+         }
+ 
+         return true;
+     }
+ 
+     /**
+      * Stops execution of this process.
+      */
+     private void terminate() {
+         System.exit(1);
+     }
+ 
+     /**
+      * Message listener.
+      */
+     private class MessageListener implements HadoopMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
+             if (msg instanceof HadoopTaskExecutionRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     runTasks((HadoopTaskExecutionRequest)msg);
+             }
+             else if (msg instanceof HadoopJobInfoUpdateRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     updateTasks((HadoopJobInfoUpdateRequest)msg);
+             }
+             else if (msg instanceof HadoopPrepareForJobRequest) {
+                 if (validateNodeMessage(desc, msg))
+                     prepareProcess((HadoopPrepareForJobRequest)msg);
+             }
+             else if (msg instanceof HadoopShuffleMessage) {
+                 if (log.isTraceEnabled())
+                     log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
+ 
+                 initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                     @Override public void apply(IgniteInternalFuture<?> f) {
+                         try {
+                             HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+ 
+                             shuffleJob.onShuffleMessage(m);
+ 
+                             comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+                         }
+                         catch (IgniteCheckedException e) {
+                             U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
+                         }
+                     }
+                 });
+             }
+             else if (msg instanceof HadoopShuffleAck) {
+                 if (log.isTraceEnabled())
+                     log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
+ 
+                 shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
+             }
+             else
+                 log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+             if (log.isDebugEnabled())
+                 log.debug("Lost connection with remote process: " + desc);
+ 
+             if (desc == null)
+                 U.warn(log, "Handshake failed.");
+             else if (desc.processId().equals(nodeDesc.processId())) {
+                 log.warning("Child process lost connection with parent node (will terminate child process).");
+ 
+                 shutdown();
+ 
+                 terminate();
+             }
+         }
+     }
+ }