Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/05 10:05:51 UTC

[51/58] [abbrv] incubator-ignite git commit: Merge branch 'sprint-2' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-futures-cleanup-1

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 0000000,39f42b2..30133f5
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@@ -1,0 -1,1626 +1,1626 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop.jobtracker;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.events.EventType;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.counter.*;
+ import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+ import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import javax.cache.event.*;
+ import javax.cache.expiry.*;
+ import javax.cache.processor.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*;
+ import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+ import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
+ 
+ /**
+  * Hadoop job tracker.
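+  * <p>
+  * Illustrative usage sketch ({@code tracker}, {@code jobId} and {@code jobInfo} are hypothetical
+  * local references, not part of a public API):
+  * <pre>
+  *     // Submit a job and obtain its completion future.
+  *     IgniteInternalFuture&lt;HadoopJobId&gt; fut = tracker.submit(jobId, jobInfo);
+  *
+  *     fut.get(); // Blocks until the job completes; throws if the job failed.
+  * </pre>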
+  */
+ public class HadoopJobTracker extends HadoopComponent {
+     /** */
+     private final GridMutex mux = new GridMutex();
+ 
+     /** */
+     private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaPrj;
+ 
+     /** Projection with expiry policy for finished job updates. */
+     private volatile GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
+ 
+     /** Map-reduce execution planner. */
+     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+     private HadoopMapReducePlanner mrPlanner;
+ 
+     /** All the known jobs. */
 -    private final ConcurrentMap<HadoopJobId, GridFutureAdapterEx<HadoopJob>> jobs = new ConcurrentHashMap8<>();
++    private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJob>> jobs = new ConcurrentHashMap8<>();
+ 
+     /** Locally active jobs. */
+     private final ConcurrentMap<HadoopJobId, JobLocalState> activeJobs = new ConcurrentHashMap8<>();
+ 
+     /** Locally requested finish futures. */
+     private final ConcurrentMap<HadoopJobId, GridFutureAdapter<HadoopJobId>> activeFinishFuts =
+         new ConcurrentHashMap8<>();
+ 
+     /** Event processing service. */
+     private ExecutorService evtProcSvc;
+ 
+     /** Component busy lock. */
+     private GridSpinReadWriteLock busyLock;
+ 
+     /** Closure to check result of async transform of system cache. */
+     private final IgniteInClosure<IgniteInternalFuture<?>> failsLog = new CI1<IgniteInternalFuture<?>>() {
+         @Override public void apply(IgniteInternalFuture<?> gridFut) {
+             try {
+                 gridFut.get();
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to transform system cache.", e);
+             }
+         }
+     };
+ 
+     /** {@inheritDoc} */
+     @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+         super.start(ctx);
+ 
+         busyLock = new GridSpinReadWriteLock();
+ 
+         evtProcSvc = Executors.newFixedThreadPool(1);
+     }
+ 
+     /**
+      * @return Job meta projection.
+      */
+     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+     private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> jobMetaCache() {
+         GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;
+ 
+         if (prj == null) {
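+             // Double-checked locking: lazily create the job metadata projections and the MR planner exactly once.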
+             synchronized (mux) {
+                 if ((prj = jobMetaPrj) == null) {
+                     CacheProjection<Object, Object> sysCache = ctx.kernalContext().cache()
+                         .cache(CU.SYS_CACHE_HADOOP_MR);
+ 
+                     assert sysCache != null;
+ 
+                     mrPlanner = ctx.planner();
+ 
+                     try {
+                         ctx.kernalContext().resource().injectGeneric(mrPlanner);
+                     }
+                     catch (IgniteCheckedException e) { // Must not happen.
+                         U.error(log, "Failed to inject resources.", e);
+ 
+                         throw new IllegalStateException(e);
+                     }
+ 
+                     jobMetaPrj = prj = (GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata>)
+                         sysCache.projection(HadoopJobId.class, HadoopJobMetadata.class);
+ 
+                     if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
+                         ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
+                             new Duration(MILLISECONDS, ctx.configuration().getFinishedJobInfoTtl()));
+ 
+                         finishedJobMetaPrj = prj.withExpiryPolicy(finishedJobPlc);
+                     }
+                     else
+                         finishedJobMetaPrj = jobMetaPrj;
+                 }
+             }
+         }
+ 
+         return prj;
+     }
+ 
+     /**
+      * @return Projection with expiry policy for finished job updates.
+      */
+     private GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
+         GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj;
+ 
+         if (prj == null) {
+             jobMetaCache();
+ 
+             prj = finishedJobMetaPrj;
+ 
+             assert prj != null;
+         }
+ 
+         return prj;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("deprecation")
+     @Override public void onKernalStart() throws IgniteCheckedException {
+         super.onKernalStart();
+ 
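+         // Subscribe to job metadata updates in the system cache. Each batch of events is handed off
+         // to the single event-processing thread, which drives local task scheduling and phase handling.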
+         jobMetaCache().context().continuousQueries().executeInternalQuery(
+             new CacheEntryUpdatedListener<HadoopJobId, HadoopJobMetadata>() {
+                 @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends HadoopJobId,
+                     ? extends HadoopJobMetadata>> evts) {
+                     if (!busyLock.tryReadLock())
+                         return;
+ 
+                     try {
+                         // Must process query callback in a separate thread to avoid deadlocks.
+                         evtProcSvc.submit(new EventHandler() {
+                             @Override protected void body() throws IgniteCheckedException {
+                                 processJobMetadataUpdates(evts);
+                             }
+                         });
+                     }
+                     finally {
+                         busyLock.readUnlock();
+                     }
+                 }
+             },
+             null,
+             true,
+             true
+         );
+ 
+         ctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+             @Override public void onEvent(final Event evt) {
+                 if (!busyLock.tryReadLock())
+                     return;
+ 
+                 try {
+                     // Must process discovery callback in a separate thread to avoid deadlock.
+                     evtProcSvc.submit(new EventHandler() {
+                         @Override protected void body() {
+                             processNodeLeft((DiscoveryEvent)evt);
+                         }
+                     });
+                 }
+                 finally {
+                     busyLock.readUnlock();
+                 }
+             }
+         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onKernalStop(boolean cancel) {
+         super.onKernalStop(cancel);
+ 
+         busyLock.writeLock();
+ 
+         evtProcSvc.shutdown();
+ 
+         // Fail all pending futures.
+         for (GridFutureAdapter<HadoopJobId> fut : activeFinishFuts.values())
+             fut.onDone(new IgniteCheckedException("Failed to execute Hadoop map-reduce job (grid is stopping)."));
+     }
+ 
+     /**
+      * Submits a Hadoop job for execution to the grid.
+      *
+      * @param jobId Job ID.
+      * @param info Job info.
+      * @return Job completion future.
+      */
+     @SuppressWarnings("unchecked")
+     public IgniteInternalFuture<HadoopJobId> submit(HadoopJobId jobId, HadoopJobInfo info) {
+         if (!busyLock.tryReadLock()) {
 -            return new GridFinishedFutureEx<>(new IgniteCheckedException("Failed to execute map-reduce job " +
++            return new GridFinishedFuture<>(new IgniteCheckedException("Failed to execute map-reduce job " +
+                 "(grid is stopping): " + info));
+         }
+ 
+         try {
+             long jobPrepare = U.currentTimeMillis();
+ 
+             if (jobs.containsKey(jobId) || jobMetaCache().containsKey(jobId))
+                 throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+ 
+             HadoopJob job = job(jobId, info);
+ 
+             HadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
+ 
+             HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
+ 
+             meta.mapReducePlan(mrPlan);
+ 
+             meta.pendingSplits(allSplits(mrPlan));
+             meta.pendingReducers(allReducers(mrPlan));
+ 
+             GridFutureAdapter<HadoopJobId> completeFut = new GridFutureAdapter<>();
+ 
+             GridFutureAdapter<HadoopJobId> old = activeFinishFuts.put(jobId, completeFut);
+ 
+             assert old == null : "Duplicate completion future [jobId=" + jobId + ", old=" + old + ']';
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Submitting job metadata [jobId=" + jobId + ", meta=" + meta + ']');
+ 
+             long jobStart = U.currentTimeMillis();
+ 
+             HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
+                 ctx.localNodeId());
+ 
+             perfCntr.clientSubmissionEvents(info);
+             perfCntr.onJobPrepare(jobPrepare);
+             perfCntr.onJobStart(jobStart);
+ 
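+             // Publishing the metadata to the system cache is what actually starts the job: participating
+             // nodes pick up the update through the continuous query registered in onKernalStart().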
+             if (jobMetaCache().putIfAbsent(jobId, meta) != null)
+                 throw new IgniteCheckedException("Failed to submit job. Job with the same ID already exists: " + jobId);
+ 
+             return completeFut;
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to submit job: " + jobId, e);
+ 
 -            return new GridFinishedFutureEx<>(e);
++            return new GridFinishedFuture<>(e);
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Converts Hadoop job metadata to job status.
+      *
+      * @param meta Metadata.
+      * @return Status.
+      */
+     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+     public static HadoopJobStatus status(HadoopJobMetadata meta) {
+         HadoopJobInfo jobInfo = meta.jobInfo();
+ 
+         return new HadoopJobStatus(
+             meta.jobId(),
+             jobInfo.jobName(),
+             jobInfo.user(),
+             meta.pendingSplits() != null ? meta.pendingSplits().size() : 0,
+             meta.pendingReducers() != null ? meta.pendingReducers().size() : 0,
+             meta.mapReducePlan().mappers(),
+             meta.mapReducePlan().reducers(),
+             meta.phase(),
+             meta.failCause() != null,
+             meta.version()
+         );
+     }
+ 
+     /**
+      * Gets Hadoop job status for the given job ID.
+      *
+      * @param jobId Job ID to get status for.
+      * @return Job status for given job ID or {@code null} if job was not found.
+      */
+     @Nullable public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock())
+             return null; // Grid is stopping.
+ 
+         try {
+             HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+             return meta != null ? status(meta) : null;
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Gets job finish future.
+      *
+      * @param jobId Job ID.
+      * @return Finish future or {@code null}.
+      * @throws IgniteCheckedException If failed.
+      */
+     @Nullable public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock())
+             return null; // Grid is stopping.
+ 
+         try {
+             HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+             if (meta == null)
+                 return null;
+ 
+             if (log.isTraceEnabled())
+                 log.trace("Got job metadata for status check [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+ 
+             if (meta.phase() == PHASE_COMPLETE) {
+                 if (log.isTraceEnabled())
+                     log.trace("Job is complete, returning finished future: " + jobId);
+ 
 -                return new GridFinishedFutureEx<>(jobId, meta.failCause());
++                return new GridFinishedFuture<>(jobId);
+             }
+ 
+             GridFutureAdapter<HadoopJobId> fut = F.addIfAbsent(activeFinishFuts, jobId,
+                 new GridFutureAdapter<HadoopJobId>());
+ 
+             // Get meta from cache one more time to close the window.
+             meta = jobMetaCache().get(jobId);
+ 
+             if (log.isTraceEnabled())
+                 log.trace("Re-checking job metadata [locNodeId=" + ctx.localNodeId() + ", meta=" + meta + ']');
+ 
+             if (meta == null) {
+                 fut.onDone();
+ 
+                 activeFinishFuts.remove(jobId, fut);
+             }
+             else if (meta.phase() == PHASE_COMPLETE) {
+                 fut.onDone(jobId, meta.failCause());
+ 
+                 activeFinishFuts.remove(jobId, fut);
+             }
+ 
+             return fut;
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Gets job plan by job ID.
+      *
+      * @param jobId Job ID.
+      * @return Job plan.
+      * @throws IgniteCheckedException If failed.
+      */
+     public HadoopMapReducePlan plan(HadoopJobId jobId) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock())
+             return null;
+ 
+         try {
+             HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+             if (meta != null)
+                 return meta.mapReducePlan();
+ 
+             return null;
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Callback from the task executor invoked when a task has finished.
+      *
+      * @param info Task info.
+      * @param status Task status.
+      */
+     @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
+     public void onTaskFinished(HadoopTaskInfo info, HadoopTaskStatus status) {
+         if (!busyLock.tryReadLock())
+             return;
+ 
+         try {
+             assert status.state() != RUNNING;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Received task finished callback [info=" + info + ", status=" + status + ']');
+ 
+             JobLocalState state = activeJobs.get(info.jobId());
+ 
+             // A task may be CRASHED with a null fail cause, so only FAILED is required to carry one.
+             assert (status.state() != FAILED) || status.failCause() != null :
+                 "Invalid task status [info=" + info + ", status=" + status + ']';
+ 
+             assert state != null || (ctx.jobUpdateLeader() && (info.type() == COMMIT || info.type() == ABORT)):
+                 "Missing local state for finished task [info=" + info + ", status=" + status + ']';
+ 
+             StackedProcessor incrCntrs = null;
+ 
+             if (status.state() == COMPLETED)
+                 incrCntrs = new IncrementCountersProcessor(null, status.counters());
+ 
+             switch (info.type()) {
+                 case SETUP: {
+                     state.onSetupFinished(info, status, incrCntrs);
+ 
+                     break;
+                 }
+ 
+                 case MAP: {
+                     state.onMapFinished(info, status, incrCntrs);
+ 
+                     break;
+                 }
+ 
+                 case REDUCE: {
+                     state.onReduceFinished(info, status, incrCntrs);
+ 
+                     break;
+                 }
+ 
+                 case COMBINE: {
+                     state.onCombineFinished(info, status, incrCntrs);
+ 
+                     break;
+                 }
+ 
+                 case COMMIT:
+                 case ABORT: {
+                     GridCacheProjectionEx<HadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
+ 
+                     cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
+                         listenAsync(failsLog);
+ 
+                     break;
+                 }
+             }
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * @param jobId Job id.
+      * @param c Metadata transform closure to apply.
+      */
+     private void transform(HadoopJobId jobId, EntryProcessor<HadoopJobId, HadoopJobMetadata, Void> c) {
+         jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
+     }
+ 
+     /**
+      * Callback from the task executor invoked when the external process is ready to receive shuffle messages.
+      *
+      * @param jobId Job ID.
+      * @param reducers Reducers.
+      * @param desc Process descriptor.
+      */
+     public void onExternalMappersInitialized(HadoopJobId jobId, Collection<Integer> reducers,
+         HadoopProcessDescriptor desc) {
+         transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
+     }
+ 
+     /**
+      * Gets all input splits for the given Hadoop map-reduce plan.
+      *
+      * @param plan Map-reduce plan.
+      * @return Collection of all input splits that should be processed.
+      */
+     @SuppressWarnings("ConstantConditions")
+     private Map<HadoopInputSplit, Integer> allSplits(HadoopMapReducePlan plan) {
+         Map<HadoopInputSplit, Integer> res = new HashMap<>();
+ 
+         int taskNum = 0;
+ 
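+         // Assign every split a unique sequential task number across all mapper nodes; a duplicate split indicates a broken plan.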
+         for (UUID nodeId : plan.mapperNodeIds()) {
+             for (HadoopInputSplit split : plan.mappers(nodeId)) {
+                 if (res.put(split, taskNum++) != null)
+                     throw new IllegalStateException("Split duplicate.");
+             }
+         }
+ 
+         return res;
+     }
+ 
+     /**
+      * Gets all reducers for this job.
+      *
+      * @param plan Map-reduce plan.
+      * @return Collection of reducers.
+      */
+     private Collection<Integer> allReducers(HadoopMapReducePlan plan) {
+         Collection<Integer> res = new HashSet<>();
+ 
+         for (int i = 0; i < plan.reducers(); i++)
+             res.add(i);
+ 
+         return res;
+     }
+ 
+     /**
+      * Processes node leave (or fail) event.
+      *
+      * @param evt Discovery event.
+      */
+     @SuppressWarnings("ConstantConditions")
+     private void processNodeLeft(DiscoveryEvent evt) {
+         if (log.isDebugEnabled())
+             log.debug("Processing discovery event [locNodeId=" + ctx.localNodeId() + ", evt=" + evt + ']');
+ 
+         // Check only if this node is responsible for job status updates.
+         if (ctx.jobUpdateLeader()) {
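+             // Setup-phase failover is attempted only if the failed node joined the topology before this one.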
+             boolean checkSetup = evt.eventNode().order() < ctx.localNodeOrder();
+ 
+             // Iteration over all local entries is correct since system cache is REPLICATED.
+             for (Object metaObj : jobMetaCache().values()) {
+                 HadoopJobMetadata meta = (HadoopJobMetadata)metaObj;
+ 
+                 HadoopJobId jobId = meta.jobId();
+ 
+                 HadoopMapReducePlan plan = meta.mapReducePlan();
+ 
+                 HadoopJobPhase phase = meta.phase();
+ 
+                 try {
+                     if (checkSetup && phase == PHASE_SETUP && !activeJobs.containsKey(jobId)) {
+                         // Failover setup task.
+                         HadoopJob job = job(jobId, meta.jobInfo());
+ 
+                         Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
+ 
+                         assert setupTask != null;
+ 
+                         ctx.taskExecutor().run(job, setupTask);
+                     }
+                     else if (phase == PHASE_MAP || phase == PHASE_REDUCE) {
+                         // Must check all nodes, not just the node from this event, since multiple nodes may have failed.
+                         Collection<HadoopInputSplit> cancelSplits = null;
+ 
+                         for (UUID nodeId : plan.mapperNodeIds()) {
+                             if (ctx.kernalContext().discovery().node(nodeId) == null) {
+                                 // Node has left the grid.
+                                 Collection<HadoopInputSplit> mappers = plan.mappers(nodeId);
+ 
+                                 if (cancelSplits == null)
+                                     cancelSplits = new HashSet<>();
+ 
+                                 cancelSplits.addAll(mappers);
+                             }
+                         }
+ 
+                         Collection<Integer> cancelReducers = null;
+ 
+                         for (UUID nodeId : plan.reducerNodeIds()) {
+                             if (ctx.kernalContext().discovery().node(nodeId) == null) {
+                                 // Node has left the grid.
+                                 int[] reducers = plan.reducers(nodeId);
+ 
+                                 if (cancelReducers == null)
+                                     cancelReducers = new HashSet<>();
+ 
+                                 for (int rdc : reducers)
+                                     cancelReducers.add(rdc);
+                             }
+                         }
+ 
+                         if (cancelSplits != null || cancelReducers != null)
+                             jobMetaCache().invoke(meta.jobId(), new CancelJobProcessor(null, new IgniteCheckedException(
+                                 "One or more nodes participating in map-reduce job execution failed."), cancelSplits,
+                                 cancelReducers));
+                     }
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to cancel job: " + meta, e);
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * @param updated Updated cache entries.
+      * @throws IgniteCheckedException If failed.
+      */
+     private void processJobMetadataUpdates(
+         Iterable<CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata>> updated)
+         throws IgniteCheckedException {
+         UUID locNodeId = ctx.localNodeId();
+ 
+         for (CacheEntryEvent<? extends HadoopJobId, ? extends HadoopJobMetadata> entry : updated) {
+             HadoopJobId jobId = entry.getKey();
+             HadoopJobMetadata meta = entry.getValue();
+ 
+             if (meta == null || !ctx.isParticipating(meta))
+                 continue;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Processing job metadata update callback [locNodeId=" + locNodeId +
+                     ", meta=" + meta + ']');
+ 
+             try {
+                 ctx.taskExecutor().onJobStateChanged(meta);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to process job state changed callback (will fail the job) " +
+                     "[locNodeId=" + locNodeId + ", jobId=" + jobId + ", meta=" + meta + ']', e);
+ 
+                 transform(jobId, new CancelJobProcessor(null, e));
+ 
+                 continue;
+             }
+ 
+             processJobMetaUpdate(jobId, meta, locNodeId);
+         }
+     }
+ 
+     /**
+      * @param jobId  Job ID.
+      * @param plan Map-reduce plan.
+      */
+     private void printPlan(HadoopJobId jobId, HadoopMapReducePlan plan) {
+         log.info("Plan for " + jobId);
+ 
+         SB b = new SB();
+ 
+         b.a("   Map: ");
+ 
+         for (UUID nodeId : plan.mapperNodeIds())
+             b.a(nodeId).a("=").a(plan.mappers(nodeId).size()).a(' ');
+ 
+         log.info(b.toString());
+ 
+         b = new SB();
+ 
+         b.a("   Reduce: ");
+ 
+         for (UUID nodeId : plan.reducerNodeIds())
+             b.a(nodeId).a("=").a(Arrays.toString(plan.reducers(nodeId))).a(' ');
+ 
+         log.info(b.toString());
+     }
+ 
+     /**
+      * @param jobId Job ID.
+      * @param meta Job metadata.
+      * @param locNodeId Local node ID.
+      * @throws IgniteCheckedException If failed.
+      */
+     private void processJobMetaUpdate(HadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
+         throws IgniteCheckedException {
+         JobLocalState state = activeJobs.get(jobId);
+ 
+         HadoopJob job = job(jobId, meta.jobInfo());
+ 
+         HadoopMapReducePlan plan = meta.mapReducePlan();
+ 
+         switch (meta.phase()) {
+             case PHASE_SETUP: {
+                 if (ctx.jobUpdateLeader()) {
+                     Collection<HadoopTaskInfo> setupTask = setupTask(jobId);
+ 
+                     if (setupTask != null)
+                         ctx.taskExecutor().run(job, setupTask);
+                 }
+ 
+                 break;
+             }
+ 
+             case PHASE_MAP: {
+                 // Check if we should initiate new task on local node.
+                 Collection<HadoopTaskInfo> tasks = mapperTasks(plan.mappers(locNodeId), meta);
+ 
+                 if (tasks != null)
+                     ctx.taskExecutor().run(job, tasks);
+ 
+                 break;
+             }
+ 
+             case PHASE_REDUCE: {
+                 if (meta.pendingReducers().isEmpty() && ctx.jobUpdateLeader()) {
+                     HadoopTaskInfo info = new HadoopTaskInfo(COMMIT, jobId, 0, 0, null);
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Submitting COMMIT task for execution [locNodeId=" + locNodeId +
+                                 ", jobId=" + jobId + ']');
+ 
+                     ctx.taskExecutor().run(job, Collections.singletonList(info));
+ 
+                     break;
+                 }
+ 
+                 Collection<HadoopTaskInfo> tasks = reducerTasks(plan.reducers(locNodeId), job);
+ 
+                 if (tasks != null)
+                     ctx.taskExecutor().run(job, tasks);
+ 
+                 break;
+             }
+ 
+             case PHASE_CANCELLING: {
+                 // Prevent multiple task executor notifications.
+                 if (state != null && state.onCancel()) {
+                     if (log.isDebugEnabled())
+                         log.debug("Cancelling local task execution for job: " + meta);
+ 
+                     ctx.taskExecutor().cancelTasks(jobId);
+                 }
+ 
+                 if (meta.pendingSplits().isEmpty() && meta.pendingReducers().isEmpty()) {
+                     if (ctx.jobUpdateLeader()) {
+                         if (state == null)
+                             state = initState(jobId);
+ 
+                         // Prevent running multiple abort tasks.
+                         if (state.onAborted()) {
+                             HadoopTaskInfo info = new HadoopTaskInfo(ABORT, jobId, 0, 0, null);
+ 
+                             if (log.isDebugEnabled())
+                                 log.debug("Submitting ABORT task for execution [locNodeId=" + locNodeId +
+                                         ", jobId=" + jobId + ']');
+ 
+                             ctx.taskExecutor().run(job, Collections.singletonList(info));
+                         }
+                     }
+ 
+                     break;
+                 }
+                 else {
+                     // Check if there are unscheduled mappers or reducers.
+                     Collection<HadoopInputSplit> cancelMappers = new ArrayList<>();
+                     Collection<Integer> cancelReducers = new ArrayList<>();
+ 
+                     Collection<HadoopInputSplit> mappers = plan.mappers(ctx.localNodeId());
+ 
+                     if (mappers != null) {
+                         for (HadoopInputSplit b : mappers) {
+                             if (state == null || !state.mapperScheduled(b))
+                                 cancelMappers.add(b);
+                         }
+                     }
+ 
+                     int[] rdc = plan.reducers(ctx.localNodeId());
+ 
+                     if (rdc != null) {
+                         for (int r : rdc) {
+                             if (state == null || !state.reducerScheduled(r))
+                                 cancelReducers.add(r);
+                         }
+                     }
+ 
+                     if (!cancelMappers.isEmpty() || !cancelReducers.isEmpty())
+                         transform(jobId, new CancelJobProcessor(null, cancelMappers, cancelReducers));
+                 }
+ 
+                 break;
+             }
+ 
+             case PHASE_COMPLETE: {
+                 if (log.isDebugEnabled())
+                     log.debug("Job execution is complete, will remove local state from active jobs " +
+                         "[jobId=" + jobId + ", meta=" + meta + ']');
+ 
+                 if (state != null) {
+                     state = activeJobs.remove(jobId);
+ 
+                     assert state != null;
+ 
+                     ctx.shuffle().jobFinished(jobId);
+                 }
+ 
+                 GridFutureAdapter<HadoopJobId> finishFut = activeFinishFuts.remove(jobId);
+ 
+                 if (finishFut != null) {
+                     if (log.isDebugEnabled())
+                         log.debug("Completing job future [locNodeId=" + locNodeId + ", meta=" + meta + ']');
+ 
+                     finishFut.onDone(jobId, meta.failCause());
+                 }
+ 
+                 if (ctx.jobUpdateLeader())
+                     job.cleanupStagingDirectory();
+ 
+                 jobs.remove(jobId);
+ 
+                 job.dispose(false);
+ 
+                 if (ctx.jobUpdateLeader()) {
+                     ClassLoader ldr = job.getClass().getClassLoader();
+ 
+                     try {
+                         String statWriterClsName = job.info().property(HadoopUtils.JOB_COUNTER_WRITER_PROPERTY);
+ 
+                         if (statWriterClsName != null) {
+                             Class<?> cls = ldr.loadClass(statWriterClsName);
+ 
+                             HadoopCounterWriter writer = (HadoopCounterWriter)cls.newInstance();
+ 
+                             HadoopCounters cntrs = meta.counters();
+ 
+                             writer.write(job.info(), jobId, cntrs);
+                         }
+                     }
+                     catch (Exception e) {
+                         log.error("Failed to write job statistics.", e);
+                     }
+                 }
+ 
+                 break;
+             }
+ 
+             default:
+                 throw new IllegalStateException("Unknown phase: " + meta.phase());
+         }
+     }
+ 
+     /**
+      * Creates setup task based on job information.
+      *
+      * @param jobId Job ID.
+      * @return Setup task wrapped in collection.
+      */
+     @Nullable private Collection<HadoopTaskInfo> setupTask(HadoopJobId jobId) {
+         if (activeJobs.containsKey(jobId))
+             return null;
+         else {
+             initState(jobId);
+ 
+             return Collections.singleton(new HadoopTaskInfo(SETUP, jobId, 0, 0, null));
+         }
+     }
+ 
+     /**
+      * Creates mapper tasks based on job information.
+      *
+      * @param mappers Mapper blocks.
+      * @param meta Job metadata.
+      * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
+      */
+     private Collection<HadoopTaskInfo> mapperTasks(Iterable<HadoopInputSplit> mappers, HadoopJobMetadata meta) {
+         UUID locNodeId = ctx.localNodeId();
+         HadoopJobId jobId = meta.jobId();
+ 
+         JobLocalState state = activeJobs.get(jobId);
+ 
+         Collection<HadoopTaskInfo> tasks = null;
+ 
+         if (mappers != null) {
+             if (state == null)
+                 state = initState(jobId);
+ 
+             for (HadoopInputSplit split : mappers) {
+                 if (state.addMapper(split)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Submitting MAP task for execution [locNodeId=" + locNodeId +
+                             ", split=" + split + ']');
+ 
+                     HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
+ 
+                     if (tasks == null)
+                         tasks = new ArrayList<>();
+ 
+                     tasks.add(taskInfo);
+                 }
+             }
+         }
+ 
+         return tasks;
+     }
+ 
+     /**
+      * Creates reducer tasks based on job information.
+      *
+      * @param reducers Reducers (may be {@code null}).
+      * @param job Job instance.
+      * @return Collection of created task infos or {@code null} if no reducer tasks scheduled for local node.
+      */
+     private Collection<HadoopTaskInfo> reducerTasks(int[] reducers, HadoopJob job) {
+         UUID locNodeId = ctx.localNodeId();
+         HadoopJobId jobId = job.id();
+ 
+         JobLocalState state = activeJobs.get(jobId);
+ 
+         Collection<HadoopTaskInfo> tasks = null;
+ 
+         if (reducers != null) {
+             if (state == null)
+                 state = initState(job.id());
+ 
+             for (int rdc : reducers) {
+                 if (state.addReducer(rdc)) {
+                     if (log.isDebugEnabled())
+                         log.debug("Submitting REDUCE task for execution [locNodeId=" + locNodeId +
+                             ", rdc=" + rdc + ']');
+ 
+                     HadoopTaskInfo taskInfo = new HadoopTaskInfo(REDUCE, jobId, rdc, 0, null);
+ 
+                     if (tasks == null)
+                         tasks = new ArrayList<>();
+ 
+                     tasks.add(taskInfo);
+                 }
+             }
+         }
+ 
+         return tasks;
+     }
+ 
+     /**
+      * Initializes local state for the given job.
+      *
+      * @param jobId Job ID.
+      * @return Local state.
+      */
+     private JobLocalState initState(HadoopJobId jobId) {
+         return F.addIfAbsent(activeJobs, jobId, new JobLocalState());
+     }
+ 
+     /**
+      * Gets or creates job instance.
+      *
+      * @param jobId Job ID.
+      * @param jobInfo Job info.
+      * @return Job.
+      * @throws IgniteCheckedException If failed.
+      */
+     @Nullable public HadoopJob job(HadoopJobId jobId, @Nullable HadoopJobInfo jobInfo) throws IgniteCheckedException {
 -        GridFutureAdapterEx<HadoopJob> fut = jobs.get(jobId);
++        GridFutureAdapter<HadoopJob> fut = jobs.get(jobId);
+ 
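+         // If a future is already registered (or another thread wins putIfAbsent), wait for that job;
+         // otherwise this thread owns the newly installed future and must create the job itself.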
 -        if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapterEx<HadoopJob>())) != null)
++        if (fut != null || (fut = jobs.putIfAbsent(jobId, new GridFutureAdapter<HadoopJob>())) != null)
+             return fut.get();
+ 
+         fut = jobs.get(jobId);
+ 
+         HadoopJob job = null;
+ 
+         try {
+             if (jobInfo == null) {
+                 HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+                 if (meta == null)
+                     throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
+ 
+                 jobInfo = meta.jobInfo();
+             }
+ 
+             job = jobInfo.createJob(jobId, log);
+ 
+             job.initialize(false, ctx.localNodeId());
+ 
+             fut.onDone(job);
+ 
+             return job;
+         }
+         catch (IgniteCheckedException e) {
+             fut.onDone(e);
+ 
+             jobs.remove(jobId, fut);
+ 
+             if (job != null) {
+                 try {
+                     job.dispose(false);
+                 }
+                 catch (IgniteCheckedException e0) {
+                     U.error(log, "Failed to dispose job: " + jobId, e0);
+                 }
+             }
+ 
+             throw e;
+         }
+     }
+ 
+     /**
+      * Kills job.
+      *
+      * @param jobId Job ID.
+      * @return {@code True} if job was killed.
+      * @throws IgniteCheckedException If failed.
+      */
+     public boolean killJob(HadoopJobId jobId) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock())
+             return false; // Grid is stopping.
+ 
+         try {
+             HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+             if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
+                 HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled.");
+ 
+                 jobMetaCache().invoke(jobId, new CancelJobProcessor(null, err));
+             }
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+ 
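+         // Wait for the job to finish; it is reported as killed only if it terminated with a task-cancelled error.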
+         IgniteInternalFuture<?> fut = finishFuture(jobId);
+ 
+         if (fut != null) {
+             try {
+                 fut.get();
+             }
+             catch (Throwable e) {
+                 if (e.getCause() instanceof HadoopTaskCancelledException)
+                     return true;
+             }
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * Returns job counters.
+      *
+      * @param jobId Job identifier.
+      * @return Job counters or {@code null} if job cannot be found.
+      * @throws IgniteCheckedException If failed.
+      */
+     @Nullable public HadoopCounters jobCounters(HadoopJobId jobId) throws IgniteCheckedException {
+         if (!busyLock.tryReadLock())
+             return null;
+ 
+         try {
+             final HadoopJobMetadata meta = jobMetaCache().get(jobId);
+ 
+             return meta != null ? meta.counters() : null;
+         }
+         finally {
+             busyLock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Event handler protected by busy lock.
+      */
+     private abstract class EventHandler implements Runnable {
+         /** {@inheritDoc} */
+         @Override public void run() {
+             if (!busyLock.tryReadLock())
+                 return;
+ 
+             try {
+                 body();
+             }
+             catch (Throwable e) {
+                 U.error(log, "Unhandled exception while processing event.", e);
+             }
+             finally {
+                 busyLock.readUnlock();
+             }
+         }
+ 
+         /**
+          * Handler body.
+          */
+         protected abstract void body() throws Exception;
+     }
+ 
+     /**
+      * Local per-job state: scheduled mappers and reducers plus cancel/abort flags for this node.
+      */
+     private class JobLocalState {
+         /** Mappers. */
+         private final Collection<HadoopInputSplit> currMappers = new HashSet<>();
+ 
+         /** Reducers. */
+         private final Collection<Integer> currReducers = new HashSet<>();
+ 
+         /** Number of completed mappers. */
+         private final AtomicInteger completedMappersCnt = new AtomicInteger();
+ 
+         /** Cancelled flag. */
+         private boolean cancelled;
+ 
+         /** Aborted flag. */
+         private boolean aborted;
+ 
+         /**
+          * @param mapSplit Map split to add.
+          * @return {@code True} if mapper was added.
+          */
+         private boolean addMapper(HadoopInputSplit mapSplit) {
+             return currMappers.add(mapSplit);
+         }
+ 
+         /**
+          * @param rdc Reducer number to add.
+          * @return {@code True} if reducer was added.
+          */
+         private boolean addReducer(int rdc) {
+             return currReducers.add(rdc);
+         }
+ 
+         /**
+          * Checks whether the given map split was scheduled on this node.
+          *
+          * @param mapSplit Map split to check.
+          * @return {@code True} if mapper was scheduled.
+          */
+         public boolean mapperScheduled(HadoopInputSplit mapSplit) {
+             return currMappers.contains(mapSplit);
+         }
+ 
+         /**
+          * Checks whether the given reducer was scheduled on this node.
+          *
+          * @param rdc Reducer number to check.
+          * @return {@code True} if reducer was scheduled.
+          */
+         public boolean reducerScheduled(int rdc) {
+             return currReducers.contains(rdc);
+         }
+ 
+         /**
+          * @param taskInfo Task info.
+          * @param status Task status.
+          * @param prev Previous closure.
+          */
+         private void onSetupFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
+             final HadoopJobId jobId = taskInfo.jobId();
+ 
+             if (status.state() == FAILED || status.state() == CRASHED)
+                 transform(jobId, new CancelJobProcessor(prev, status.failCause()));
+             else
+                 transform(jobId, new UpdatePhaseProcessor(prev, PHASE_MAP));
+         }
+ 
+         /**
+          * @param taskInfo Task info.
+          * @param status Task status.
+          * @param prev Previous closure.
+          */
+         private void onMapFinished(final HadoopTaskInfo taskInfo, HadoopTaskStatus status,
+             final StackedProcessor prev) {
+             final HadoopJobId jobId = taskInfo.jobId();
+ 
+             boolean lastMapperFinished = completedMappersCnt.incrementAndGet() == currMappers.size();
+ 
+             if (status.state() == FAILED || status.state() == CRASHED) {
+                 // Fail the whole job.
+                 transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), status.failCause()));
+ 
+                 return;
+             }
+ 
+             IgniteInClosure<IgniteInternalFuture<?>> cacheUpdater = new CIX1<IgniteInternalFuture<?>>() {
+                 @Override public void applyx(IgniteInternalFuture<?> f) {
+                     Throwable err = null;
+ 
+                     if (f != null) {
+                         try {
+                             f.get();
+                         }
+                         catch (IgniteCheckedException e) {
+                             err = e;
+                         }
+                     }
+ 
+                     transform(jobId, new RemoveMappersProcessor(prev, taskInfo.inputSplit(), err));
+                 }
+             };
+ 
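+             // Flush the local shuffle output only after the last mapper on this node completes;
+             // in either case the finished split is then removed from the pending set.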
+             if (lastMapperFinished)
+                 ctx.shuffle().flush(jobId).listenAsync(cacheUpdater);
+             else
+                 cacheUpdater.apply(null);
+         }
+ 
+         /**
+          * @param taskInfo Task info.
+          * @param status Task status.
+          * @param prev Previous closure.
+          */
+         private void onReduceFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
+             HadoopJobId jobId = taskInfo.jobId();
+             if (status.state() == FAILED || status.state() == CRASHED)
+                 // Fail the whole job.
+                 transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber(), status.failCause()));
+             else
+                 transform(jobId, new RemoveReducerProcessor(prev, taskInfo.taskNumber()));
+         }
+ 
+         /**
+          * @param taskInfo Task info.
+          * @param status Task status.
+          * @param prev Previous closure.
+          */
+         private void onCombineFinished(HadoopTaskInfo taskInfo, HadoopTaskStatus status,
+             final StackedProcessor prev) {
+             final HadoopJobId jobId = taskInfo.jobId();
+ 
+             if (status.state() == FAILED || status.state() == CRASHED)
+                 // Fail the whole job.
+                 transform(jobId, new RemoveMappersProcessor(prev, currMappers, status.failCause()));
+             else {
+                 ctx.shuffle().flush(jobId).listenAsync(new CIX1<IgniteInternalFuture<?>>() {
+                     @Override public void applyx(IgniteInternalFuture<?> f) {
+                         Throwable err = null;
+ 
+                         if (f != null) {
+                             try {
+                                 f.get();
+                             }
+                             catch (IgniteCheckedException e) {
+                                 err = e;
+                             }
+                         }
+ 
+                         transform(jobId, new RemoveMappersProcessor(prev, currMappers, err));
+                     }
+                 });
+             }
+         }
+ 
+         /**
+          * @return {@code True} if job was cancelled by this (first) call.
+          */
+         public boolean onCancel() {
+             if (!cancelled && !aborted) {
+                 cancelled = true;
+ 
+                 return true;
+             }
+ 
+             return false;
+         }
+ 
+         /**
+          * @return {@code True} if job was aborted by this (first) call.
+          */
+         public boolean onAborted() {
+             if (!aborted) {
+                 aborted = true;
+ 
+                 return true;
+             }
+ 
+             return false;
+         }
+     }
+ 
+     /**
+      * Update job phase transform closure.
+      */
+     private static class UpdatePhaseProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Phase to update. */
+         private final HadoopJobPhase phase;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param phase Phase to update.
+          */
+         private UpdatePhaseProcessor(@Nullable StackedProcessor prev, HadoopJobPhase phase) {
+             super(prev);
+ 
+             this.phase = phase;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             cp.phase(phase);
+         }
+     }
+ 
+     /**
+      * Remove mapper transform closure.
+      */
+     private static class RemoveMappersProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Mapper splits to remove. */
+         private final Collection<HadoopInputSplit> splits;
+ 
+         /** Error. */
+         private final Throwable err;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param split Mapper split to remove.
+          * @param err Error.
+          */
+         private RemoveMappersProcessor(@Nullable StackedProcessor prev, HadoopInputSplit split, Throwable err) {
+             this(prev, Collections.singletonList(split), err);
+         }
+ 
+         /**
+          * @param prev Previous closure.
+          * @param splits Mapper splits to remove.
+          * @param err Error.
+          */
+         private RemoveMappersProcessor(@Nullable StackedProcessor prev, Collection<HadoopInputSplit> splits,
+             Throwable err) {
+             super(prev);
+ 
+             this.splits = splits;
+             this.err = err;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+ 
+             for (HadoopInputSplit s : splits)
+                 splitsCp.remove(s);
+ 
+             cp.pendingSplits(splitsCp);
+ 
+             if (cp.phase() != PHASE_CANCELLING && err != null)
+                 cp.failCause(err);
+ 
+             if (err != null)
+                 cp.phase(PHASE_CANCELLING);
+ 
+             if (splitsCp.isEmpty()) {
+                 if (cp.phase() != PHASE_CANCELLING)
+                     cp.phase(PHASE_REDUCE);
+             }
+         }
+     }
+ 
+     /**
+      * Remove reducer transform closure.
+      */
+     private static class RemoveReducerProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Reducer number to remove. */
+         private final int rdc;
+ 
+         /** Error. */
+         private Throwable err;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param rdc Reducer to remove.
+          */
+         private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc) {
+             super(prev);
+ 
+             this.rdc = rdc;
+         }
+ 
+         /**
+          * @param prev Previous closure.
+          * @param rdc Reducer to remove.
+          * @param err Error.
+          */
+         private RemoveReducerProcessor(@Nullable StackedProcessor prev, int rdc, Throwable err) {
+             super(prev);
+ 
+             this.rdc = rdc;
+             this.err = err;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+ 
+             rdcCp.remove(rdc);
+ 
+             cp.pendingReducers(rdcCp);
+ 
+             if (err != null) {
+                 cp.phase(PHASE_CANCELLING);
+                 cp.failCause(err);
+             }
+         }
+     }
+ 
+     /**
+      * Initialize reducers.
+      */
+     private static class InitializeReducersProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Reducers. */
+         private final Collection<Integer> rdc;
+ 
+         /** Process descriptor for reducers. */
+         private final HadoopProcessDescriptor desc;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param rdc Reducers to initialize.
+          * @param desc External process descriptor.
+          */
+         private InitializeReducersProcessor(@Nullable StackedProcessor prev,
+             Collection<Integer> rdc,
+             HadoopProcessDescriptor desc) {
+             super(prev);
+ 
+             assert !F.isEmpty(rdc);
+             assert desc != null;
+ 
+             this.rdc = rdc;
+             this.desc = desc;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses();
+ 
+             Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
+                 new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);
+ 
+             for (Integer r : rdc)
+                 rdcMap.put(r, desc);
+ 
+             cp.reducersAddresses(rdcMap);
+         }
+     }
+ 
+     /**
+      * Cancel job transform closure.
+      */
+     private static class CancelJobProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** Mapper splits to remove. */
+         private final Collection<HadoopInputSplit> splits;
+ 
+         /** Reducers to remove. */
+         private final Collection<Integer> rdc;
+ 
+         /** Error. */
+         private final Throwable err;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param err Fail cause.
+          */
+         private CancelJobProcessor(@Nullable StackedProcessor prev, Throwable err) {
+             this(prev, err, null, null);
+         }
+ 
+         /**
+          * @param prev Previous closure.
+          * @param splits Splits to remove.
+          * @param rdc Reducers to remove.
+          */
+         private CancelJobProcessor(@Nullable StackedProcessor prev,
+             Collection<HadoopInputSplit> splits,
+             Collection<Integer> rdc) {
+             this(prev, null, splits, rdc);
+         }
+ 
+         /**
+          * @param prev Previous closure.
+          * @param err Error.
+          * @param splits Splits to remove.
+          * @param rdc Reducers to remove.
+          */
+         private CancelJobProcessor(@Nullable StackedProcessor prev,
+             Throwable err,
+             Collection<HadoopInputSplit> splits,
+             Collection<Integer> rdc) {
+             super(prev);
+ 
+             this.splits = splits;
+             this.rdc = rdc;
+             this.err = err;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta;
+ 
+             Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
+ 
+             if (rdc != null)
+                 rdcCp.removeAll(rdc);
+ 
+             cp.pendingReducers(rdcCp);
+ 
+             Map<HadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
+ 
+             if (splits != null) {
+                 for (HadoopInputSplit s : splits)
+                     splitsCp.remove(s);
+             }
+ 
+             cp.pendingSplits(splitsCp);
+ 
+             cp.phase(PHASE_CANCELLING);
+ 
+             if (err != null)
+                 cp.failCause(err);
+         }
+     }
+ 
+     /**
+      * Increment counter values closure.
+      */
+     private static class IncrementCountersProcessor extends StackedProcessor {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private final HadoopCounters counters;
+ 
+         /**
+          * @param prev Previous closure.
+          * @param counters Task counters to add into job counters.
+          */
+         private IncrementCountersProcessor(@Nullable StackedProcessor prev, HadoopCounters counters) {
+             super(prev);
+ 
+             assert counters != null;
+ 
+             this.counters = counters;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+             HadoopCounters cntrs = new HadoopCountersImpl(cp.counters());
+ 
+             cntrs.merge(counters);
+ 
+             cp.counters(cntrs);
+         }
+     }
+ 
+     /**
+      * Abstract stacked closure.
+      */
+     private abstract static class StackedProcessor implements
+         EntryProcessor<HadoopJobId, HadoopJobMetadata, Void>, Serializable {
+         /** */
+         private static final long serialVersionUID = 0L;
+ 
+         /** */
+         private final StackedProcessor prev;
+ 
+         /**
+          * @param prev Previous closure.
+          */
+         private StackedProcessor(@Nullable StackedProcessor prev) {
+             this.prev = prev;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Void process(MutableEntry<HadoopJobId, HadoopJobMetadata> e, Object... args) {
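+             // Apply the whole processor chain; a null result removes the metadata entry.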
+             HadoopJobMetadata val = apply(e.getValue());
+ 
+             if (val != null)
+                 e.setValue(val);
+             else
+                 e.remove();
+ 
+             return null;
+         }
+ 
+         /**
+          * @param meta Old value.
+          * @return New value.
+          */
+         private HadoopJobMetadata apply(HadoopJobMetadata meta) {
+             if (meta == null)
+                 return null;
+ 
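+             // Apply previous processors first so that their updates are included in the copy before this one.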
+             HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta);
+ 
+             update(meta, cp);
+ 
+             return cp;
+         }
+ 
+         /**
+          * Updates the metadata copy based on the initial job metadata.
+          *
+          * @param meta Initial job metadata.
+          * @param cp Metadata copy to update.
+          */
+         protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 0000000,422d941..d173927
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@@ -1,0 -1,256 +1,256 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop.shuffle;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.message.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.offheap.unsafe.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ /**
+  * Shuffle.
+  */
+ public class HadoopShuffle extends HadoopComponent {
+     /** */
+     private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
+ 
+     /** */
+     protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+ 
+     /** {@inheritDoc} */
+     @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+         super.start(ctx);
+ 
+         ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
+             new IgniteBiPredicate<UUID, Object>() {
+                 @Override public boolean apply(UUID nodeId, Object msg) {
+                     return onMessageReceived(nodeId, (HadoopMessage)msg);
+                 }
+             });
+     }
+ 
+     /**
+      * Stops shuffle.
+      *
+      * @param cancel Whether to cancel all ongoing activities.
+      */
+     @Override public void stop(boolean cancel) {
+         for (HadoopShuffleJob job : jobs.values()) {
+             try {
+                 job.close();
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to close job.", e);
+             }
+         }
+ 
+         jobs.clear();
+     }
+ 
+     /**
+      * Creates new shuffle job.
+      *
+      * @param jobId Job ID.
+      * @return Created shuffle job.
+      * @throws IgniteCheckedException If job creation failed.
+      */
+     private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
+         HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+ 
+         HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
+             ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+ 
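+         // Resolve the destination node for every reducer from the map-reduce plan.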
+         UUID[] rdcAddrs = new UUID[plan.reducers()];
+ 
+         for (int i = 0; i < rdcAddrs.length; i++) {
+             UUID nodeId = plan.nodeForReducer(i);
+ 
+             assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
+ 
+             rdcAddrs[i] = nodeId;
+         }
+ 
+         boolean init = job.initializeReduceAddresses(rdcAddrs);
+ 
+         assert init;
+ 
+         return job;
+     }
+ 
+     /**
+      * @param nodeId Node ID to send message to.
+      * @param msg Message to send.
+      * @throws IgniteCheckedException If send failed.
+      */
+     private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+         ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
+ 
+         ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+     }
+ 
+     /**
+      * @param jobId Job ID.
+      * @return Shuffle job.
+      * @throws IgniteCheckedException If failed.
+      */
+     private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
+         HadoopShuffleJob<UUID> res = jobs.get(jobId);
+ 
+         if (res == null) {
+             res = newJob(jobId);
+ 
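+             // Another thread may have registered a job for the same ID concurrently; keep the existing one.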
+             HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+ 
+             if (old != null) {
+                 res.close();
+ 
+                 res = old;
+             }
+             else if (res.reducersInitialized())
+                 startSending(res);
+         }
+ 
+         return res;
+     }
+ 
+     /**
+      * Starts message sending thread.
+      *
+      * @param shuffleJob Job to start sending for.
+      */
+     private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
+         shuffleJob.startSending(ctx.kernalContext().gridName(),
+             new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
+                 @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+                     send0(dest, msg);
+                 }
+             }
+         );
+     }
+ 
+     /**
+      * Message received callback.
+      *
+      * @param src Sender node ID.
+      * @param msg Received message.
+      * @return Always {@code true}.
+      */
+     public boolean onMessageReceived(UUID src, HadoopMessage msg) {
+         if (msg instanceof HadoopShuffleMessage) {
+             HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+ 
+             try {
+                 job(m.jobId()).onShuffleMessage(m);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Message handling failed.", e);
+             }
+ 
+             try {
+                 // Reply with ack.
+                 send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+             }
+         }
+         else if (msg instanceof HadoopShuffleAck) {
+             HadoopShuffleAck m = (HadoopShuffleAck)msg;
+ 
+             try {
+                 job(m.jobId()).onShuffleAck(m);
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Message handling failed.", e);
+             }
+         }
+         else
+             throw new IllegalStateException("Unknown message type received by Hadoop shuffle [src=" + src +
+                 ", msg=" + msg + ']');
+ 
+         return true;
+     }
+ 
+     /**
+      * @param taskCtx Task context.
+      * @return Output.
+      * @throws IgniteCheckedException If failed.
+      */
+     public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+         return job(taskCtx.taskInfo().jobId()).output(taskCtx);
+     }
+ 
+     /**
+      * @param taskCtx Task context.
+      * @return Input.
+      * @throws IgniteCheckedException If failed.
+      */
+     public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+         return job(taskCtx.taskInfo().jobId()).input(taskCtx);
+     }
+ 
+     /**
+      * Removes and closes the shuffle job once the job has finished.
+      *
+      * @param jobId Job ID.
+      */
+     public void jobFinished(HadoopJobId jobId) {
+         HadoopShuffleJob job = jobs.remove(jobId);
+ 
+         if (job != null) {
+             try {
+                 job.close();
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to close job: " + jobId, e);
+             }
+         }
+     }
+ 
+     /**
+      * Flushes all the outputs for the given job to remote nodes.
+      *
+      * @param jobId Job ID.
+      * @return Future.
+      */
+     public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
+         HadoopShuffleJob job = jobs.get(jobId);
+ 
+         if (job == null)
 -            return new GridFinishedFutureEx<>();
++            return new GridFinishedFuture<>();
+ 
+         try {
+             return job.flush();
+         }
+         catch (IgniteCheckedException e) {
 -            return new GridFinishedFutureEx<>(e);
++            return new GridFinishedFuture<>(e);
+         }
+     }
+ 
+     /**
+      * @return Memory.
+      */
+     public GridUnsafeMemory memory() {
+         return mem;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17ac3602/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0000000,7ae52df..1f92c66
mode 000000,100644..100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@@ -1,0 -1,593 +1,593 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.hadoop.shuffle;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.hadoop.*;
+ import org.apache.ignite.internal.processors.hadoop.counter.*;
+ import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.io.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.offheap.unsafe.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.thread.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*;
+ import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+ 
+ /**
+  * Shuffle job.
+  */
+ public class HadoopShuffleJob<T> implements AutoCloseable {
+     /** */
+     private static final int MSG_BUF_SIZE = 128 * 1024;
+ 
+     /** */
+     private final HadoopJob job;
+ 
+     /** */
+     private final GridUnsafeMemory mem;
+ 
+     /** */
+     private final boolean needPartitioner;
+ 
+     /** Collection of task contexts for each reduce task. */
+     private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+ 
+     /** Reducers addresses. */
+     private T[] reduceAddrs;
+ 
+     /** Local reducer address. */
+     private final T locReduceAddr;
+ 
+     /** */
+     private final HadoopShuffleMessage[] msgs;
+ 
+     /** */
+     private final AtomicReferenceArray<HadoopMultimap> maps;
+ 
+     /** */
+     private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+ 
+     /** */
 -    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs =
++    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
+         new ConcurrentHashMap<>();
+ 
+     /** */
+     private volatile GridWorker snd;
+ 
+     /** Latch to wait for remote addresses. */
+     private final CountDownLatch ioInitLatch = new CountDownLatch(1);
+ 
+     /** Finished flag. Set on flush or close. */
+     private volatile boolean flushed;
+ 
+     /** */
+     private final IgniteLogger log;
+ 
+     /**
+      * @param locReduceAddr Local reducer address.
+      * @param log Logger.
+      * @param job Job.
+      * @param mem Memory.
+      * @param totalReducerCnt Total number of reducers in the job.
+      * @param locReducers Reducers that will run on the current node.
+      * @throws IgniteCheckedException If failed.
+      */
+     public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+         int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+         this.locReduceAddr = locReduceAddr;
+         this.job = job;
+         this.mem = mem;
+         this.log = log.getLogger(HadoopShuffleJob.class);
+ 
+         if (!F.isEmpty(locReducers)) {
+             for (int rdc : locReducers) {
+                 HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
+ 
+                 reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+             }
+         }
+ 
+         needPartitioner = totalReducerCnt > 1;
+ 
+         maps = new AtomicReferenceArray<>(totalReducerCnt);
+         msgs = new HadoopShuffleMessage[totalReducerCnt];
+     }
+ 
+     /**
+      * @param reduceAddrs Addresses of reducers.
+      * @return {@code True} if addresses were initialized by this call.
+      */
+     public boolean initializeReduceAddresses(T[] reduceAddrs) {
+         if (this.reduceAddrs == null) {
+             this.reduceAddrs = reduceAddrs;
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @return {@code True} if reducer addresses were initialized.
+      */
+     public boolean reducersInitialized() {
+         return reduceAddrs != null;
+     }
+ 
+     /**
+      * @param gridName Grid name.
+      * @param io IO Closure for sending messages.
+      */
+     @SuppressWarnings("BusyWait")
+     public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+         assert snd == null;
+         assert io != null;
+ 
+         this.io = io;
+ 
+         if (!flushed) {
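+             // Background worker that periodically collects buffered map output and sends it to remote reducers.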
+             snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+                 @Override protected void body() throws InterruptedException {
+                     try {
+                         while (!isCancelled()) {
+                             Thread.sleep(5);
+ 
+                             collectUpdatesAndSend(false);
+                         }
+                     }
+                     catch (IgniteCheckedException e) {
+                         throw new IllegalStateException(e);
+                     }
+                 }
+             };
+ 
+             new IgniteThread(snd).start();
+         }
+ 
+         ioInitLatch.countDown();
+     }
+ 
+     /**
+      * @param maps Maps array.
+      * @param idx Reducer index.
+      * @return Existing or newly created multimap for the given reducer.
+      */
+     private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
+         HadoopMultimap map = maps.get(idx);
+ 
+         if (map == null) { // Create new map.
+             map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
+                 new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
+                 new HadoopSkipList(job.info(), mem);
+ 
+             if (!maps.compareAndSet(idx, null, map)) {
+                 map.close();
+ 
+                 return maps.get(idx);
+             }
+         }
+ 
+         return map;
+     }
+ 
+     /**
+      * @param msg Shuffle message.
+      * @throws IgniteCheckedException If failed.
+      */
+     public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+         assert msg.buffer() != null;
+         assert msg.offset() > 0;
+ 
+         HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+ 
+         HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+ 
+         perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+ 
+         HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+ 
+         // Add data from message to the map.
+         try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+             final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
+             final UnsafeValue val = new UnsafeValue(msg.buffer());
+ 
+             msg.visit(new HadoopShuffleMessage.Visitor() {
+                 /** */
+                 private HadoopMultimap.Key key;
+ 
+                 @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
+                     dataInput.bytes(buf, off, off + len);
+ 
+                     key = adder.addKey(dataInput, key);
+                 }
+ 
+                 @Override public void onValue(byte[] buf, int off, int len) {
+                     val.off = off;
+                     val.size = len;
+ 
+                     key.add(val);
+                 }
+             });
+         }
+     }
+ 
+     /**
+      * @param ack Shuffle ack.
+      */
+     @SuppressWarnings("ConstantConditions")
+     public void onShuffleAck(HadoopShuffleAck ack) {
 -        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> tup = sentMsgs.get(ack.id());
++        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = sentMsgs.get(ack.id());
+ 
+         if (tup != null)
+             tup.get2().onDone();
+         else
+             log.warning("Received shuffle ack for not registered shuffle id: " + ack);
+     }
+ 
+     /**
+      * Unsafe value.
+      */
+     private static class UnsafeValue implements HadoopMultimap.Value {
+         /** */
+         private final byte[] buf;
+ 
+         /** */
+         private int off;
+ 
+         /** */
+         private int size;
+ 
+         /**
+          * @param buf Buffer.
+          */
+         private UnsafeValue(byte[] buf) {
+             assert buf != null;
+ 
+             this.buf = buf;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public int size() {
+             return size;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void copyTo(long ptr) {
+             UNSAFE.copyMemory(buf, BYTE_ARR_OFF + off, null, ptr, size);
+         }
+     }
+ 
+     /**
+      * Sends collected map output updates to remote reducers.
+      *
+      * @param flush If {@code true}, sends partially filled buffers as well.
+      * @throws IgniteCheckedException If failed.
+      */
+     private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
+         for (int i = 0; i < maps.length(); i++) {
+             HadoopMultimap map = maps.get(i);
+ 
+             if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+                 continue; // Skip empty map and local node.
+ 
+             if (msgs[i] == null)
+                 msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+ 
+             final int idx = i;
+ 
+             map.visit(false, new HadoopMultimap.Visitor() {
+                 /** */
+                 private long keyPtr;
+ 
+                 /** */
+                 private int keySize;
+ 
+                 /** */
+                 private boolean keyAdded;
+ 
+                 /** {@inheritDoc} */
+                 @Override public void onKey(long keyPtr, int keySize) {
+                     this.keyPtr = keyPtr;
+                     this.keySize = keySize;
+ 
+                     keyAdded = false;
+                 }
+ 
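+                 /**
+                  * @param valPtr Value pointer.
+                  * @param valSize Value size.
+                  * @return {@code True} if the key/value pair fit into the current message buffer.
+                  */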
+                 private boolean tryAdd(long valPtr, int valSize) {
+                     HadoopShuffleMessage msg = msgs[idx];
+ 
+                     if (!keyAdded) { // Add key and value.
+                         int size = keySize + valSize;
+ 
+                         if (!msg.available(size, false))
+                             return false;
+ 
+                         msg.addKey(keyPtr, keySize);
+                         msg.addValue(valPtr, valSize);
+ 
+                         keyAdded = true;
+ 
+                         return true;
+                     }
+ 
+                     if (!msg.available(valSize, true))
+                         return false;
+ 
+                     msg.addValue(valPtr, valSize);
+ 
+                     return true;
+                 }
+ 
+                 /** {@inheritDoc} */
+                 @Override public void onValue(long valPtr, int valSize) {
+                     if (tryAdd(valPtr, valSize))
+                         return;
+ 
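+                     // Current message is full: send it and retry with a buffer large enough for this pair.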
+                     send(idx, keySize + valSize);
+ 
+                     keyAdded = false;
+ 
+                     if (!tryAdd(valPtr, valSize))
+                         throw new IllegalStateException();
+                 }
+             });
+ 
+             if (flush && msgs[i].offset() != 0)
+                 send(i, 0);
+         }
+     }
+ 
+     /**
+      * @param idx Index of message.
+      * @param newBufMinSize Minimum size of the new message buffer ({@code 0} to allocate no new buffer).
+      */
+     private void send(final int idx, int newBufMinSize) {
 -        final GridFutureAdapterEx<?> fut = new GridFutureAdapterEx<>();
++        final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+ 
+         HadoopShuffleMessage msg = msgs[idx];
+ 
+         final long msgId = msg.id();
+ 
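+         // Register the in-flight message so that flush() can wait for its acknowledgement.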
 -        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> old = sentMsgs.putIfAbsent(msgId,
 -            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>>(msg, fut));
++        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
++            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+ 
+         assert old == null;
+ 
+         try {
+             io.apply(reduceAddrs[idx], msg);
+         }
+         catch (GridClosureException e) {
+             fut.onDone(U.unwrap(e));
+         }
+ 
+         fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
+             @Override public void apply(IgniteInternalFuture<?> f) {
+                 try {
+                     f.get();
+ 
+                     // Clean up the future from map only if there was no exception.
+                     // Otherwise flush() should fail.
+                     sentMsgs.remove(msgId);
+                 }
+                 catch (IgniteCheckedException e) {
+                     log.error("Failed to send message.", e);
+                 }
+             }
+         });
+ 
+         msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+             Math.max(MSG_BUF_SIZE, newBufMinSize));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void close() throws IgniteCheckedException {
+         if (snd != null) {
+             snd.cancel();
+ 
+             try {
+                 snd.join();
+             }
+             catch (InterruptedException e) {
+                 throw new IgniteInterruptedCheckedException(e);
+             }
+         }
+ 
+         close(maps);
+     }
+ 
+     /**
+      * @param maps Maps.
+      */
+     private void close(AtomicReferenceArray<HadoopMultimap> maps) {
+         for (int i = 0; i < maps.length(); i++) {
+             HadoopMultimap map = maps.get(i);
+ 
+             if (map != null)
+                 map.close();
+         }
+     }
+ 
+     /**
+      * Flushes all pending output to remote reducers.
+      *
+      * @return Future that completes when all sent messages are acknowledged.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("unchecked")
+     public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
+         if (log.isDebugEnabled())
+             log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
+ 
+         flushed = true;
+ 
+         if (maps.length() == 0)
 -            return new GridFinishedFutureEx<>();
++            return new GridFinishedFuture<>();
+ 
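+         // Wait until the sending IO closure has been initialized.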
+         U.await(ioInitLatch);
+ 
+         GridWorker snd0 = snd;
+ 
+         if (snd0 != null) {
+             if (log.isDebugEnabled())
+                 log.debug("Cancelling sender thread.");
+ 
+             snd0.cancel();
+ 
+             try {
+                 snd0.join();
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+             }
+             catch (InterruptedException e) {
+                 throw new IgniteInterruptedCheckedException(e);
+             }
+         }
+ 
+         collectUpdatesAndSend(true); // With flush.
+ 
+         if (log.isDebugEnabled())
+             log.debug("Finished sending collected updates to remote reducers: " + job.id());
+ 
+         GridCompoundFuture fut = new GridCompoundFuture<>();
+ 
 -        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapterEx<?>> tup : sentMsgs.values())
++        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+             fut.add(tup.get2());
+ 
+         fut.markInitialized();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param taskCtx Task context.
+      * @return Output.
+      * @throws IgniteCheckedException If failed.
+      */
+     public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+         switch (taskCtx.taskInfo().type()) {
+             case MAP:
+                 assert !job.info().hasCombiner() : "Output for a map task may be created only if no combiner is defined.";
+ 
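+             // Intentional fall-through: map output without a combiner goes through the partitioned output as well.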
+             case COMBINE:
+                 return new PartitionedOutput(taskCtx);
+ 
+             default:
+                 throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+         }
+     }
+ 
+     /**
+      * @param taskCtx Task context.
+      * @return Input.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("unchecked")
+     public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+         switch (taskCtx.taskInfo().type()) {
+             case REDUCE:
+                 int reducer = taskCtx.taskInfo().taskNumber();
+ 
+                 HadoopMultimap m = maps.get(reducer);
+ 
+                 if (m != null)
+                     return m.input(taskCtx);
+ 
+                 return new HadoopTaskInput() { // Empty input.
+                     @Override public boolean next() {
+                         return false;
+                     }
+ 
+                     @Override public Object key() {
+                         throw new IllegalStateException();
+                     }
+ 
+                     @Override public Iterator<?> values() {
+                         throw new IllegalStateException();
+                     }
+ 
+                     @Override public void close() {
+                         // No-op.
+                     }
+                 };
+ 
+             default:
+                 throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+         }
+     }
+ 
+     /**
+      * Partitioned output.
+      */
+     private class PartitionedOutput implements HadoopTaskOutput {
+         /** */
+         private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+ 
+         /** */
+         private HadoopPartitioner partitioner;
+ 
+         /** */
+         private final HadoopTaskContext taskCtx;
+ 
+         /**
+          * Constructor.
+          *
+          * @param taskCtx Task context.
+          * @throws IgniteCheckedException If failed.
+          */
+         private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+             this.taskCtx = taskCtx;
+ 
+             if (needPartitioner)
+                 partitioner = taskCtx.partitioner();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void write(Object key, Object val) throws IgniteCheckedException {
+             int part = 0;
+ 
+             if (partitioner != null) {
+                 part = partitioner.partition(key, val, adders.length);
+ 
+                 if (part < 0 || part >= adders.length)
+                     throw new IgniteCheckedException("Invalid partition: " + part);
+             }
+ 
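+             // Lazily create an output adder backed by the target partition's multimap.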
+             HadoopTaskOutput out = adders[part];
+ 
+             if (out == null)
+                 adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+ 
+             out.write(key, val);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void close() throws IgniteCheckedException {
+             for (HadoopTaskOutput adder : adders) {
+                 if (adder != null)
+                     adder.close();
+             }
+         }
+     }
+ }