You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/16 11:20:52 UTC

[09/51] [partial] ignite git commit: IGNITE-3916: Initial impl.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
deleted file mode 100644
index 5ede18e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.util.GridConcurrentHashSet;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-
-/**
- * Task executor.
- */
-public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
-    /** Job tracker. */
-    private HadoopJobTracker jobTracker;
-
-    /** */
-    private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
-
-    /** Executor service to run tasks. */
-    private HadoopExecutorService exec;
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        jobTracker = ctx.jobTracker();
-
-        exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
-            ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (exec != null) {
-            exec.shutdown(3000);
-
-            if (cancel) {
-                for (HadoopJobId jobId : jobs.keySet())
-                    cancelTasks(jobId);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) {
-        if (exec != null && !exec.shutdown(30000))
-            U.warn(log, "Failed to finish running tasks in 30 sec.");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
-                ", tasksCnt=" + tasks.size() + ']');
-
-        Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
-
-        if (executedTasks == null) {
-            executedTasks = new GridConcurrentHashSet<>();
-
-            Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
-
-            assert extractedCol == null;
-        }
-
-        final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
-
-        for (final HadoopTaskInfo info : tasks) {
-            assert info != null;
-
-            HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
-                ctx.localNodeId()) {
-                @Override protected void onTaskFinished(HadoopTaskStatus status) {
-                    if (log.isDebugEnabled())
-                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
-                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
-
-                    finalExecutedTasks.remove(this);
-
-                    jobTracker.onTaskFinished(info, status);
-                }
-
-                @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-                    return ctx.shuffle().input(taskCtx);
-                }
-
-                @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
-                    return ctx.shuffle().output(taskCtx);
-                }
-            };
-
-            executedTasks.add(task);
-
-            exec.submit(task);
-        }
-    }
-
-    /**
-     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
-     * for this job ID.
-     * <p>
-     * It is guaranteed that this method will not be called concurrently with
-     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
-     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
-     *
-     * @param jobId Job ID to cancel.
-     */
-    @Override public void cancelTasks(HadoopJobId jobId) {
-        Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
-
-        if (executedTasks != null) {
-            for (HadoopRunnableTask task : executedTasks)
-                task.cancel();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
-        if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
-            Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
-
-            assert executedTasks == null || executedTasks.isEmpty();
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
deleted file mode 100644
index 993ecc9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-
-import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.internal.util.worker.GridWorkerListener;
-import org.apache.ignite.internal.util.worker.GridWorkerListenerAdapter;
-import org.apache.ignite.thread.IgniteThread;
-import org.jsr166.ConcurrentHashMap8;
-
-import static java.util.Collections.newSetFromMap;
-
-/**
- * Executor service without thread pooling.
- */
-public class HadoopExecutorService {
-    /** */
-    private final LinkedBlockingQueue<Callable<?>> queue;
-
-    /** */
-    private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
-
-    /** */
-    private final AtomicInteger active = new AtomicInteger();
-
-    /** */
-    private final int maxTasks;
-
-    /** */
-    private final String gridName;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private volatile boolean shutdown;
-
-    /** */
-    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
-            @Override public void onStopped(GridWorker w) {
-                workers.remove(w);
-
-                if (shutdown) {
-                    active.decrementAndGet();
-
-                    return;
-                }
-
-                Callable<?> task = queue.poll();
-
-                if (task != null)
-                    startThread(task);
-                else {
-                    active.decrementAndGet();
-
-                    if (!queue.isEmpty())
-                        startFromQueue();
-                }
-            }
-        };
-
-    /**
-     * @param log Logger.
-     * @param gridName Grid name.
-     * @param maxTasks Max number of tasks.
-     * @param maxQueue Max queue length.
-     */
-    public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
-        assert maxTasks > 0 : maxTasks;
-        assert maxQueue > 0 : maxQueue;
-
-        this.maxTasks = maxTasks;
-        this.queue = new LinkedBlockingQueue<>(maxQueue);
-        this.gridName = gridName;
-        this.log = log.getLogger(HadoopExecutorService.class);
-    }
-
-    /**
-     * @return Number of active workers.
-     */
-    public int active() {
-        return workers.size();
-    }
-
-    /**
-     * Submit task.
-     *
-     * @param task Task.
-     */
-    public void submit(Callable<?> task) {
-        while (queue.isEmpty()) {
-            int active0 = active.get();
-
-            if (active0 == maxTasks)
-                break;
-
-            if (active.compareAndSet(active0, active0 + 1)) {
-                startThread(task);
-
-                return; // Started in new thread bypassing queue.
-            }
-        }
-
-        try {
-            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
-                if (shutdown)
-                    return; // Rejected due to shutdown.
-            }
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            return;
-        }
-
-        startFromQueue();
-    }
-
-    /**
-     * Attempts to start task from queue.
-     */
-    private void startFromQueue() {
-        do {
-            int active0 = active.get();
-
-            if (active0 == maxTasks)
-                break;
-
-            if (active.compareAndSet(active0, active0 + 1)) {
-                Callable<?> task = queue.poll();
-
-                if (task == null) {
-                    int res = active.decrementAndGet();
-
-                    assert res >= 0 : res;
-
-                    break;
-                }
-
-                startThread(task);
-            }
-        }
-        while (!queue.isEmpty());
-    }
-
-    /**
-     * @param task Task.
-     */
-    private void startThread(final Callable<?> task) {
-        String workerName;
-
-        if (task instanceof HadoopRunnableTask) {
-            final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
-
-            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
-        }
-        else
-            workerName = task.toString();
-
-        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
-            @Override protected void body() {
-                try {
-                    task.call();
-                }
-                catch (Exception e) {
-                    log.error("Failed to execute task: " + task, e);
-                }
-            }
-        };
-
-        workers.add(w);
-
-        if (shutdown)
-            w.cancel();
-
-        new IgniteThread(w).start();
-    }
-
-    /**
-     * Shuts down this executor service.
-     *
-     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
-     * @return {@code true} If all tasks completed.
-     */
-    public boolean shutdown(long awaitTimeMillis) {
-        shutdown = true;
-
-        for (GridWorker w : workers)
-            w.cancel();
-
-        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
-            try {
-                Thread.sleep(100);
-
-                awaitTimeMillis -= 100;
-            }
-            catch (InterruptedException e) {
-                break;
-            }
-        }
-
-        return workers.isEmpty();
-    }
-
-    /**
-     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
-     */
-    public boolean isShutdown() {
-        return shutdown;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
deleted file mode 100644
index a57efe6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopHashMultimap;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.COMBINER_HASHMAP_SIZE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_COMBINER_NO_SORTING;
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.COMBINE;
-import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.MAP;
-
-/**
- * Runnable task.
- */
-public abstract class HadoopRunnableTask implements Callable<Void> {
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final HadoopJob job;
-
-    /** Task to run. */
-    private final HadoopTaskInfo info;
-
-    /** Submit time. */
-    private final long submitTs = U.currentTimeMillis();
-
-    /** Execution start timestamp. */
-    private long execStartTs;
-
-    /** Execution end timestamp. */
-    private long execEndTs;
-
-    /** */
-    private HadoopMultimap combinerInput;
-
-    /** */
-    private volatile HadoopTaskContext ctx;
-
-    /** Set if task is to cancelling. */
-    private volatile boolean cancelled;
-
-    /** Node id. */
-    private UUID nodeId;
-
-    /**
-     * @param log Log.
-     * @param job Job.
-     * @param mem Memory.
-     * @param info Task info.
-     * @param nodeId Node id.
-     */
-    protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
-        UUID nodeId) {
-        this.nodeId = nodeId;
-        this.log = log.getLogger(HadoopRunnableTask.class);
-        this.job = job;
-        this.mem = mem;
-        this.info = info;
-    }
-
-    /**
-     * @return Wait time.
-     */
-    public long waitTime() {
-        return execStartTs - submitTs;
-    }
-
-    /**
-     * @return Execution time.
-     */
-    public long executionTime() {
-        return execEndTs - execStartTs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void call() throws IgniteCheckedException {
-        ctx = job.getTaskContext(info);
-
-        return ctx.runAsJobOwner(new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                call0();
-
-                return null;
-            }
-        });
-    }
-
-    /**
-     * Implements actual task running.
-     * @throws IgniteCheckedException
-     */
-    void call0() throws IgniteCheckedException {
-        execStartTs = U.currentTimeMillis();
-
-        Throwable err = null;
-
-        HadoopTaskState state = HadoopTaskState.COMPLETED;
-
-        HadoopPerformanceCounter perfCntr = null;
-
-        try {
-            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
-
-            perfCntr.onTaskSubmit(info, submitTs);
-            perfCntr.onTaskPrepare(info, execStartTs);
-
-            ctx.prepareTaskEnvironment();
-
-            runTask(perfCntr);
-
-            if (info.type() == MAP && job.info().hasCombiner()) {
-                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
-
-                try {
-                    runTask(perfCntr);
-                }
-                finally {
-                    ctx.taskInfo(info);
-                }
-            }
-        }
-        catch (HadoopTaskCancelledException ignored) {
-            state = HadoopTaskState.CANCELED;
-        }
-        catch (Throwable e) {
-            state = HadoopTaskState.FAILED;
-            err = e;
-
-            U.error(log, "Task execution failed.", e);
-
-            if (e instanceof Error)
-                throw e;
-        }
-        finally {
-            execEndTs = U.currentTimeMillis();
-
-            if (perfCntr != null)
-                perfCntr.onTaskFinish(info, execEndTs);
-
-            onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
-
-            if (combinerInput != null)
-                combinerInput.close();
-
-            if (ctx != null)
-                ctx.cleanupTaskEnvironment();
-        }
-    }
-
-    /**
-     * @param perfCntr Performance counter.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        try (HadoopTaskOutput out = createOutputInternal(ctx);
-             HadoopTaskInput in = createInputInternal(ctx)) {
-
-            ctx.input(in);
-            ctx.output(out);
-
-            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
-
-            ctx.run();
-        }
-    }
-
-    /**
-     * Cancel the executed task.
-     */
-    public void cancel() {
-        cancelled = true;
-
-        if (ctx != null)
-            ctx.cancel();
-    }
-
-    /**
-     * @param status Task status.
-     */
-    protected abstract void onTaskFinished(HadoopTaskStatus status);
-
-    /**
-     * @param ctx Task context.
-     * @return Task input.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case MAP:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case COMBINE:
-                assert combinerInput != null;
-
-                return combinerInput.input(ctx);
-
-            default:
-                return createInput(ctx);
-        }
-    }
-
-    /**
-     * @param ctx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Output.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Task output.
-     * @throws IgniteCheckedException If failed.
-     */
-    private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case REDUCE:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case MAP:
-                if (job.info().hasCombiner()) {
-                    assert combinerInput == null;
-
-                    combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
-                        new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
-                        new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
-
-                    return combinerInput.startAdding(ctx);
-                }
-
-            default:
-                return createOutput(ctx);
-        }
-    }
-
-    /**
-     * @return Task info.
-     */
-    public HadoopTaskInfo taskInfo() {
-        return info;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
deleted file mode 100644
index f13c76a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-
-/**
- * Common superclass for task executor.
- */
-public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
-    /**
-     * Runs tasks.
-     *
-     * @param job Job.
-     * @param tasks Tasks.
-     * @throws IgniteCheckedException If failed.
-     */
-    public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
-
-    /**
-     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
-     * for this job ID.
-     * <p>
-     * It is guaranteed that this method will not be called concurrently with
-     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
-     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
-     *
-     * @param jobId Job ID to cancel.
-     */
-    public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
-
-    /**
-     * On job state change callback;
-     *
-     * @param meta Job metadata.
-     */
-    public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
deleted file mode 100644
index b22d291..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-/**
-* State of the task.
-*/
-public enum HadoopTaskState {
-    /** Running task. */
-    RUNNING,
-
-    /** Completed task. */
-    COMPLETED,
-
-    /** Failed task. */
-    FAILED,
-
-    /** Canceled task. */
-    CANCELED,
-
-    /** Process crashed. */
-    CRASHED
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
deleted file mode 100644
index fa09ff7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Task status.
- */
-public class HadoopTaskStatus implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private HadoopTaskState state;
-
-    /** */
-    private Throwable failCause;
-
-    /** */
-    private HadoopCounters cntrs;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public HadoopTaskStatus() {
-        // No-op.
-    }
-
-    /**
-     * Creates new instance.
-     *
-     * @param state Task state.
-     * @param failCause Failure cause (if any).
-     */
-    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
-        this(state, failCause, null);
-    }
-
-    /**
-     * Creates new instance.
-     *
-     * @param state Task state.
-     * @param failCause Failure cause (if any).
-     * @param cntrs Task counters.
-     */
-    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause,
-        @Nullable HadoopCounters cntrs) {
-        assert state != null;
-
-        this.state = state;
-        this.failCause = failCause;
-        this.cntrs = cntrs;
-    }
-
-    /**
-     * @return State.
-     */
-    public HadoopTaskState state() {
-        return state;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    @Nullable public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Counters.
-     */
-    @Nullable public HadoopCounters counters() {
-        return cntrs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopTaskStatus.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(state);
-        out.writeObject(failCause);
-        out.writeObject(cntrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        state = (HadoopTaskState)in.readObject();
-        failCause = (Throwable)in.readObject();
-        cntrs = (HadoopCounters)in.readObject();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
deleted file mode 100644
index dc5874d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java
+++ /dev/null
@@ -1,976 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.HadoopContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobMetadata;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskExecutorAdapter;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskStatus;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.HadoopExternalProcessStarter;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
-import org.apache.ignite.internal.util.GridSpinReadWriteLock;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.spi.IgnitePortProtocol;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
-
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.CRASHED;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.FAILED;
-
-/**
- * External process registry. Handles external process lifecycle.
- */
-public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter {
-    /** Hadoop context. */
-    private HadoopContext ctx;
-
-    /** */
-    private String javaCmd;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Node process descriptor. */
-    private HadoopProcessDescriptor nodeDesc;
-
-    /** Output base. */
-    private File outputBase;
-
-    /** Path separator. */
-    private String pathSep;
-
-    /** Hadoop external communication. */
-    private HadoopExternalCommunication comm;
-
-    /** Starting processes. */
-    private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>();
-
-    /** Starting processes. */
-    private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>();
-
-    /** Busy lock. */
-    private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock();
-
-    /** Job tracker. */
-    private HadoopJobTracker jobTracker;
-
-    /** {@inheritDoc} */
-    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
-        this.ctx = ctx;
-
-        log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class);
-
-        outputBase = U.resolveWorkDirectory("hadoop", false);
-
-        pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":");
-
-        initJavaCommand();
-
-        comm = new HadoopExternalCommunication(
-            ctx.localNodeId(),
-            UUID.randomUUID(),
-            ctx.kernalContext().config().getMarshaller(),
-            log,
-            ctx.kernalContext().getSystemExecutorService(),
-            ctx.kernalContext().gridName());
-
-        comm.setListener(new MessageListener());
-
-        comm.start();
-
-        nodeDesc = comm.localProcessDescriptor();
-
-        ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP,
-            HadoopExternalTaskExecutor.class);
-
-        if (nodeDesc.sharedMemoryPort() != -1)
-            ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP,
-                HadoopExternalTaskExecutor.class);
-
-        jobTracker = ctx.jobTracker();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) {
-        busyLock.writeLock();
-
-        try {
-            comm.stop();
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onJobStateChanged(final HadoopJobMetadata meta) {
-        final HadoopProcess proc = runningProcsByJobId.get(meta.jobId());
-
-        // If we have a local process for this job.
-        if (proc != null) {
-            if (log.isDebugEnabled())
-                log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']');
-
-            if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
-                if (log.isDebugEnabled())
-                    log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() +
-                        ", proc=" + proc + ']');
-
-                runningProcsByJobId.remove(meta.jobId());
-                runningProcsByProcId.remove(proc.descriptor().processId());
-
-                proc.terminate();
-
-                return;
-            }
-
-            if (proc.initFut.isDone()) {
-                if (!proc.initFut.isFailed())
-                    sendJobInfoUpdate(proc, meta);
-                else if (log.isDebugEnabled())
-                    log.debug("Failed to initialize child process (will skip job state notification) " +
-                        "[jobId=" + meta.jobId() + ", meta=" + meta + ']');
-            }
-            else {
-                proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
-                    @Override
-                    public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
-                        try {
-                            f.get();
-
-                            sendJobInfoUpdate(proc, meta);
-                        }
-                        catch (IgniteCheckedException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to initialize child process (will skip job state notification) " +
-                                    "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']');
-                        }
-
-                    }
-                });
-            }
-        }
-        else if (ctx.isParticipating(meta)) {
-            HadoopJob job;
-
-            try {
-                job = jobTracker.job(meta.jobId(), meta.jobInfo());
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to get job: " + meta.jobId(), e);
-
-                return;
-            }
-
-            startProcess(job, meta.mapReducePlan());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
-        if (!busyLock.tryReadLock()) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to start hadoop tasks (grid is stopping, will ignore).");
-
-            return;
-        }
-
-        try {
-            HadoopProcess proc = runningProcsByJobId.get(job.id());
-
-            HadoopTaskType taskType = F.first(tasks).type();
-
-            if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT ||
-                taskType == HadoopTaskType.COMMIT) {
-                if (proc == null || proc.terminated()) {
-                    runningProcsByJobId.remove(job.id(), proc);
-
-                    // Start new process for ABORT task since previous processes were killed.
-                    proc = startProcess(job, jobTracker.plan(job.id()));
-
-                    if (log.isDebugEnabled())
-                        log.debug("Starting new process for maintenance task [jobId=" + job.id() +
-                            ", proc=" + proc + ", taskType=" + taskType + ']');
-                }
-            }
-            else
-                assert proc != null : "Missing started process for task execution request: " + job.id() +
-                    ", tasks=" + tasks;
-
-            final HadoopProcess proc0 = proc;
-
-            proc.initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
-                @Override public void apply(
-                    IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
-                    if (!busyLock.tryReadLock())
-                        return;
-
-                    try {
-                        f.get();
-
-                        proc0.addTasks(tasks);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Sending task execution request to child process [jobId=" + job.id() +
-                                ", proc=" + proc0 + ", tasks=" + tasks + ']');
-
-                        sendExecutionRequest(proc0, job, tasks);
-                    }
-                    catch (IgniteCheckedException e) {
-                        notifyTasksFailed(tasks, FAILED, e);
-                    }
-                    finally {
-                        busyLock.readUnlock();
-                    }
-                }
-            });
-        }
-        finally {
-            busyLock.readUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancelTasks(HadoopJobId jobId) {
-        HadoopProcess proc = runningProcsByJobId.get(jobId);
-
-        if (proc != null)
-            proc.terminate();
-    }
-
-    /**
-     * Sends execution request to remote node.
-     *
-     * @param proc Process to send request to.
-     * @param job Job instance.
-     * @param tasks Collection of tasks to execute in started process.
-     */
-    private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks)
-        throws IgniteCheckedException {
-        // Must synchronize since concurrent process crash may happen and will receive onConnectionLost().
-        proc.lock();
-
-        try {
-            if (proc.terminated()) {
-                notifyTasksFailed(tasks, CRASHED, null);
-
-                return;
-            }
-
-            HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest();
-
-            req.jobId(job.id());
-            req.jobInfo(job.info());
-            req.tasks(tasks);
-
-            comm.sendMessage(proc.descriptor(), req);
-        }
-        finally {
-            proc.unlock();
-        }
-    }
-
-    /**
-     * @return External task metadata.
-     */
-    private HadoopExternalTaskMetadata buildTaskMeta() {
-        HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata();
-
-        meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator)));
-        meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled",
-            "-DIGNITE_HOME=" + U.getIgniteHome()));
-
-        return meta;
-    }
-
-    /**
-     * @param tasks Tasks to notify about.
-     * @param state Fail state.
-     * @param e Optional error.
-     */
-    private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) {
-        HadoopTaskStatus fail = new HadoopTaskStatus(state, e);
-
-        for (HadoopTaskInfo task : tasks)
-            jobTracker.onTaskFinished(task, fail);
-    }
-
-    /**
-     * Starts process template that will be ready to execute Hadoop tasks.
-     *
-     * @param job Job instance.
-     * @param plan Map reduce plan.
-     */
-    private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) {
-        final UUID childProcId = UUID.randomUUID();
-
-        HadoopJobId jobId = job.id();
-
-        final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId);
-
-        final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId()));
-
-        HadoopProcess old = runningProcsByJobId.put(jobId, proc);
-
-        assert old == null;
-
-        old = runningProcsByProcId.put(childProcId, proc);
-
-        assert old == null;
-
-        ctx.kernalContext().closure().runLocalSafe(new Runnable() {
-            @Override public void run() {
-                if (!busyLock.tryReadLock()) {
-                    fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping)."));
-
-                    return;
-                }
-
-                try {
-                    HadoopExternalTaskMetadata startMeta = buildTaskMeta();
-
-                    if (log.isDebugEnabled())
-                        log.debug("Created hadoop child process metadata for job [job=" + job +
-                            ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']');
-
-                    Process proc = startJavaProcess(childProcId, startMeta, job);
-
-                    BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-                    String line;
-
-                    // Read up all the process output.
-                    while ((line = rdr.readLine()) != null) {
-                        if (log.isDebugEnabled())
-                            log.debug("Tracing process output: " + line);
-
-                        if ("Started".equals(line)) {
-                            // Process started successfully, it should not write anything more to the output stream.
-                            if (log.isDebugEnabled())
-                                log.debug("Successfully started child process [childProcId=" + childProcId +
-                                    ", meta=" + job + ']');
-
-                            fut.onProcessStarted(proc);
-
-                            break;
-                        }
-                        else if ("Failed".equals(line)) {
-                            StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n");
-
-                            while ((line = rdr.readLine()) != null)
-                                sb.append("    ").append(line).append("\n");
-
-                            // Cut last character.
-                            sb.setLength(sb.length() - 1);
-
-                            log.warning(sb.toString());
-
-                            fut.onDone(new IgniteCheckedException(sb.toString()));
-
-                            break;
-                        }
-                    }
-                }
-                catch (Throwable e) {
-                    fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e));
-
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
-                finally {
-                    busyLock.readUnlock();
-                }
-            }
-        }, true);
-
-        fut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
-            @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
-                try {
-                    // Make sure there were no exceptions.
-                    f.get();
-
-                    prepareForJob(proc, job, plan);
-                }
-                catch (IgniteCheckedException ignore) {
-                    // Exception is printed in future's onDone() method.
-                }
-            }
-        });
-
-        return proc;
-    }
-
-    /**
-     * Checks that java local command is available.
-     *
-     * @throws IgniteCheckedException If initialization failed.
-     */
-    private void initJavaCommand() throws IgniteCheckedException {
-        String javaHome = System.getProperty("java.home");
-
-        if (javaHome == null)
-            javaHome = System.getenv("JAVA_HOME");
-
-        if (javaHome == null)
-            throw new IgniteCheckedException("Failed to locate JAVA_HOME.");
-
-        javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? "java.exe" : "java");
-
-        try {
-            Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start();
-
-            Collection<String> out = readProcessOutput(proc);
-
-            int res = proc.waitFor();
-
-            if (res != 0)
-                throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " +
-                    "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']');
-
-            if (log.isInfoEnabled()) {
-                log.info("Will use java for external task execution: ");
-
-                for (String s : out)
-                    log.info("    " + s);
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Failed to check java for external task execution.", e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e);
-        }
-    }
-
-    /**
-     * Reads process output line-by-line.
-     *
-     * @param proc Process to read output.
-     * @return Read lines.
-     * @throws IOException If read failed.
-     */
-    private Collection<String> readProcessOutput(Process proc) throws IOException {
-        BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream()));
-
-        Collection<String> res = new ArrayList<>();
-
-        String s;
-
-        while ((s = rdr.readLine()) != null)
-            res.add(s);
-
-        return res;
-    }
-
-    /**
-     * Builds process from metadata.
-     *
-     * @param childProcId Child process ID.
-     * @param startMeta Metadata.
-     * @param job Job.
-     * @return Started process.
-     */
-    private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta,
-        HadoopJob job) throws Exception {
-        String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId;
-
-        if (log.isDebugEnabled())
-            log.debug("Will write process log output to: " + outFldr);
-
-        List<String> cmd = new ArrayList<>();
-
-        File workDir = U.resolveWorkDirectory("", false);
-
-        cmd.add(javaCmd);
-        cmd.addAll(startMeta.jvmOptions());
-        cmd.add("-cp");
-        cmd.add(buildClasspath(startMeta.classpath()));
-        cmd.add(HadoopExternalProcessStarter.class.getName());
-        cmd.add("-cpid");
-        cmd.add(String.valueOf(childProcId));
-        cmd.add("-ppid");
-        cmd.add(String.valueOf(nodeDesc.processId()));
-        cmd.add("-nid");
-        cmd.add(String.valueOf(nodeDesc.parentNodeId()));
-        cmd.add("-addr");
-        cmd.add(nodeDesc.address());
-        cmd.add("-tport");
-        cmd.add(String.valueOf(nodeDesc.tcpPort()));
-        cmd.add("-sport");
-        cmd.add(String.valueOf(nodeDesc.sharedMemoryPort()));
-        cmd.add("-out");
-        cmd.add(outFldr);
-        cmd.add("-wd");
-        cmd.add(workDir.getAbsolutePath());
-
-        return new ProcessBuilder(cmd)
-            .redirectErrorStream(true)
-            .directory(workDir)
-            .start();
-    }
-
-    /**
-     * Gets job work folder.
-     *
-     * @param jobId Job ID.
-     * @return Job work folder.
-     */
-    private String jobWorkFolder(HadoopJobId jobId) {
-        return outputBase + File.separator + "Job_" + jobId;
-    }
-
-    /**
-     * @param cp Classpath collection.
-     * @return Classpath string.
-     */
-    private String buildClasspath(Collection<String> cp) {
-        assert !cp.isEmpty();
-
-        StringBuilder sb = new StringBuilder();
-
-        for (String s : cp)
-            sb.append(s).append(pathSep);
-
-        sb.setLength(sb.length() - 1);
-
-        return sb.toString();
-    }
-
-    /**
-     * Sends job info update request to remote process.
-     *
-     * @param proc Process to send request to.
-     * @param meta Job metadata.
-     */
-    private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) {
-        Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses();
-
-        int rdcNum = meta.mapReducePlan().reducers();
-
-        HadoopProcessDescriptor[] addrs = null;
-
-        if (rdcAddrs != null && rdcAddrs.size() == rdcNum) {
-            addrs = new HadoopProcessDescriptor[rdcNum];
-
-            for (int i = 0; i < rdcNum; i++) {
-                HadoopProcessDescriptor desc = rdcAddrs.get(i);
-
-                assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']';
-
-                addrs[i] = desc;
-            }
-        }
-
-        try {
-            comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs));
-        }
-        catch (IgniteCheckedException e) {
-            if (!proc.terminated()) {
-                log.error("Failed to send job state update message to remote child process (will kill the process) " +
-                    "[jobId=" + proc.jobId + ", meta=" + meta + ']', e);
-
-                proc.terminate();
-            }
-        }
-    }
-
-    /**
-     * Sends prepare request to remote process.
-     *
-     * @param proc Process to send request to.
-     * @param job Job.
-     * @param plan Map reduce plan.
-     */
-    private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) {
-        try {
-            comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(),
-                plan.reducers(), plan.reducers(ctx.localNodeId())));
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job +
-                ", plan=" + plan + ']', e);
-
-            proc.terminate();
-        }
-    }
-
-    /**
-     * Processes task finished message.
-     *
-     * @param desc Remote process descriptor.
-     * @param taskMsg Task finished message.
-     */
-    private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) {
-        HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
-        if (proc != null)
-            proc.removeTask(taskMsg.taskInfo());
-
-        jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status());
-    }
-
-    /**
-     *
-     */
-    private class MessageListener implements HadoopMessageListener {
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
-            if (!busyLock.tryReadLock())
-                return;
-
-            try {
-                if (msg instanceof HadoopProcessStartedAck) {
-                    HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
-                    assert proc != null : "Missing child process for processId: " + desc;
-
-                    HadoopProcessFuture fut = proc.initFut;
-
-                    if (fut != null)
-                        fut.onReplyReceived(desc);
-                    // Safety.
-                    else
-                        log.warning("Failed to find process start future (will ignore): " + desc);
-                }
-                else if (msg instanceof HadoopTaskFinishedMessage) {
-                    HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg;
-
-                    processTaskFinishedMessage(desc, taskMsg);
-                }
-                else
-                    log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']');
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
-            if (!busyLock.tryReadLock())
-                return;
-
-            try {
-                if (desc == null) {
-                    U.warn(log, "Handshake failed.");
-
-                    return;
-                }
-
-                // Notify job tracker about failed tasks.
-                HadoopProcess proc = runningProcsByProcId.get(desc.processId());
-
-                if (proc != null) {
-                    Collection<HadoopTaskInfo> tasks = proc.tasks();
-
-                    if (!F.isEmpty(tasks)) {
-                        log.warning("Lost connection with alive process (will terminate): " + desc);
-
-                        HadoopTaskStatus status = new HadoopTaskStatus(CRASHED,
-                            new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc));
-
-                        for (HadoopTaskInfo info : tasks)
-                            jobTracker.onTaskFinished(info, status);
-
-                        runningProcsByJobId.remove(proc.jobId(), proc);
-                    }
-
-                    // Safety.
-                    proc.terminate();
-                }
-            }
-            finally {
-                busyLock.readUnlock();
-            }
-        }
-    }
-
-    /**
-     * Hadoop process.
-     */
-    private static class HadoopProcess extends ReentrantLock {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Job ID. */
-        private final HadoopJobId jobId;
-
-        /** Process. */
-        private Process proc;
-
-        /** Init future. Completes when process is ready to receive messages. */
-        private final HadoopProcessFuture initFut;
-
-        /** Process descriptor. */
-        private HadoopProcessDescriptor procDesc;
-
-        /** Reducers planned for this process. */
-        private Collection<Integer> reducers;
-
-        /** Tasks. */
-        private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>();
-
-        /** Terminated flag. */
-        private volatile boolean terminated;
-
-        /**
-         * @param jobId Job ID.
-         * @param initFut Init future.
-         */
-        private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut,
-            int[] reducers) {
-            this.jobId = jobId;
-            this.initFut = initFut;
-
-            if (!F.isEmpty(reducers)) {
-                this.reducers = new ArrayList<>(reducers.length);
-
-                for (int r : reducers)
-                    this.reducers.add(r);
-            }
-        }
-
-        /**
-         * @return Communication process descriptor.
-         */
-        private HadoopProcessDescriptor descriptor() {
-            return procDesc;
-        }
-
-        /**
-         * @return Job ID.
-         */
-        public HadoopJobId jobId() {
-            return jobId;
-        }
-
-        /**
-         * Initialized callback.
-         *
-         * @param proc Java process representation.
-         * @param procDesc Process descriptor.
-         */
-        private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) {
-            this.proc = proc;
-            this.procDesc = procDesc;
-        }
-
-        /**
-         * Terminates process (kills it).
-         */
-        private void terminate() {
-            // Guard against concurrent message sending.
-            lock();
-
-            try {
-                terminated = true;
-
-                if (!initFut.isDone())
-                    initFut.listen(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() {
-                        @Override public void apply(
-                            IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) {
-                            proc.destroy();
-                        }
-                    });
-                else
-                    proc.destroy();
-            }
-            finally {
-                unlock();
-            }
-        }
-
-        /**
-         * @return Terminated flag.
-         */
-        private boolean terminated() {
-            return terminated;
-        }
-
-        /**
-         * Sets process tasks.
-         *
-         * @param tasks Tasks to set.
-         */
-        private void addTasks(Collection<HadoopTaskInfo> tasks) {
-            this.tasks.addAll(tasks);
-        }
-
-        /**
-         * Removes task when it was completed.
-         *
-         * @param task Task to remove.
-         */
-        private void removeTask(HadoopTaskInfo task) {
-            if (tasks != null)
-                tasks.remove(task);
-        }
-
-        /**
-         * @return Collection of tasks.
-         */
-        private Collection<HadoopTaskInfo> tasks() {
-            return tasks;
-        }
-
-        /**
-         * @return Planned reducers.
-         */
-        private Collection<Integer> reducers() {
-            return reducers;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(HadoopProcess.class, this);
-        }
-    }
-
-    /**
-     *
-     */
-    private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Child process ID. */
-        private UUID childProcId;
-
-        /** Job ID. */
-        private HadoopJobId jobId;
-
-        /** Process descriptor. */
-        private HadoopProcessDescriptor desc;
-
-        /** Running process. */
-        private Process proc;
-
-        /** Process started flag. */
-        private volatile boolean procStarted;
-
-        /** Reply received flag. */
-        private volatile boolean replyReceived;
-
-        /** Logger. */
-        private final IgniteLogger log = HadoopExternalTaskExecutor.this.log;
-
-        /**
-         */
-        private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId) {
-            this.childProcId = childProcId;
-            this.jobId = jobId;
-        }
-
-        /**
-         * Process started callback.
-         */
-        public void onProcessStarted(Process proc) {
-            this.proc = proc;
-
-            procStarted = true;
-
-            if (procStarted && replyReceived)
-                onDone(F.t(proc, desc));
-        }
-
-        /**
-         * Reply received callback.
-         */
-        public void onReplyReceived(HadoopProcessDescriptor desc) {
-            assert childProcId.equals(desc.processId());
-
-            this.desc = desc;
-
-            replyReceived = true;
-
-            if (procStarted && replyReceived)
-                onDone(F.t(proc, desc));
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res,
-            @Nullable Throwable err) {
-            if (err == null) {
-                HadoopProcess proc = runningProcsByProcId.get(childProcId);
-
-                assert proc != null;
-
-                assert proc.initFut == this;
-
-                proc.onInitialized(res.get1(), res.get2());
-
-                if (!F.isEmpty(proc.reducers()))
-                    jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc);
-            }
-            else {
-                // Clean up since init failed.
-                runningProcsByJobId.remove(jobId);
-                runningProcsByProcId.remove(childProcId);
-            }
-
-            if (super.onDone(res, err)) {
-                if (err == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Initialized child process for external task execution [jobId=" + jobId +
-                            ", desc=" + desc + ", initTime=" + duration() + ']');
-                }
-                else
-                    U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId +
-                        ", desc=" + desc + ']', err);
-
-                return true;
-            }
-
-            return false;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
deleted file mode 100644
index 27b0329..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.util.Collection;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * External task metadata (classpath, JVM options) needed to start external process execution.
- */
-public class HadoopExternalTaskMetadata {
-    /** Process classpath. */
-    private Collection<String> classpath;
-
-    /** JVM options. */
-    @GridToStringInclude
-    private Collection<String> jvmOpts;
-
-    /**
-     * @return JVM Options.
-     */
-    public Collection<String> jvmOptions() {
-        return jvmOpts;
-    }
-
-    /**
-     * @param jvmOpts JVM options.
-     */
-    public void jvmOptions(Collection<String> jvmOpts) {
-        this.jvmOpts = jvmOpts;
-    }
-
-    /**
-     * @return Classpath.
-     */
-    public Collection<String> classpath() {
-        return classpath;
-    }
-
-    /**
-     * @param classpath Classpath.
-     */
-    public void classpath(Collection<String> classpath) {
-        this.classpath = classpath;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopExternalTaskMetadata.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
deleted file mode 100644
index 96b3675..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Job info update request.
- */
-public class HadoopJobInfoUpdateRequest implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /** Job phase. */
-    @GridToStringInclude
-    private HadoopJobPhase jobPhase;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private HadoopProcessDescriptor[] reducersAddrs;
-
-    /**
-     * Constructor required by {@link Externalizable}.
-     */
-    public HadoopJobInfoUpdateRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param jobId Job ID.
-     * @param jobPhase Job phase.
-     * @param reducersAddrs Reducers addresses.
-     */
-    public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase,
-        HadoopProcessDescriptor[] reducersAddrs) {
-        assert jobId != null;
-
-        this.jobId = jobId;
-        this.jobPhase = jobPhase;
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public HadoopJobPhase jobPhase() {
-        return jobPhase;
-    }
-
-    /**
-     * @return Reducers addresses.
-     */
-    public HadoopProcessDescriptor[] reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-
-        out.writeObject(jobPhase);
-        U.writeArray(out, reducersAddrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new HadoopJobId();
-        jobId.readExternal(in);
-
-        jobPhase = (HadoopJobPhase)in.readObject();
-        reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopJobInfoUpdateRequest.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
deleted file mode 100644
index 43bdc36..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Child process initialization request.
- */
-public class HadoopPrepareForJobRequest implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    @GridToStringInclude
-    private HadoopJobId jobId;
-
-    /** Job info. */
-    @GridToStringInclude
-    private HadoopJobInfo jobInfo;
-
-    /** Total amount of reducers in the job. */
-    @GridToStringInclude
-    private int totalReducersCnt;
-
-    /** Reducers to be executed on current node. */
-    @GridToStringInclude
-    private int[] locReducers;
-
-    /**
-     * Constructor required by {@link Externalizable}.
-     */
-    public HadoopPrepareForJobRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     * @param totalReducersCnt Number of reducers in the job.
-     * @param locReducers Reducers to be executed on current node.
-     */
-    public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt,
-        int[] locReducers) {
-        assert jobId != null;
-
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.totalReducersCnt = totalReducersCnt;
-        this.locReducers = locReducers;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public HadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @return Reducers to be executed on current node.
-     */
-    public int[] localReducers() {
-        return locReducers;
-    }
-
-    /**
-     * @return Number of reducers in job.
-     */
-    public int totalReducerCount() {
-        return totalReducersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-
-        out.writeObject(jobInfo);
-        out.writeInt(totalReducersCnt);
-
-        U.writeIntArray(out, locReducers);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new HadoopJobId();
-        jobId.readExternal(in);
-
-        jobInfo = (HadoopJobInfo)in.readObject();
-        totalReducersCnt = in.readInt();
-
-        locReducers = U.readIntArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopPrepareForJobRequest.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
deleted file mode 100644
index 2dc233b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.io.Serializable;
-import java.util.UUID;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Process descriptor used to identify process for which task is running.
- */
-public class HadoopProcessDescriptor implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Parent node ID. */
-    private UUID parentNodeId;
-
-    /** Process ID. */
-    private UUID procId;
-
-    /** Address. */
-    private String addr;
-
-    /** TCP port. */
-    private int tcpPort;
-
-    /** Shared memory port. */
-    private int shmemPort;
-
-    /**
-     * @param parentNodeId Parent node ID.
-     * @param procId Process ID.
-     */
-    public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) {
-        this.parentNodeId = parentNodeId;
-        this.procId = procId;
-    }
-
-    /**
-     * Gets process ID.
-     *
-     * @return Process ID.
-     */
-    public UUID processId() {
-        return procId;
-    }
-
-    /**
-     * Gets parent node ID.
-     *
-     * @return Parent node ID.
-     */
-    public UUID parentNodeId() {
-        return parentNodeId;
-    }
-
-    /**
-     * Gets host address.
-     *
-     * @return Host address.
-     */
-    public String address() {
-        return addr;
-    }
-
-    /**
-     * Sets host address.
-     *
-     * @param addr Host address.
-     */
-    public void address(String addr) {
-        this.addr = addr;
-    }
-
-    /**
-     * @return Shared memory port.
-     */
-    public int sharedMemoryPort() {
-        return shmemPort;
-    }
-
-    /**
-     * Sets shared memory port.
-     *
-     * @param shmemPort Shared memory port.
-     */
-    public void sharedMemoryPort(int shmemPort) {
-        this.shmemPort = shmemPort;
-    }
-
-    /**
-     * @return TCP port.
-     */
-    public int tcpPort() {
-        return tcpPort;
-    }
-
-    /**
-     * Sets TCP port.
-     *
-     * @param tcpPort TCP port.
-     */
-    public void tcpPort(int tcpPort) {
-        this.tcpPort = tcpPort;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (!(o instanceof HadoopProcessDescriptor))
-            return false;
-
-        HadoopProcessDescriptor that = (HadoopProcessDescriptor)o;
-
-        return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int result = parentNodeId.hashCode();
-
-        result = 31 * result + procId.hashCode();
-
-        return result;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopProcessDescriptor.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
deleted file mode 100644
index b35f3ec..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-/**
- * Process started message.
- */
-public class HadoopProcessStartedAck implements HadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopProcessStartedAck.class, this);
-    }
-}
\ No newline at end of file