You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:31 UTC

[20/51] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
new file mode 100644
index 0000000..e95b8cb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.message.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+
+/**
+ * Runs Hadoop tasks inside an external child process and reports their results to the parent node.
+ */
+@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+public class HadoopChildProcessRunner {
+    /** Node process descriptor. */
+    private HadoopProcessDescriptor nodeDesc;
+
+    /** Message processing executor service. */
+    private ExecutorService msgExecSvc;
+
+    /** Task executor service. */
+    private HadoopExecutorService execSvc;
+
+    /** Unsafe memory passed to the shuffle job and to every executed task. */
+    protected GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+    /** External communication. */
+    private HadoopExternalCommunication comm;
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Init guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Start time. */
+    private long startTime;
+
+    /** Init future. */
+    private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>();
+
+    /** Job instance. */
+    private HadoopJob job;
+
+    /** Number of uncompleted tasks. */
+    private final AtomicInteger pendingTasks = new AtomicInteger();
+
+    /** Shuffle job. */
+    private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob;
+
+    /** Concurrent mappers. */
+    private int concMappers;
+
+    /** Concurrent reducers. */
+    private int concReducers;
+
+    /**
+     * Starts child process runner: registers the message listener and acknowledges the start to the parent node.
+     */
+    public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc,
+        ExecutorService msgExecSvc, IgniteLogger parentLog)
+        throws IgniteCheckedException {
+        this.comm = comm;
+        this.nodeDesc = nodeDesc;
+        this.msgExecSvc = msgExecSvc;
+
+        comm.setListener(new MessageListener());
+        log = parentLog.getLogger(HadoopChildProcessRunner.class);
+
+        startTime = U.currentTimeMillis();
+
+        // Acknowledge the start so that the parent node knows this process is up.
+        comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck());
+    }
+
+    /**
+     * Initializes process for task execution.
+     *
+     * @param req Initialization request.
+     */
+    private void prepareProcess(HadoopPrepareForJobRequest req) {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Initializing external hadoop task: " + req);
+
+                assert job == null;
+
+                job = req.jobInfo().createJob(req.jobId(), log);
+
+                job.initialize(true, nodeDesc.processId());
+
+                shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
+                    req.totalReducerCount(), req.localReducers());
+
+                initializeExecutors(req);
+
+                if (log.isDebugEnabled())
+                    log.debug("External process initialized [initWaitTime=" +
+                        (U.currentTimeMillis() - startTime) + ']');
+
+                initFut.onDone(null, null);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to initialize process: " + req, e);
+
+                initFut.onDone(e);
+            }
+        }
+        else
+            log.warning("Duplicate initialize process request received (will ignore): " + req);
+    }
+
+    /**
+     * @param req Task execution request.
+     */
+    private void runTasks(final HadoopTaskExecutionRequest req) {
+        if (!initFut.isDone() && log.isDebugEnabled())
+            log.debug("Will wait for process initialization future completion: " + req);
+
+        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                try {
+                    // Make sure init was successful.
+                    f.get();
+
+                    boolean set = pendingTasks.compareAndSet(0, req.tasks().size());
+
+                    assert set;
+
+                    HadoopTaskInfo info = F.first(req.tasks());
+
+                    assert info != null;
+
+                    int size = info.type() == MAP ? concMappers : concReducers;
+
+//                    execSvc.setCorePoolSize(size);
+//                    execSvc.setMaximumPoolSize(size);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Set executor service size for task type [type=" + info.type() +
+                            ", size=" + size + ']');
+
+                    for (HadoopTaskInfo taskInfo : req.tasks()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Submitted task for external execution: " + taskInfo);
+
+                        execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) {
+                            @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                                onTaskFinished0(this, status);
+                            }
+
+                            @Override protected HadoopTaskInput createInput(HadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.input(ctx);
+                            }
+
+                            @Override protected HadoopTaskOutput createOutput(HadoopTaskContext ctx)
+                                throws IgniteCheckedException {
+                                return shuffleJob.output(ctx);
+                            }
+                        });
+                    }
+                }
+                catch (IgniteCheckedException e) {
+                    for (HadoopTaskInfo info : req.tasks())
+                        notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                }
+            }
+        });
+    }
+
+    /**
+     * Creates executor services.
+     *
+     * @param req Init child process request.
+     */
+    private void initializeExecutors(HadoopPrepareForJobRequest req) {
+        int cpus = Runtime.getRuntime().availableProcessors();
+//
+//        concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus);
+//        concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus);
+
+        execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024);
+    }
+
+    /**
+     * Updates external process map so that shuffle can proceed with sending messages to reducers.
+     *
+     * @param req Update request.
+     */
+    private void updateTasks(final HadoopJobInfoUpdateRequest req) {
+        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> gridFut) {
+                assert initGuard.get();
+
+                assert req.jobId().equals(job.id());
+
+                if (req.reducersAddresses() != null) {
+                    if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
+                        shuffleJob.startSending("external",
+                            new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() {
+                                @Override public void applyx(HadoopProcessDescriptor dest,
+                                    HadoopShuffleMessage msg) throws IgniteCheckedException {
+                                    comm.sendMessage(dest, msg);
+                                }
+                            });
+                    }
+                }
+            }
+        });
+    }
+
+    /** Stops all executors and running tasks and disposes the job, if one has been created. */
+    private void shutdown() {
+        if (execSvc != null)
+            execSvc.shutdown(5000);
+
+        if (msgExecSvc != null)
+            msgExecSvc.shutdownNow();
+
+        try {
+            // Job is still null if shutdown happens before the prepare request was processed.
+            if (job != null)
+                job.dispose(true);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to dispose job.", e);
+        }
+    }
+
+    /**
+     * Notifies node about task finish.
+     *
+     * @param run Finished task runnable.
+     * @param status Task status.
+     */
+    private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) {
+        HadoopTaskInfo info = run.taskInfo();
+
+        int pendingTasks0 = pendingTasks.decrementAndGet();
+
+        if (log.isDebugEnabled())
+            log.debug("Hadoop task execution finished [info=" + info
+                + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
+                ", pendingTasks=" + pendingTasks0 +
+                ", err=" + status.failCause() + ']');
+
+        assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
+
+        boolean flush = pendingTasks0 == 0 && info.type() == MAP;
+
+        notifyTaskFinished(info, status, flush);
+    }
+
+    /**
+     * @param taskInfo Finished task info.
+     * @param status Task status.
+     */
+    private void notifyTaskFinished(final HadoopTaskInfo taskInfo, final HadoopTaskStatus status,
+        boolean flush) {
+
+        final HadoopTaskState state = status.state();
+        final Throwable err = status.failCause();
+
+        if (!flush) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
+                        ", err=" + err + ']');
+
+                comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status));
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to send message to parent node (will terminate child process).", e);
+
+                shutdown();
+
+                terminate();
+            }
+        }
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
+                    taskInfo + ", state=" + state + ", err=" + err + ']');
+
+            final long start = U.currentTimeMillis();
+
+            try {
+                shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        long end = U.currentTimeMillis();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
+                                ", flushTime=" + (end - start) + ']');
+
+                        try {
+                            // Check for errors on shuffle.
+                            f.get();
+
+                            notifyTaskFinished(taskInfo, status, false);
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                                ", state=" + state + ", err=" + err + ']', e);
+
+                            notifyTaskFinished(taskInfo,
+                                new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+                        }
+                    }
+                });
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
+                    ", state=" + state + ", err=" + err + ']', e);
+
+                notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false);
+            }
+        }
+    }
+
+    /**
+     * Checks if message was received from parent node and prints warning if not.
+     *
+     * @param desc Sender process descriptor.
+     * @param msg Received message.
+     * @return {@code True} if received from parent node.
+     */
+    private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) {
+        if (!nodeDesc.processId().equals(desc.processId())) {
+            log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
+                ", msg=" + msg + ']');
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Stops execution of this process.
+     */
+    private void terminate() {
+        System.exit(1);
+    }
+
+    /** Handles messages and connection-loss events delivered by the external communication. */
+    private class MessageListener implements HadoopMessageListener {
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) {
+            if (msg instanceof HadoopTaskExecutionRequest) {
+                if (validateNodeMessage(desc, msg))
+                    runTasks((HadoopTaskExecutionRequest)msg);
+            }
+            else if (msg instanceof HadoopJobInfoUpdateRequest) {
+                if (validateNodeMessage(desc, msg))
+                    updateTasks((HadoopJobInfoUpdateRequest)msg);
+            }
+            else if (msg instanceof HadoopPrepareForJobRequest) {
+                if (validateNodeMessage(desc, msg))
+                    prepareProcess((HadoopPrepareForJobRequest)msg);
+            }
+            else if (msg instanceof HadoopShuffleMessage) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
+
+                initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get(); // Make sure init was successful, shuffle job may be missing otherwise.
+
+                            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+                            shuffleJob.onShuffleMessage(m);
+
+                            comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
+                        }
+                    }
+                });
+            }
+            else if (msg instanceof HadoopShuffleAck) {
+                if (log.isTraceEnabled())
+                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
+
+                shuffleJob.onShuffleAck((HadoopShuffleAck)msg);
+            }
+            else
+                log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
+            if (log.isDebugEnabled())
+                log.debug("Lost connection with remote process: " + desc);
+
+            if (desc == null)
+                U.warn(log, "Handshake failed.");
+            else if (desc.processId().equals(nodeDesc.processId())) {
+                log.warning("Child process lost connection with parent node (will terminate child process).");
+
+                shutdown();
+
+                terminate();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
new file mode 100644
index 0000000..3a94d43
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopExternalProcessStarter.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.logger.log4j.*;
+import org.apache.ignite.marshaller.optimized.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Entry point of the external Hadoop child process.
+ */
+public class HadoopExternalProcessStarter {
+    /** Path to Log4j configuration file. */
+    public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
+
+    /** Arguments. */
+    private Args args;
+
+    /** System out. */
+    private OutputStream out;
+
+    /** System err. */
+    private OutputStream err;
+
+    /**
+     * @param args Parsed arguments.
+     */
+    public HadoopExternalProcessStarter(Args args) {
+        this.args = args;
+    }
+
+    /**
+     * @param cmdArgs Process arguments.
+     */
+    public static void main(String[] cmdArgs) {
+        try {
+            Args args = arguments(cmdArgs);
+
+            new HadoopExternalProcessStarter(args).run();
+        }
+        catch (Exception e) {
+            System.err.println("Failed");
+
+            System.err.println(e.getMessage());
+
+            e.printStackTrace(System.err);
+        }
+    }
+
+    /**
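+     * Starts external communication and the child process runner, then redirects stdout and stderr to files.
+     * @throws Exception If startup fails.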
+     */
+    public void run() throws Exception {
+        U.setWorkDirectory(args.workDir, U.getIgniteHome());
+
+        File outputDir = outputDirectory();
+
+        initializeStreams(outputDir);
+
+        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
+            Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
+
+        IgniteLogger log = logger(outputDir);
+
+        HadoopExternalCommunication comm = new HadoopExternalCommunication(
+            args.nodeId,
+            args.childProcId,
+            new OptimizedMarshaller(),
+            log,
+            msgExecSvc,
+            "external"
+        );
+
+        comm.start();
+
+        HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId);
+        nodeDesc.address(args.addr);
+        nodeDesc.tcpPort(args.tcpPort);
+        nodeDesc.sharedMemoryPort(args.shmemPort);
+
+        HadoopChildProcessRunner runner = new HadoopChildProcessRunner();
+
+        runner.start(comm, nodeDesc, msgExecSvc, log);
+
+        System.err.println("Started");
+        System.err.flush();
+
+        System.setOut(new PrintStream(out));
+        System.setErr(new PrintStream(err));
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @throws Exception If an output file cannot be opened.
+     */
+    private void initializeStreams(File outputDir) throws Exception {
+        out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
+        err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
+    }
+
+    /**
+     * @return Path to output directory.
+     * @throws IOException If failed.
+     */
+    private File outputDirectory() throws IOException {
+        File f = new File(args.out);
+
+        if (!f.exists()) {
+            if (!f.mkdirs())
+                throw new IOException("Failed to create output directory: " + args.out);
+        }
+        else {
+            if (f.isFile())
+                throw new IOException("Output directory is a file: " + args.out);
+        }
+
+        return f;
+    }
+
+    /**
+     * @param outputDir Directory for process output.
+     * @return Logger.
+     */
+    private IgniteLogger logger(final File outputDir) {
+        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
+
+        Log4JLogger logger;
+
+        try {
+            logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
+        }
+        catch (IgniteCheckedException e) {
+            System.err.println("Failed to create URL-based logger. Will use default one.");
+
+            e.printStackTrace();
+
+            logger = new Log4JLogger(true);
+        }
+
+        logger.updateFilePath(new IgniteClosure<String, String>() {
+            @Override public String apply(String s) {
+                return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
+            }
+        });
+
+        return logger;
+    }
+
+    /**
+     * @param processArgs Process arguments.
+     * @return Parsed arguments.
+     */
+    private static Args arguments(String[] processArgs) throws Exception {
+        Args args = new Args();
+
+        for (int i = 0; i < processArgs.length; i++) {
+            String arg = processArgs[i];
+
+            switch (arg) {
+                case "-cpid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-cpid' parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.childProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-ppid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing process ID for '-ppid' parameter");
+
+                    String procIdStr = processArgs[++i];
+
+                    args.parentProcId = UUID.fromString(procIdStr);
+
+                    break;
+                }
+
+                case "-nid": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node ID for '-nid' parameter");
+
+                    String nodeIdStr = processArgs[++i];
+
+                    args.nodeId = UUID.fromString(nodeIdStr);
+
+                    break;
+                }
+
+                case "-addr": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing node address for '-addr' parameter");
+
+                    args.addr = processArgs[++i];
+
+                    break;
+                }
+
+                case "-tport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing tcp port for '-tport' parameter");
+
+                    args.tcpPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-sport": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing shared memory port for '-sport' parameter");
+
+                    args.shmemPort = Integer.parseInt(processArgs[++i]);
+
+                    break;
+                }
+
+                case "-out": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing output folder name for '-out' parameter");
+
+                    args.out = processArgs[++i];
+
+                    break;
+                }
+
+                case "-wd": {
+                    if (i == processArgs.length - 1)
+                        throw new Exception("Missing work folder name for '-wd' parameter");
+
+                    args.workDir = processArgs[++i];
+
+                    break;
+                }
+            }
+        }
+
+        return args;
+    }
+
+    /**
+     * Execution arguments parsed from the command line.
+     */
+    private static class Args {
+        /** Child process ID. */
+        private UUID childProcId;
+
+        /** Parent process ID. */
+        private UUID parentProcId;
+
+        /** Node ID. */
+        private UUID nodeId;
+
+        /** Node address. */
+        private String addr;
+
+        /** TCP port. */
+        private int tcpPort;
+
+        /** Shared memory port. */
+        private int shmemPort = -1;
+
+        /** Output folder. */
+        private String out;
+
+        /** Work directory. */
+        private String workDir;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
deleted file mode 100644
index 5dee79b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopAbstractCommunicationClient.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.concurrent.atomic.*;
-
-/**
- * Implements basic lifecycle for communication clients.
- */
-public abstract class GridHadoopAbstractCommunicationClient implements GridHadoopCommunicationClient {
-    /** Time when this client was last used. */
-    private volatile long lastUsed = U.currentTimeMillis();
-
-    /** Reservations. */
-    private final AtomicInteger reserves = new AtomicInteger();
-
-    /** {@inheritDoc} */
-    @Override public boolean close() {
-        return reserves.compareAndSet(0, -1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void forceClose() {
-        reserves.set(-1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean closed() {
-        return reserves.get() == -1;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserve() {
-        while (true) {
-            int r = reserves.get();
-
-            if (r == -1)
-                return false;
-
-            if (reserves.compareAndSet(r, r + 1))
-                return true;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void release() {
-        while (true) {
-            int r = reserves.get();
-
-            if (r == -1)
-                return;
-
-            if (reserves.compareAndSet(r, r - 1))
-                return;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean reserved() {
-        return reserves.get() > 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getIdleTime() {
-        return U.currentTimeMillis() - lastUsed;
-    }
-
-    /**
-     * Updates used time.
-     */
-    protected void markUsed() {
-        lastUsed = U.currentTimeMillis();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopAbstractCommunicationClient.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
deleted file mode 100644
index b375b55..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopCommunicationClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-
-/**
- * Client that sends {@link GridHadoopMessage}s to a process described by a
- * {@link GridHadoopProcessDescriptor}. Besides sending, the contract covers
- * reservation, closing and idle time reporting.
- */
-public interface GridHadoopCommunicationClient {
-    /**
-     * Tries to close this client.
-     *
-     * @return {@code True} if client has been closed by this call,
-     *      {@code false} if failed to close client (due to concurrent reservation or concurrent close).
-     */
-    public boolean close();
-
-    /**
-     * Forces client close.
-     */
-    public void forceClose();
-
-    /**
-     * Checks whether this client is closed.
-     *
-     * @return {@code True} if client is closed.
-     */
-    public boolean closed();
-
-    /**
-     * Tries to reserve this client for use.
-     *
-     * @return {@code True} if client was reserved, {@code false} otherwise.
-     */
-    public boolean reserve();
-
-    /**
-     * Releases this client by decreasing reservations.
-     */
-    public void release();
-
-    /**
-     * Checks whether this client currently has reservations.
-     *
-     * @return {@code True} if client was reserved.
-     */
-    public boolean reserved();
-
-    /**
-     * Gets idle time of this client.
-     *
-     * @return Idle time of this client.
-     */
-    public long getIdleTime();
-
-    /**
-     * Sends a message to the given process.
-     *
-     * @param desc Process descriptor.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
deleted file mode 100644
index e3457a9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunication.java
+++ /dev/null
@@ -1,1431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hadoop external communication class.
- */
-public class GridHadoopExternalCommunication {
-    /** IPC error message. */
-    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate shared memory segment " +
-        "(switching to TCP, may be slower).";
-
-    /** Default port which node sets listener to (value is <tt>27100</tt>). */
-    public static final int DFLT_PORT = 27100;
-
-    /** Default connection timeout (value is <tt>1000</tt>ms). */
-    public static final long DFLT_CONN_TIMEOUT = 1000;
-
-    /** Default Maximum connection timeout (value is <tt>600,000</tt>ms). */
-    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
-
-    /** Default reconnect attempts count (value is <tt>10</tt>). */
-    public static final int DFLT_RECONNECT_CNT = 10;
-
-    /** Default message queue limit per connection (for incoming and outgoing messages). */
-    public static final int DFLT_MSG_QUEUE_LIMIT = GridNioServer.DFLT_SEND_QUEUE_LIMIT;
-
-    /**
-     * Default count of selectors for TCP server
-     * (value is <tt>1</tt>).
-     */
-    public static final int DFLT_SELECTORS_CNT = 1;
-
-    /** Process descriptor meta for session. */
-    private static final int PROCESS_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Handshake finish meta for session. */
-    private static final int HANDSHAKE_FINISH_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Message tracker meta for session. */
-    private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey();
-
-    /**
-     * Default local port range (value is <tt>100</tt>).
-     * See {@link #setLocalPortRange(int)} for details.
-     */
-    public static final int DFLT_PORT_RANGE = 100;
-
-    /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */
-    public static final boolean DFLT_TCP_NODELAY = true;
-
-    /**
-     * Server listener: drops the cached client of a disconnected process, notifies the message
-     * listener about lost connections and hands received messages over to {@code notifyListener}.
-     */
-    private final GridNioServerListener<GridHadoopMessage> srvLsnr =
-        new GridNioServerListenerAdapter<GridHadoopMessage>() {
-            /** {@inheritDoc} */
-            @Override public void onConnected(GridNioSession ses) {
-                GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-                assert desc != null : "Received connected notification without finished handshake: " + ses;
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-                if (log.isDebugEnabled())
-                    log.debug("Closed connection for session: " + ses);
-
-                if (e != null)
-                    U.error(log, "Session disconnected due to exception: " + ses, e);
-
-                GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-                // Descriptor may be absent on the session; only then there is no cached client to drop.
-                if (desc != null) {
-                    GridHadoopCommunicationClient rmv = clients.remove(desc.processId());
-
-                    if (rmv != null)
-                        rmv.forceClose();
-                }
-
-                // Read the volatile listener once so the null check and the call use the same instance.
-                GridHadoopMessageListener lsnr0 = lsnr;
-
-                // NOTE(review): the listener is notified even when desc is null - confirm listeners accept that.
-                if (lsnr0 != null)
-                    // Notify listener about connection close.
-                    lsnr0.onConnectionLost(desc);
-            }
-
-            /** {@inheritDoc} */
-            @Override public void onMessage(GridNioSession ses, GridHadoopMessage msg) {
-                notifyListener(ses.<GridHadoopProcessDescriptor>meta(PROCESS_META), msg);
-
-                // A tracker is expected on the session whenever the message queue is limited.
-                if (msgQueueLimit > 0) {
-                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                    assert tracker != null : "Missing tracker for limited message queue: " + ses;
-
-                    tracker.run();
-                }
-            }
-        };
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Local process descriptor. */
-    private GridHadoopProcessDescriptor locProcDesc;
-
-    /** Marshaller. */
-    private Marshaller marsh;
-
-    /** Message notification executor service. */
-    private ExecutorService execSvc;
-
-    /** Grid name. */
-    private String gridName;
-
-    /** Complex variable that represents this node IP address. */
-    private volatile InetAddress locHost;
-
-    /** Local port which node uses. */
-    private int locPort = DFLT_PORT;
-
-    /** Local port range. */
-    private int locPortRange = DFLT_PORT_RANGE;
-
-    /** Local port which node uses to accept shared memory connections. */
-    private int shmemPort = -1;
-
-    /** Allocate direct buffer or heap buffer. */
-    private boolean directBuf = true;
-
-    /** Connect timeout. */
-    private long connTimeout = DFLT_CONN_TIMEOUT;
-
-    /** Maximum connect timeout. */
-    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
-
-    /** Reconnect attempts count. */
-    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private int reconCnt = DFLT_RECONNECT_CNT;
-
-    /** Socket send buffer. */
-    private int sockSndBuf;
-
-    /** Socket receive buffer. */
-    private int sockRcvBuf;
-
-    /** Message queue limit. */
-    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
-
-    /** NIO server. */
-    private GridNioServer<GridHadoopMessage> nioSrvr;
-
-    /** Shared memory server. */
-    private IpcSharedMemoryServerEndpoint shmemSrv;
-
-    /** {@code TCP_NODELAY} option value for created sockets. */
-    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
-
-    /** Shared memory accept worker. */
-    private ShmemAcceptWorker shmemAcceptWorker;
-
-    /** Shared memory workers. */
-    private final Collection<ShmemWorker> shmemWorkers = new ConcurrentLinkedDeque8<>();
-
-    /** Clients. */
-    private final ConcurrentMap<UUID, GridHadoopCommunicationClient> clients = GridConcurrentFactory.newMap();
-
-    /** Message listener. */
-    private volatile GridHadoopMessageListener lsnr;
-
-    /** Bound port. */
-    private int boundTcpPort = -1;
-
-    /** Bound port for shared memory server. */
-    private int boundTcpShmemPort = -1;
-
-    /** Count of selectors to use in TCP server. */
-    private int selectorsCnt = DFLT_SELECTORS_CNT;
-
-    /** Local process handshake message. */
-    private ProcessHandshakeMessage locIdMsg;
-
-    /** Locks. */
-    private final GridKeyLock locks = new GridKeyLock();
-
-    /**
-     * Creates communication for the given process. Nothing is bound or started here;
-     * see {@link #start()}.
-     *
-     * @param parentNodeId Parent node ID.
-     * @param procId Process ID.
-     * @param marsh Marshaller to use.
-     * @param log Logger.
-     * @param execSvc Executor service for message notification.
-     * @param gridName Grid name.
-     */
-    public GridHadoopExternalCommunication(
-        UUID parentNodeId,
-        UUID procId,
-        Marshaller marsh,
-        IgniteLogger log,
-        ExecutorService execSvc,
-        String gridName
-    ) {
-        this.gridName = gridName;
-        this.execSvc = execSvc;
-        this.marsh = marsh;
-
-        // Use a logger dedicated to this class.
-        this.log = log.getLogger(GridHadoopExternalCommunication.class);
-
-        locProcDesc = new GridHadoopProcessDescriptor(parentNodeId, procId);
-    }
-
-    /**
-     * Sets local port for socket binding. This is the first port the TCP server tries to bind to.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT}.
-     *
-     * @param locPort Port number.
-     */
-    public void setLocalPort(int locPort) {
-        this.locPort = locPort;
-    }
-
-    /**
-     * Gets the configured local port for socket binding (see {@link #setLocalPort(int)}).
-     * When a port range is used, the port actually bound may differ from this value.
-     *
-     * @return Local port.
-     */
-    public int getLocalPort() {
-        return locPort;
-    }
-
-    /**
-     * Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>).
-     * If provided local port (see {@link #setLocalPort(int)}) is occupied,
-     * implementation will try to increment the port number for as long as it is less than
-     * initial value plus this range.
-     * <p>
-     * If port range value is <tt>0</tt>, then implementation will try bind only to the port provided by
-     * {@link #setLocalPort(int)} method and fail if binding to this port did not succeed.
-     * <p>
-     * Local port range is very useful during development when more than one grid nodes need to run
-     * on the same physical machine.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PORT_RANGE}.
-     *
-     * @param locPortRange New local port range.
-     */
-    public void setLocalPortRange(int locPortRange) {
-        this.locPortRange = locPortRange;
-    }
-
-    /**
-     * Gets local port range (see {@link #setLocalPortRange(int)}).
-     *
-     * @return Local port range.
-     */
-    public int getLocalPortRange() {
-        return locPortRange;
-    }
-
-    /**
-     * Sets local port to accept shared memory connections.
-     * <p>
-     * If set to {@code -1} shared memory communication will be disabled.
-     * The shared memory server is also not created on Windows.
-     * <p>
-     * If not provided, shared memory is disabled.
-     *
-     * @param shmemPort Port number.
-     */
-    public void setSharedMemoryPort(int shmemPort) {
-        this.shmemPort = shmemPort;
-    }
-
-    /**
-     * Gets the configured shared memory port to accept incoming connections.
-     *
-     * @return Shared memory port, {@code -1} if shared memory communication is disabled.
-     */
-    public int getSharedMemoryPort() {
-        return shmemPort;
-    }
-
-    /**
-     * Sets connect timeout used when establishing connection
-     * with remote processes.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
-     *
-     * @param connTimeout Connect timeout in milliseconds.
-     */
-    public void setConnectTimeout(long connTimeout) {
-        this.connTimeout = connTimeout;
-    }
-
-    /**
-     * Gets connect timeout (see {@link #setConnectTimeout(long)}).
-     *
-     * @return Connection timeout.
-     */
-    public long getConnectTimeout() {
-        return connTimeout;
-    }
-
-    /**
-     * Sets maximum connect timeout. If handshake is not established within connect timeout,
-     * then the handshake procedure is repeated with increased connect timeout.
-     * Connect timeout can grow till maximum timeout value,
-     * if maximum timeout value is reached then the handshake is considered as failed.
-     * <p>
-     * {@code 0} is interpreted as infinite timeout.
-     * <p>
-     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
-     *
-     * @param maxConnTimeout Maximum connect timeout.
-     */
-    public void setMaxConnectTimeout(long maxConnTimeout) {
-        this.maxConnTimeout = maxConnTimeout;
-    }
-
-    /**
-     * Gets maximum connection timeout (see {@link #setMaxConnectTimeout(long)}).
-     *
-     * @return Maximum connection timeout.
-     */
-    public long getMaxConnectTimeout() {
-        return maxConnTimeout;
-    }
-
-    /**
-     * Sets maximum number of reconnect attempts used when establishing connection
-     * with remote processes.
-     * <p>
-     * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
-     *
-     * @param reconCnt Maximum number of reconnection attempts.
-     */
-    public void setReconnectCount(int reconCnt) {
-        this.reconCnt = reconCnt;
-    }
-
-    /**
-     * Gets maximum number of reconnect attempts (see {@link #setReconnectCount(int)}).
-     *
-     * @return Reconnect count.
-     */
-    public int getReconnectCount() {
-        return reconCnt;
-    }
-
-    /**
-     * Sets flag to allocate direct or heap buffer in this communication.
-     * If value is {@code true}, then {@link ByteBuffer#allocateDirect(int)} call will be used.
-     * Otherwise, {@link ByteBuffer#allocate(int)} call will be used.
-     * <p>
-     * If not provided, default value is {@code true}.
-     *
-     * @param directBuf Flag indicates to allocate direct or heap buffer.
-     */
-    public void setDirectBuffer(boolean directBuf) {
-        this.directBuf = directBuf;
-    }
-
-    /**
-     * Gets direct buffer flag (see {@link #setDirectBuffer(boolean)}).
-     *
-     * @return Direct buffer flag.
-     */
-    public boolean isDirectBuffer() {
-        return directBuf;
-    }
-
-    /**
-     * Sets the count of selectors to be used in TCP server.
-     * <p>
-     * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
-     *
-     * @param selectorsCnt Selectors count.
-     */
-    public void setSelectorsCount(int selectorsCnt) {
-        this.selectorsCnt = selectorsCnt;
-    }
-
-    /**
-     * Gets the count of selectors used in TCP server (see {@link #setSelectorsCount(int)}).
-     *
-     * @return Number of selectors to use.
-     */
-    public int getSelectorsCount() {
-        return selectorsCnt;
-    }
-
-    /**
-     * Sets value for {@code TCP_NODELAY} socket option. Each
-     * socket will be opened using provided value.
-     * <p>
-     * Setting this option to {@code true} disables Nagle's algorithm
-     * for socket, decreasing latency and delivery time for small messages.
-     * <p>
-     * For systems that work under heavy network load it is advisable to
-     * set this value to {@code false}.
-     * <p>
-     * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
-     *
-     * @param tcpNoDelay {@code True} to disable TCP delay.
-     */
-    public void setTcpNoDelay(boolean tcpNoDelay) {
-        this.tcpNoDelay = tcpNoDelay;
-    }
-
-    /**
-     * Gets value for {@code TCP_NODELAY} socket option (see {@link #setTcpNoDelay(boolean)}).
-     *
-     * @return {@code TCP_NODELAY} flag.
-     */
-    public boolean isTcpNoDelay() {
-        return tcpNoDelay;
-    }
-
-    /**
-     * Sets receive buffer size for sockets created or accepted by this communication.
-     * <p>
-     * If not provided, default is {@code 0} which leaves buffer unchanged after
-     * socket creation (OS defaults).
-     *
-     * @param sockRcvBuf Socket receive buffer size.
-     */
-    public void setSocketReceiveBuffer(int sockRcvBuf) {
-        this.sockRcvBuf = sockRcvBuf;
-    }
-
-    /**
-     * Gets socket receive buffer size (see {@link #setSocketReceiveBuffer(int)}).
-     *
-     * @return Socket receive buffer size.
-     */
-    public int getSocketReceiveBuffer() {
-        return sockRcvBuf;
-    }
-
-    /**
-     * Sets send buffer size for sockets created or accepted by this communication.
-     * <p>
-     * If not provided, default is {@code 0} which leaves the buffer unchanged
-     * after socket creation (OS defaults).
-     *
-     * @param sockSndBuf Socket send buffer size.
-     */
-    public void setSocketSendBuffer(int sockSndBuf) {
-        this.sockSndBuf = sockSndBuf;
-    }
-
-    /**
-     * Gets socket send buffer size (see {@link #setSocketSendBuffer(int)}).
-     *
-     * @return Socket send buffer size.
-     */
-    public int getSocketSendBuffer() {
-        return sockSndBuf;
-    }
-
-    /**
-     * Sets message queue limit for incoming and outgoing messages.
-     * <p>
-     * When set to positive number send queue is limited to the configured value.
-     * {@code 0} disables the size limitations.
-     * <p>
-     * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
-     *
-     * @param msgQueueLimit Message queue size limit.
-     */
-    public void setMessageQueueLimit(int msgQueueLimit) {
-        this.msgQueueLimit = msgQueueLimit;
-    }
-
-    /**
-     * Gets message queue limit (see {@link #setMessageQueueLimit(int)}).
-     *
-     * @return Message queue size limit.
-     */
-    public int getMessageQueueLimit() {
-        return msgQueueLimit;
-    }
-
-    /**
-     * Sets Hadoop communication message listener. The server listener notifies it
-     * when a connection is lost.
-     *
-     * @param lsnr Message listener.
-     */
-    public void setListener(GridHadoopMessageListener lsnr) {
-        this.lsnr = lsnr;
-    }
-
-    /**
-     * Gets the size of the outbound messages queue of the TCP server.
-     *
-     * @return Outbound message queue size, or {@code 0} if the TCP server is not running
-     *      (communication is not started yet or has already been stopped).
-     */
-    public int getOutboundMessagesQueueSize() {
-        // The server reference is null before start() and is reset to null by stop(),
-        // so take a snapshot and guard against it instead of failing with NPE.
-        GridNioServer<GridHadoopMessage> srv = nioSrvr;
-
-        return srv != null ? srv.outboundMessagesQueueSize() : 0;
-    }
-
-    /**
-     * Starts communication: resolves the local host, creates the shared memory server (best effort)
-     * and the TCP server, fills the local process descriptor with the bound endpoints and then
-     * starts accepting connections.
-     *
-     * @throws IgniteCheckedException If local address cannot be resolved or TCP server cannot be created.
-     */
-    public void start() throws IgniteCheckedException {
-        try {
-            locHost = U.getLocalHost();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to initialize local address.", e);
-        }
-
-        // Shared memory is optional: a failure is only logged and communication goes on with TCP.
-        try {
-            shmemSrv = resetShmemServer();
-        }
-        catch (IgniteCheckedException e) {
-            U.warn(log, "Failed to start shared memory communication server.", e);
-        }
-
-        // NOTE(review): if TCP initialization below fails, an already started shared memory
-        // server does not appear to be closed here - confirm and release it if needed.
-        try {
-            // Creates the TCP server; the port it was actually bound to
-            // is recorded in boundTcpPort.
-            nioSrvr = resetNioServer();
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteCheckedException("Failed to initialize TCP server: " + locHost, e);
-        }
-
-        // Publish the actually bound endpoints through the local process descriptor.
-        locProcDesc.address(locHost.getHostAddress());
-        locProcDesc.sharedMemoryPort(boundTcpShmemPort);
-        locProcDesc.tcpPort(boundTcpPort);
-
-        locIdMsg = new ProcessHandshakeMessage(locProcDesc);
-
-        if (shmemSrv != null) {
-            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
-
-            new IgniteThread(shmemAcceptWorker).start();
-        }
-
-        nioSrvr.start();
-    }
-
-    /**
-     * Gets local process descriptor. Its address and ports are filled in by {@link #start()}.
-     *
-     * @return Local process descriptor.
-     */
-    public GridHadoopProcessDescriptor localProcessDescriptor() {
-        return locProcDesc;
-    }
-
-    /**
-     * Builds the NIO filter chain used by communication. The array order is:
-     * async notification, handshake and back pressure, marshalling, codec.
-     *
-     * @return Filters array.
-     */
-    private GridNioFilter[] filters() {
-        GridNioFilter asyncNotify = new GridNioAsyncNotifyFilter(gridName, execSvc, log);
-        GridNioFilter handshake = new HandshakeAndBackpressureFilter();
-        GridNioFilter marshaller = new GridHadoopMarshallerFilter(marsh);
-        GridNioFilter codec =
-            new GridNioCodecFilter(new GridBufferedParser(directBuf, ByteOrder.nativeOrder()), log, false);
-
-        return new GridNioFilter[] {asyncNotify, handshake, marshaller, codec};
-    }
-
-    /**
-     * Creates the TCP NIO server, binding it to the first free port starting from the configured
-     * local port. On success the bound port is stored in {@code boundTcpPort}.
-     * <p>
-     * A port range of <tt>0</tt> means that only the configured local port is tried,
-     * as documented on {@link #setLocalPortRange(int)}.
-     *
-     * @return Server instance.
-     * @throws IgniteCheckedException Thrown if server was already created or it's not possible
-     *      to bind to any port within the range.
-     */
-    private GridNioServer<GridHadoopMessage> resetNioServer() throws IgniteCheckedException {
-        if (boundTcpPort >= 0)
-            throw new IgniteCheckedException("Tcp NIO server was already created on port " + boundTcpPort);
-
-        IgniteCheckedException lastEx = null;
-
-        // Zero range still has to try the configured port itself; otherwise the loop below
-        // would not run at all and binding would always fail.
-        int lastPort = locPortRange == 0 ? locPort : locPort + locPortRange - 1;
-
-        // If configured TCP port is busy, find first available in range.
-        for (int port = locPort; port <= lastPort; port++) {
-            try {
-                GridNioServer<GridHadoopMessage> srvr =
-                    GridNioServer.<GridHadoopMessage>builder()
-                        .address(locHost)
-                        .port(port)
-                        .listener(srvLsnr)
-                        .logger(log.getLogger(GridNioServer.class))
-                        .selectorCount(selectorsCnt)
-                        .gridName(gridName)
-                        .tcpNoDelay(tcpNoDelay)
-                        .directBuffer(directBuf)
-                        .byteOrder(ByteOrder.nativeOrder())
-                        .socketSendBufferSize(sockSndBuf)
-                        .socketReceiveBufferSize(sockRcvBuf)
-                        .sendQueueLimit(msgQueueLimit)
-                        .directMode(false)
-                        .filters(filters())
-                        .build();
-
-                boundTcpPort = port;
-
-                // Ack Port the TCP server was bound to.
-                if (log.isInfoEnabled())
-                    log.info("Successfully bound to TCP port [port=" + boundTcpPort +
-                        ", locHost=" + locHost + ']');
-
-                return srvr;
-            }
-            catch (IgniteCheckedException e) {
-                // Remember the failure so it can be reported as the cause if no port is free.
-                lastEx = e;
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
-                        ", locHost=" + locHost + ']');
-            }
-        }
-
-        // If free port wasn't found.
-        throw new IgniteCheckedException("Failed to bind to any port within range [startPort=" + locPort +
-            ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
-    }
-
-    /**
-     * Creates and starts new shared memory communication server, binding it to the first free
-     * port starting from the configured shared memory port. On success the bound port is stored
-     * in {@code boundTcpShmemPort}.
-     * <p>
-     * A port range of <tt>0</tt> means that only the configured shared memory port is tried.
-     *
-     * @return Server or {@code null} if shared memory is disabled (port is {@code -1}) or
-     *      the process runs on Windows.
-     * @throws IgniteCheckedException If server was already created or it's not possible to bind
-     *      to any port within the range.
-     */
-    @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws IgniteCheckedException {
-        if (boundTcpShmemPort >= 0)
-            throw new IgniteCheckedException("Shared memory server was already created on port " + boundTcpShmemPort);
-
-        if (shmemPort == -1 || U.isWindows())
-            return null;
-
-        IgniteCheckedException lastEx = null;
-
-        // Zero range still has to try the configured port itself; otherwise the loop below
-        // would not run at all and binding would always fail.
-        int lastPort = locPortRange == 0 ? shmemPort : shmemPort + locPortRange - 1;
-
-        // If configured shared memory port is busy, find first available in range.
-        for (int port = shmemPort; port <= lastPort; port++) {
-            try {
-                IpcSharedMemoryServerEndpoint srv = new IpcSharedMemoryServerEndpoint(
-                    log.getLogger(IpcSharedMemoryServerEndpoint.class),
-                    locProcDesc.processId(), gridName);
-
-                srv.setPort(port);
-
-                srv.omitOutOfResourcesWarning(true);
-
-                srv.start();
-
-                boundTcpShmemPort = port;
-
-                // Ack Port the TCP server was bound to.
-                if (log.isInfoEnabled())
-                    log.info("Successfully bound shared memory communication to TCP port [port=" + boundTcpShmemPort +
-                        ", locHost=" + locHost + ']');
-
-                return srv;
-            }
-            catch (IgniteCheckedException e) {
-                // Remember the failure so it can be reported as the cause if no port is free.
-                lastEx = e;
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to bind to local port (will try next port within range) [port=" + port +
-                        ", locHost=" + locHost + ']');
-            }
-        }
-
-        // If free port wasn't found. Report the port the scan actually started from.
-        throw new IgniteCheckedException("Failed to bind shared memory communication to any port within range [startPort=" +
-            shmemPort + ", portRange=" + locPortRange + ", locHost=" + locHost + ']', lastEx);
-    }
-
-    /**
-     * Stops communication: stops the TCP server, cancels and joins the shared memory workers
-     * and force-closes all cached clients.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void stop() throws IgniteCheckedException {
-        // Stop TCP server.
-        if (nioSrvr != null)
-            nioSrvr.stop();
-
-        // Cancel the shared memory accept worker and wait for it before touching per-connection workers.
-        U.cancel(shmemAcceptWorker);
-        U.join(shmemAcceptWorker, log);
-
-        U.cancel(shmemWorkers);
-        U.join(shmemWorkers, log);
-
-        shmemWorkers.clear();
-
-        // Force closing on stop (safety).
-        for (GridHadoopCommunicationClient client : clients.values())
-            client.forceClose();
-
-        // Clear resources.
-        // NOTE(review): boundTcpShmemPort, shmemSrv and the clients map are not reset here,
-        // so a subsequent start() would not recreate the shared memory server - confirm
-        // whether restart is expected to be supported.
-        nioSrvr = null;
-
-        boundTcpPort = -1;
-    }
-
-    /**
-     * Sends message to Hadoop process. A client for the process is reserved for the duration of
-     * the send; if sending fails the client is force-closed and removed from the clients map,
-     * otherwise it is released.
-     *
-     * @param desc Descriptor of the process to send the message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If client could not be reserved or message could not be sent.
-     */
-    public void sendMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) throws
-        IgniteCheckedException {
-        assert desc != null;
-        assert msg != null;
-
-        if (log.isTraceEnabled())
-            log.trace("Sending message to Hadoop process [desc=" + desc + ", msg=" + msg + ']');
-
-        // If reservation itself fails there is no client to clean up, so it stays outside of try.
-        GridHadoopCommunicationClient client = reserveClient(desc);
-
-        boolean sent = false;
-
-        try {
-            client.sendMessage(desc, msg);
-
-            sent = true;
-        }
-        finally {
-            if (sent)
-                client.release();
-            else {
-                // Failed send: do not reuse the client.
-                client.forceClose();
-
-                clients.remove(desc.processId(), client);
-            }
-        }
-    }
-
-    /**
-     * Returns a reserved client for the given process, creating and caching a new one
-     * if none is cached yet. Loops until a reservation succeeds.
-     *
-     * @param desc Descriptor of the process to which client should be open.
-     * @return The existing or just created client, already reserved.
-     * @throws IgniteCheckedException Thrown if any exception occurs.
-     */
-    private GridHadoopCommunicationClient reserveClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException {
-        assert desc != null;
-
-        UUID procId = desc.processId();
-
-        while (true) {
-            GridHadoopCommunicationClient client = clients.get(procId);
-
-            if (client == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Did not find client for remote process [locProcDesc=" + locProcDesc + ", desc=" +
-                        desc + ']');
-
-                // Do not allow concurrent connects.
-                Object sync = locks.lock(procId);
-
-                try {
-                    // Re-check under the lock: another thread may have connected in the meantime.
-                    client = clients.get(procId);
-
-                    if (client == null) {
-                        GridHadoopCommunicationClient old = clients.put(procId, client = createNioClient(desc));
-
-                        assert old == null;
-                    }
-                }
-                finally {
-                    locks.unlock(procId, sync);
-                }
-
-                assert client != null;
-            }
-
-            if (client.reserve())
-                return client;
-            else
-                // Client has just been closed by idle worker. Help it and try again.
-                // Conditional remove: only this exact (closed) instance is dropped from the map.
-                clients.remove(procId, client);
-        }
-    }
-
-    /**
-     * Creates a client to the given process. Shared memory is tried first when the remote process
-     * advertises a shared memory port and has the same parent node; on failure, or otherwise,
-     * a TCP client is created.
-     *
-     * @param desc Process descriptor.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable protected GridHadoopCommunicationClient createNioClient(GridHadoopProcessDescriptor desc)
-        throws  IgniteCheckedException {
-        assert desc != null;
-
-        int shmemPort = desc.sharedMemoryPort();
-
-        // If remote process has shared memory server enabled and was started by the same parent node
-        // then we are likely to run on the same host and shared memory communication could be tried.
-        if (shmemPort != -1 && locProcDesc.parentNodeId().equals(desc.parentNodeId())) {
-            try {
-                return createShmemClient(desc, shmemPort);
-            }
-            catch (IgniteCheckedException e) {
-                // Shared memory failure is not fatal: it is only logged and TCP is used instead.
-                if (e.hasCause(IpcOutOfSystemResourcesException.class))
-                    // Has cause or is itself the IpcOutOfSystemResourcesException.
-                    LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
-                else if (log.isDebugEnabled())
-                    // NOTE(review): the exception itself is not included in the debug output.
-                    log.debug("Failed to establish shared memory connection with local hadoop process: " +
-                        desc);
-            }
-        }
-
-        return createTcpClient(desc);
-    }
-
-    /**
-     * Establishes a shared memory connection to a hadoop process and waits for the handshake to finish.
-     * <p>
-     * Endpoint creation is retried once if the failure is caused by {@link ConnectException}. A handshake
-     * timeout is retried with a doubled timeout until {@code reconCnt} attempts were made or the timeout
-     * exceeded {@code maxConnTimeout}.
-     *
-     * @param desc Process descriptor (not used by this implementation).
-     * @param port Shared memory port.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable protected GridHadoopCommunicationClient createShmemClient(GridHadoopProcessDescriptor desc, int port)
-        throws IgniteCheckedException {
-        int attempt = 1;
-
-        int connectAttempts = 1;
-
-        long connTimeout0 = connTimeout;
-
-        while (true) {
-            IpcEndpoint clientEndpoint;
-
-            try {
-                clientEndpoint = new IpcSharedMemoryClientEndpoint(port, (int)connTimeout, log);
-            }
-            catch (IgniteCheckedException e) {
-                // Reconnect for the second time, if connection is not established.
-                if (connectAttempts < 2 && X.hasCause(e, ConnectException.class)) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                throw e;
-            }
-
-            GridHadoopCommunicationClient client = null;
-
-            ShmemWorker worker = null;
-
-            // Becomes true once the worker thread owns the endpoint and will clean up after itself.
-            boolean started = false;
-
-            try {
-                worker = new ShmemWorker(clientEndpoint, false);
-
-                shmemWorkers.add(worker);
-
-                GridNioSession ses = worker.session();
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                // We are in lock, it is safe to get session and attach
-                ses.addMeta(HANDSHAKE_FINISH_META, fin);
-
-                client = new GridHadoopTcpNioCommunicationClient(ses);
-
-                new IgniteThread(worker).start();
-
-                started = true;
-
-                fin.await(connTimeout0);
-            }
-            catch (GridHadoopHandshakeTimeoutException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
-                        ", err=" + e.getMessage() + ", client=" + client + ']');
-
-                if (client != null)
-                    client.forceClose();
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timedout (will stop attempts to perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", client=" + client + ']');
-
-                    throw e;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    continue;
-                }
-            }
-            catch (RuntimeException | Error e) {
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Caught exception (will close client) [err=" + e.getMessage() + ", client=" + client + ']');
-
-                if (client != null)
-                    client.forceClose();
-
-                // The worker thread never started, so its body() will not unregister the worker
-                // or close the endpoint: do it here to avoid leaking them.
-                if (!started) {
-                    if (worker != null)
-                        shmemWorkers.remove(worker);
-
-                    clientEndpoint.close();
-                }
-
-                throw e;
-            }
-
-            return client;
-        }
-    }
-
-    /**
-     * Establishes TCP connection to remote hadoop process and returns client.
-     * <p>
-     * A failed connect is retried once if it is caused by {@link ConnectException}. A handshake timeout is
-     * retried with a doubled timeout until {@code reconCnt} attempts were made or the timeout exceeded
-     * {@code maxConnTimeout}. All failures are collected as suppressed exceptions of the thrown error.
-     *
-     * @param desc Process descriptor.
-     * @return Client.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected GridHadoopCommunicationClient createTcpClient(GridHadoopProcessDescriptor desc) throws IgniteCheckedException {
-        String addr = desc.address();
-
-        int port = desc.tcpPort();
-
-        if (log.isDebugEnabled())
-            log.debug("Trying to connect to remote process [locProcDesc=" + locProcDesc + ", desc=" + desc + ']');
-
-        boolean conn = false;
-        GridHadoopTcpNioCommunicationClient client = null;
-        IgniteCheckedException errs = null;
-
-        int connectAttempts = 1;
-
-        long connTimeout0 = connTimeout;
-
-        int attempt = 1;
-
-        while (!conn) { // Reconnection on handshake timeout.
-            // Declared outside of 'try' so that a failed attempt can release the channel.
-            SocketChannel ch = null;
-
-            try {
-                ch = SocketChannel.open();
-
-                ch.configureBlocking(true);
-
-                ch.socket().setTcpNoDelay(tcpNoDelay);
-                ch.socket().setKeepAlive(true);
-
-                if (sockRcvBuf > 0)
-                    ch.socket().setReceiveBufferSize(sockRcvBuf);
-
-                if (sockSndBuf > 0)
-                    ch.socket().setSendBufferSize(sockSndBuf);
-
-                ch.socket().connect(new InetSocketAddress(addr, port), (int)connTimeout);
-
-                HandshakeFinish fin = new HandshakeFinish();
-
-                GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get();
-
-                client = new GridHadoopTcpNioCommunicationClient(ses);
-
-                if (log.isDebugEnabled())
-                    log.debug("Waiting for handshake finish for client: " + client);
-
-                fin.await(connTimeout0);
-
-                conn = true;
-            }
-            catch (GridHadoopHandshakeTimeoutException e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug(
-                        "Handshake timedout (will retry with increased timeout) [timeout=" + connTimeout0 +
-                            ", desc=" + desc + ", port=" + port + ", err=" + e + ']');
-
-                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
-                    if (log.isDebugEnabled())
-                        log.debug("Handshake timed out (will stop attempts to perform the handshake) " +
-                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + maxConnTimeout +
-                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
-                            ", err=" + e.getMessage() + ", addr=" + addr + ']');
-
-                    if (errs == null)
-                        errs = new IgniteCheckedException("Failed to connect to remote Hadoop process " +
-                            "(is process still running?) [desc=" + desc + ", addrs=" + addr + ']');
-
-                    errs.addSuppressed(e);
-
-                    break;
-                }
-                else {
-                    attempt++;
-
-                    connTimeout0 *= 2;
-
-                    // Continue loop.
-                }
-            }
-            catch (Exception e) {
-                if (client != null) {
-                    client.forceClose();
-
-                    client = null;
-                }
-                else if (ch != null) {
-                    // No client was created for this channel, so nothing else will close it:
-                    // release the socket here instead of leaking it on every failed attempt.
-                    try {
-                        ch.close();
-                    }
-                    catch (IOException ignored) {
-                        // No-op: best-effort cleanup, the original failure is reported below.
-                    }
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Client creation failed [addr=" + addr + ", port=" + port +
-                        ", err=" + e + ']');
-
-                if (X.hasCause(e, SocketTimeoutException.class))
-                    LT.warn(log, null, "Connect timed out (consider increasing 'connTimeout' " +
-                        "configuration property) [addr=" + addr + ", port=" + port + ']');
-
-                if (errs == null)
-                    errs = new IgniteCheckedException("Failed to connect to remote Hadoop process (is process still running?) " +
-                        "[desc=" + desc + ", addrs=" + addr + ']');
-
-                errs.addSuppressed(e);
-
-                // Reconnect for the second time, if connection is not established.
-                if (connectAttempts < 2 &&
-                    (e instanceof ConnectException || X.hasCause(e, ConnectException.class))) {
-                    connectAttempts++;
-
-                    continue;
-                }
-
-                break;
-            }
-        }
-
-        if (client == null) {
-            assert errs != null;
-
-            if (X.hasCause(errs, ConnectException.class))
-                LT.warn(log, null, "Failed to connect to a remote Hadoop process (is process still running?). " +
-                    "Make sure operating system firewall is disabled on local and remote host) " +
-                    "[addrs=" + addr + ", port=" + port + ']');
-
-            throw errs;
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Created client: " + client);
-
-        return client;
-    }
-
-    /**
-     * Passes a received message to the registered listener, if there is one.
-     *
-     * @param desc Sender process descriptor.
-     * @param msg Communication message.
-     */
-    protected void notifyListener(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
-        // Read the field once so that the null check and the call use the same listener.
-        GridHadoopMessageListener curLsnr = lsnr;
-
-        if (curLsnr == null) {
-            if (log.isDebugEnabled())
-                log.debug("Received communication message without any registered listeners (will ignore) " +
-                    "[senderProcDesc=" + desc + ", msg=" + msg + ']');
-
-            return;
-        }
-
-        // Notify listener of a new message.
-        curLsnr.onMessageReceived(desc, msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopExternalCommunication.class, this);
-    }
-
-    /**
-     * Worker that accepts incoming shared memory connections and starts a {@link ShmemWorker} for each one.
-     * <p>
-     * This worker takes responsibility to shut the server down when stopping,
-     * no other thread shall stop passed server.
-     */
-    private class ShmemAcceptWorker extends GridWorker {
-        /** Shared memory server endpoint connections are accepted from. */
-        private final IpcSharedMemoryServerEndpoint srv;
-
-        /**
-         * @param srv Server.
-         */
-        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
-            super(gridName, "shmem-communication-acceptor", log);
-
-            this.srv = srv;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            try {
-                for (;;) {
-                    // Thread.interrupted() also clears the interrupt flag; it is checked once per iteration.
-                    if (Thread.interrupted())
-                        break;
-
-                    IpcEndpoint accepted = srv.accept();
-
-                    ShmemWorker worker = new ShmemWorker(accepted, true);
-
-                    shmemWorkers.add(worker);
-
-                    IgniteThread workerThread = new IgniteThread(worker);
-
-                    workerThread.start();
-                }
-            }
-            catch (IgniteCheckedException err) {
-                // Errors are not reported once this worker has been cancelled.
-                if (isCancelled())
-                    return;
-
-                U.error(log, "Shmem server failed.", err);
-            }
-            finally {
-                srv.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            // Close the server as part of cancellation, in addition to the close in body().
-            srv.close();
-        }
-    }
-
-    /**
-     * Worker that serves a single shared memory endpoint through an IPC-to-NIO adapter.
-     */
-    private class ShmemWorker extends GridWorker {
-        /** Endpoint served by this worker. */
-        private final IpcEndpoint endpoint;
-
-        /** Adapter. */
-        private final GridHadoopIpcToNioAdapter<GridHadoopMessage> adapter;
-
-        /**
-         * @param endpoint Endpoint.
-         * @param accepted {@code True} if the endpoint was accepted by the shared memory server,
-         *      {@code false} if it was opened by this process as a client. Passed to the adapter as is.
-         */
-        private ShmemWorker(IpcEndpoint endpoint, boolean accepted) {
-            super(gridName, "shmem-worker", log);
-
-            this.endpoint = endpoint;
-
-            adapter = new GridHadoopIpcToNioAdapter<>(
-                GridHadoopExternalCommunication.this.log,
-                endpoint,
-                accepted,
-                srvLsnr,
-                filters());
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException {
-            try {
-                adapter.serve();
-            }
-            finally {
-                // Unregister the worker and release the endpoint however serve() ended.
-                shmemWorkers.remove(this);
-
-                endpoint.close();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            super.cancel();
-
-            endpoint.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void cleanup() {
-            super.cleanup();
-
-            endpoint.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ShmemWorker.class, this);
-        }
-
-        /**
-         * @return NIO session for this worker.
-         */
-        public GridNioSession session() {
-            return adapter.session();
-        }
-    }
-
-    /**
-     * One-shot synchronization aid: one thread waits in {@link #await(long)} until another thread
-     * calls {@link #finish()}.
-     */
-    private static class HandshakeFinish {
-        /** Await latch. */
-        private final CountDownLatch latch = new CountDownLatch(1);
-
-        /**
-         * Finishes handshake.
-         */
-        public void finish() {
-            latch.countDown();
-        }
-
-        /**
-         * Waits until {@link #finish()} is called or the given time elapses.
-         *
-         * @param time Time to wait, in milliseconds.
-         * @throws GridHadoopHandshakeTimeoutException If the handshake was not finished in time or the waiting
-         *      thread was interrupted (the interrupt flag is restored in that case).
-         */
-        public void await(long time) throws GridHadoopHandshakeTimeoutException {
-            try {
-                if (!latch.await(time, TimeUnit.MILLISECONDS))
-                    throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish [timeout=" +
-                        time + ']');
-            }
-            catch (InterruptedException e) {
-                // Restore the interrupt flag so that callers can still observe the interruption.
-                Thread.currentThread().interrupt();
-
-                throw new GridHadoopHandshakeTimeoutException("Failed to wait for handshake to finish (thread was " +
-                    "interrupted) [timeout=" + time + ']', e);
-            }
-        }
-    }
-
-    /**
-     * NIO filter that performs the process handshake on a new session and, once the handshake is done,
-     * tracks received messages when {@code msgQueueLimit} is positive.
-     * <p>
-     * The accepting side sends its handshake message when the session is opened; the connecting side
-     * replies with its own handshake message. {@code proceedSessionOpened()} is called only after the
-     * handshake message of the remote side has been received.
-     */
-    private class HandshakeAndBackpressureFilter extends GridNioFilterAdapter {
-        /**
-         * Assigns filter name to a filter.
-         */
-        protected HandshakeAndBackpressureFilter() {
-            super("HadoopHandshakeFilter");
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionOpened(final GridNioSession ses) throws IgniteCheckedException {
-            if (ses.accepted()) {
-                if (log.isDebugEnabled())
-                    log.debug("Accepted connection, initiating handshake: " + ses);
-
-                // Server initiates handshake.
-                ses.send(locIdMsg).listenAsync(new CI1<GridNioFuture<?>>() {
-                    @Override public void apply(GridNioFuture<?> fut) {
-                        try {
-                            // Make sure there were no errors.
-                            fut.get();
-                        }
-                        catch (IgniteCheckedException | IOException e) {
-                            log.warning("Failed to send handshake message, will close session: " + ses, e);
-
-                            ses.close();
-                        }
-                    }
-                });
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
-            proceedSessionClosed(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
-            proceedExceptionCaught(ses, ex);
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
-            // Only warn: the write is still passed down the filter chain.
-            if (ses.meta(PROCESS_META) == null && !(msg instanceof ProcessHandshakeMessage))
-                log.warning("Writing message before handshake has finished [ses=" + ses + ", msg=" + msg + ']');
-
-            return proceedSessionWrite(ses, msg);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
-            GridHadoopProcessDescriptor desc = ses.meta(PROCESS_META);
-
-            UUID rmtProcId = desc == null ? null : desc.processId();
-
-            if (rmtProcId == null) {
-                // Handshake is not finished yet: the only acceptable message is the handshake itself.
-                if (!(msg instanceof ProcessHandshakeMessage)) {
-                    log.warning("Invalid handshake message received, will close connection [ses=" + ses +
-                        ", msg=" + msg + ']');
-
-                    ses.close();
-
-                    return;
-                }
-
-                ProcessHandshakeMessage nId = (ProcessHandshakeMessage)msg;
-
-                // Descriptor announced by the remote side. 'desc' is the pre-handshake session meta
-                // (typically null here), so it must not be used to describe the remote process.
-                GridHadoopProcessDescriptor rmtDesc = nId.processDescriptor();
-
-                if (log.isDebugEnabled())
-                    log.debug("Received handshake message [ses=" + ses + ", msg=" + msg + ']');
-
-                ses.addMeta(PROCESS_META, rmtDesc);
-
-                if (!ses.accepted())
-                    // Send handshake reply.
-                    ses.send(locIdMsg);
-                else {
-                    // Accepted session: try to register it as the client for the remote process.
-                    rmtProcId = rmtDesc.processId();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Finished handshake with remote client: " + ses);
-
-                    Object sync = locks.tryLock(rmtProcId);
-
-                    if (sync != null) {
-                        try {
-                            if (clients.get(rmtProcId) == null) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Will reuse session for descriptor: " + rmtProcId);
-
-                                // Handshake finished flag is true.
-                                clients.put(rmtProcId, new GridHadoopTcpNioCommunicationClient(ses));
-                            }
-                            else {
-                                if (log.isDebugEnabled())
-                                    log.debug("Will not reuse client as another already exists [locProcDesc=" +
-                                        locProcDesc + ", desc=" + rmtDesc + ']');
-                            }
-                        }
-                        finally {
-                            locks.unlock(rmtProcId, sync);
-                        }
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Concurrent connection is being established, will not reuse client session [" +
-                                "locProcDesc=" + locProcDesc + ", desc=" + rmtDesc + ']');
-                    }
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Handshake is finished for session [ses=" + ses + ", locProcDesc=" + locProcDesc + ']');
-
-                // Release the thread waiting for the handshake, if one is attached to the session.
-                HandshakeFinish to = ses.meta(HANDSHAKE_FINISH_META);
-
-                if (to != null)
-                    to.finish();
-
-                // Notify session opened (both parties).
-                proceedSessionOpened(ses);
-            }
-            else {
-                if (msgQueueLimit > 0) {
-                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
-
-                    if (tracker == null) {
-                        // Create the tracker on the first tracked message of the session.
-                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, tracker =
-                            new GridNioMessageTracker(ses, msgQueueLimit));
-
-                        assert old == null;
-                    }
-
-                    tracker.onMessageReceived();
-                }
-
-                proceedMessageReceived(ses, msg);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException {
-            return proceedSessionClose(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
-            proceedSessionIdleTimeout(ses);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
-            proceedSessionWriteTimeout(ses);
-        }
-    }
-
-    /**
-     * Handshake message that carries a process descriptor.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public static class ProcessHandshakeMessage implements GridHadoopMessage {
-        /** Serial version UID. */
-        private static final long serialVersionUID = 0L;
-
-        /** Process descriptor carried by this message. */
-        private GridHadoopProcessDescriptor procDesc;
-
-        /**
-         * No-arg constructor; the descriptor is then populated by {@link #readExternal(ObjectInput)}.
-         */
-        public ProcessHandshakeMessage() {
-            // No-op.
-        }
-
-        /**
-         * @param procDesc Process descriptor.
-         */
-        private ProcessHandshakeMessage(GridHadoopProcessDescriptor procDesc) {
-            this.procDesc = procDesc;
-        }
-
-        /**
-         * @return Process descriptor.
-         */
-        public GridHadoopProcessDescriptor processDescriptor() {
-            return procDesc;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(procDesc);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            procDesc = (GridHadoopProcessDescriptor)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ProcessHandshakeMessage.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java
deleted file mode 100644
index e001dc9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopHandshakeTimeoutException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
-
-import org.apache.ignite.*;
-import org.jetbrains.annotations.*;
-
-/** Internal checked exception thrown when a handshake is not finished within the timeout. */
-class GridHadoopHandshakeTimeoutException extends IgniteCheckedException {
-    /** Serial version UID. */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * @param msg Error message.
-     */
-    GridHadoopHandshakeTimeoutException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * @param msg Error message.
-     * @param cause Optional cause, may be {@code null}.
-     */
-    GridHadoopHandshakeTimeoutException(String msg, @Nullable Throwable cause) {
-        super(msg, cause);
-    }
-}