You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/03/05 10:05:26 UTC

[26/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
deleted file mode 100644
index 384bc23..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolKillJobTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-/**
- * Kill job task.
- */
-public class GridHadoopProtocolKillJobTask extends GridHadoopProtocolTaskAdapter<Boolean> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public Boolean run(ComputeJobContext jobCtx, GridHadoop hadoop,
-        GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-
-        assert nodeId != null;
-        assert id != null;
-
-        GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
-        return hadoop.kill(jobId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
deleted file mode 100644
index f76f3b6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolNextTaskIdTask.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Task to get the next job ID.
- */
-public class GridHadoopProtocolNextTaskIdTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobId> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobId run(ComputeJobContext jobCtx, GridHadoop hadoop,
-        GridHadoopProtocolTaskArguments args) {
-        return hadoop.nextJobId();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
deleted file mode 100644
index c734acd..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolSubmitJobTask.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-
-/**
- * Submit job task.
- */
-public class GridHadoopProtocolSubmitJobTask extends GridHadoopProtocolTaskAdapter<GridHadoopJobStatus> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobStatus run(ComputeJobContext jobCtx, GridHadoop hadoop,
-        GridHadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-        GridHadoopDefaultJobInfo info = args.get(2);
-
-        assert nodeId != null;
-        assert id != null;
-        assert info != null;
-
-        GridHadoopJobId jobId = new GridHadoopJobId(nodeId, id);
-
-        hadoop.submit(jobId, info);
-
-        GridHadoopJobStatus res = hadoop.status(jobId);
-
-        if (res == null) // Submission failed.
-            res = new GridHadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
-
-        return res;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
deleted file mode 100644
index 086545c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskAdapter.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.resources.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Hadoop protocol task adapter.
- */
-public abstract class GridHadoopProtocolTaskAdapter<R> implements ComputeTask<GridHadoopProtocolTaskArguments, R> {
-    /** {@inheritDoc} */
-    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
-        @Nullable GridHadoopProtocolTaskArguments arg) {
-        return Collections.singletonMap(new Job(arg), subgrid.get(0));
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        return ComputeJobResultPolicy.REDUCE;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
-        if (!F.isEmpty(results)) {
-            ComputeJobResult res = results.get(0);
-
-            return res.getData();
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Job wrapper.
-     */
-    private class Job implements ComputeJob {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        @SuppressWarnings("UnusedDeclaration")
-        @JobContextResource
-        private ComputeJobContext jobCtx;
-
-        /** Argument. */
-        private final GridHadoopProtocolTaskArguments args;
-
-        /**
-         * Constructor.
-         *
-         * @param args Job argument.
-         */
-        private Job(GridHadoopProtocolTaskArguments args) {
-            this.args = args;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object execute() {
-            try {
-                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
-            }
-        }
-    }
-
-    /**
-     * Run the task.
-     *
-     * @param jobCtx Job context.
-     * @param hadoop Hadoop facade.
-     * @param args Arguments.
-     * @return Job result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public abstract R run(ComputeJobContext jobCtx, GridHadoop hadoop, GridHadoopProtocolTaskArguments args)
-        throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
deleted file mode 100644
index ae91a52..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/GridHadoopProtocolTaskArguments.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.proto;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Task arguments.
- */
-public class GridHadoopProtocolTaskArguments implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Arguments. */
-    private Object[] args;
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public GridHadoopProtocolTaskArguments() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param args Arguments.
-     */
-    public GridHadoopProtocolTaskArguments(Object... args) {
-        this.args = args;
-    }
-
-    /**
-     * @param idx Argument index.
-     * @return Argument.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public <T> T get(int idx) {
-        return (args != null && args.length > idx) ? (T)args[idx] : null;
-    }
-
-    /**
-     * @return Size.
-     */
-    public int size() {
-        return args != null ? args.length : 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeArray(out, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        args = U.readArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopProtocolTaskArguments.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
new file mode 100644
index 0000000..b454760
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopClientProtocol.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.protocol.*;
+import org.apache.hadoop.mapreduce.security.token.delegation.*;
+import org.apache.hadoop.mapreduce.v2.*;
+import org.apache.hadoop.mapreduce.v2.jobhistory.*;
+import org.apache.hadoop.security.*;
+import org.apache.hadoop.security.authorize.*;
+import org.apache.hadoop.security.token.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.client.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Hadoop client protocol.
+ */
+public class HadoopClientProtocol implements ClientProtocol {
+    /** Ignite framework name property. */
+    public static final String FRAMEWORK_NAME = "ignite";
+
+    /** Protocol version. */
+    private static final long PROTO_VER = 1L;
+
+    /** Default Ignite system directory. */
+    private static final String SYS_DIR = ".ignite/system";
+
+    /** Configuration. */
+    private final Configuration conf;
+
+    /** Ignite client. */
+    private volatile GridClient cli;
+
+    /** Last received version. */
+    private long lastVer = -1;
+
+    /** Last received status. */
+    private HadoopJobStatus lastStatus;
+
+    /**
+     * Constructor.
+     *
+     * @param conf Configuration.
+     * @param cli Ignite client.
+     */
+    public HadoopClientProtocol(Configuration conf, GridClient cli) {
+        assert cli != null;
+
+        this.conf = conf;
+        this.cli = cli;
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobID getNewJobID() throws IOException, InterruptedException {
+        try {
+            conf.setLong(REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+            HadoopJobId jobID = cli.compute().execute(HadoopProtocolNextTaskIdTask.class.getName(), null);
+
+            conf.setLong(RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());
+
+            return new JobID(jobID.globalId().toString(), jobID.localId());
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get new job ID.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
+        InterruptedException {
+        try {
+            conf.setLong(JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
+
+            HadoopJobStatus status = cli.compute().execute(HadoopProtocolSubmitJobTask.class.getName(),
+                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf)));
+
+            if (status == null)
+                throw new IOException("Failed to submit job (null status obtained): " + jobId);
+
+            return processStatus(status);
+        }
+        catch (GridClientException | IgniteCheckedException e) {
+            throw new IOException("Failed to submit job.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
+        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
+        return Cluster.JobTrackerStatus.RUNNING;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
+        return new AccessControlList("*");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
+        try {
+            cli.compute().execute(HadoopProtocolKillJobTask.class.getName(),
+                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to kill job: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
+        InterruptedException {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
+        try {
+            Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);
+
+            HadoopProtocolTaskArguments args = delay >= 0 ?
+                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId(), delay) :
+                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId());
+
+            HadoopJobStatus status = cli.compute().execute(HadoopProtocolJobStatusTask.class.getName(), args);
+
+            if (status == null)
+                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+            return processStatus(status);
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get job status: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
+        try {
+            final HadoopCounters counters = cli.compute().execute(HadoopProtocolJobCountersTask.class.getName(),
+                new HadoopProtocolTaskArguments(jobId.getJtIdentifier(), jobId.getId()));
+
+            if (counters == null)
+                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);
+
+            return new HadoopMapReduceCounters(counters);
+        }
+        catch (GridClientException e) {
+            throw new IOException("Failed to get job counters: " + jobId, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
+        return new TaskReport[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getFilesystemName() throws IOException, InterruptedException {
+        return FileSystem.get(conf).getUri().toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+        return new JobStatus[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+        throws IOException, InterruptedException {
+        return new TaskCompletionEvent[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
+        return new String[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
+        return new TaskTrackerInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
+        return new TaskTrackerInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSystemDir() throws IOException, InterruptedException {
+        Path sysDir = new Path(SYS_DIR);
+
+        return sysDir.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
+        String usr = UserGroupInformation.getCurrentUser().getShortUserName();
+
+        return HadoopUtils.stagingAreaDir(conf, usr).toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
+        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
+        return new QueueAclsInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
+        return new QueueInfo[0];
+    }
+
+    /** {@inheritDoc} */
+    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
+        InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+        InterruptedException {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
+        InterruptedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
+        InterruptedException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+        return PROTO_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
+        throws IOException {
+        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    /**
+     * Process received status update.
+     *
+     * @param status Ignite status.
+     * @return Hadoop status.
+     */
+    private JobStatus processStatus(HadoopJobStatus status) {
+        // IMPORTANT! This method will only work in single-threaded environment. It is valid at the moment because
+        // IgniteHadoopClientProtocolProvider creates new instance of this class for every new job and Job class
+        // serializes invocations of submitJob() and getJobStatus() methods. However, if any of these conditions will
+        // change in future and either protocol will serve statuses for several jobs or status update will not be
+        // serialized anymore, then we have to fallback to concurrent approach (e.g. using ConcurrentHashMap).
+        // (vozerov)
+        if (lastVer < status.version()) {
+            lastVer = status.version();
+
+            lastStatus = status;
+        }
+        else
+            assert lastStatus != null;
+
+        return HadoopUtils.status(lastStatus, conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
new file mode 100644
index 0000000..ebdda9f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+
+import java.util.*;
+
+/**
+ * Task to get job counters.
+ */
+public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+
+        assert nodeId != null;
+        assert id != null;
+
+        return hadoop.counters(new HadoopJobId(nodeId, id));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
new file mode 100644
index 0000000..1734562
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Job status task.
+ */
+public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Default poll delay */
+    private static final long DFLT_POLL_DELAY = 100L;
+
+    /** Attribute for held status. */
+    private static final String ATTR_HELD = "held";
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+        Long pollDelay = args.get(2);
+
+        assert nodeId != null;
+        assert id != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        if (pollDelay == null)
+            pollDelay = DFLT_POLL_DELAY;
+
+        if (pollDelay > 0) {
+            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+            if (fut != null) {
+                if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true))
+                    return hadoop.status(jobId);
+                else {
+                    fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut0) {
+                            jobCtx.callcc();
+                        }
+                    });
+
+                    jobCtx.setAttribute(ATTR_HELD, true);
+
+                    return jobCtx.holdcc(pollDelay);
+                }
+            }
+            else
+                return null;
+        }
+        else
+            return hadoop.status(jobId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
new file mode 100644
index 0000000..d173612
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.util.*;
+
+/**
+ * Kill job task.
+ */
+public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+
+        assert nodeId != null;
+        assert id != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        return hadoop.kill(jobId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
new file mode 100644
index 0000000..2782530
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Task to get the next job ID.
+ */
+public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) {
+        return hadoop.nextJobId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
new file mode 100644
index 0000000..f65d9bb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.*;
+
+/**
+ * Submit job task.
+ */
+public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+        HadoopDefaultJobInfo info = args.get(2);
+
+        assert nodeId != null;
+        assert id != null;
+        assert info != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        hadoop.submit(jobId, info);
+
+        HadoopJobStatus res = hadoop.status(jobId);
+
+        if (res == null) // Submission failed.
+            res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
new file mode 100644
index 0000000..f763ccc
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Hadoop protocol task adapter.
+ */
+public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable HadoopProtocolTaskArguments arg) {
+        return Collections.singletonMap(new Job(arg), subgrid.get(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        return ComputeJobResultPolicy.REDUCE;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
+        if (!F.isEmpty(results)) {
+            ComputeJobResult res = results.get(0);
+
+            return res.getData();
+        }
+        else
+            return null;
+    }
+
+    /**
+     * Job wrapper.
+     */
+    private class Job implements ComputeJob {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        @SuppressWarnings("UnusedDeclaration")
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** Argument. */
+        private final HadoopProtocolTaskArguments args;
+
+        /**
+         * Constructor.
+         *
+         * @param args Job argument.
+         */
+        private Job(HadoopProtocolTaskArguments args) {
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            try {
+                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /**
+     * Run the task.
+     *
+     * @param jobCtx Job context.
+     * @param hadoop Hadoop facade.
+     * @param args Arguments.
+     * @return Job result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
+        throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
new file mode 100644
index 0000000..5c470ba
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Task arguments.
+ */
+public class HadoopProtocolTaskArguments implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Arguments. */
+    private Object[] args;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public HadoopProtocolTaskArguments() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param args Arguments.
+     */
+    public HadoopProtocolTaskArguments(Object... args) {
+        this.args = args;
+    }
+
+    /**
+     * @param idx Argument index.
+     * @return Argument.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <T> T get(int idx) {
+        return (args != null && args.length > idx) ? (T)args[idx] : null;
+    }
+
+    /**
+     * @return Size.
+     */
+    public int size() {
+        return args != null ? args.length : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeArray(out, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        args = U.readArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProtocolTaskArguments.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
deleted file mode 100644
index 396124e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffle.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Shuffle.
- */
-public class GridHadoopShuffle extends GridHadoopComponent {
-    /** */
-    private final ConcurrentMap<GridHadoopJobId, GridHadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
-
-    /** */
-    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
-
-    /** {@inheritDoc} */
-    @Override public void start(GridHadoopContext ctx) throws IgniteCheckedException {
-        super.start(ctx);
-
-        ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
-            new IgniteBiPredicate<UUID, Object>() {
-                @Override public boolean apply(UUID nodeId, Object msg) {
-                    return onMessageReceived(nodeId, (GridHadoopMessage)msg);
-                }
-            });
-    }
-
-    /**
-     * Stops shuffle.
-     *
-     * @param cancel If should cancel all ongoing activities.
-     */
-    @Override public void stop(boolean cancel) {
-        for (GridHadoopShuffleJob job : jobs.values()) {
-            try {
-                job.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close job.", e);
-            }
-        }
-
-        jobs.clear();
-    }
-
-    /**
-     * Creates new shuffle job.
-     *
-     * @param jobId Job ID.
-     * @return Created shuffle job.
-     * @throws IgniteCheckedException If job creation failed.
-     */
-    private GridHadoopShuffleJob<UUID> newJob(GridHadoopJobId jobId) throws IgniteCheckedException {
-        GridHadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
-
-        GridHadoopShuffleJob<UUID> job = new GridHadoopShuffleJob<>(ctx.localNodeId(), log,
-            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
-
-        UUID[] rdcAddrs = new UUID[plan.reducers()];
-
-        for (int i = 0; i < rdcAddrs.length; i++) {
-            UUID nodeId = plan.nodeForReducer(i);
-
-            assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
-
-            rdcAddrs[i] = nodeId;
-        }
-
-        boolean init = job.initializeReduceAddresses(rdcAddrs);
-
-        assert init;
-
-        return job;
-    }
-
-    /**
-     * @param nodeId Node ID to send message to.
-     * @param msg Message to send.
-     * @throws IgniteCheckedException If send failed.
-     */
-    private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
-        ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
-
-        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
-    }
-
-    /**
-     * @param jobId Task info.
-     * @return Shuffle job.
-     */
-    private GridHadoopShuffleJob<UUID> job(GridHadoopJobId jobId) throws IgniteCheckedException {
-        GridHadoopShuffleJob<UUID> res = jobs.get(jobId);
-
-        if (res == null) {
-            res = newJob(jobId);
-
-            GridHadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
-
-            if (old != null) {
-                res.close();
-
-                res = old;
-            }
-            else if (res.reducersInitialized())
-                startSending(res);
-        }
-
-        return res;
-    }
-
-    /**
-     * Starts message sending thread.
-     *
-     * @param shuffleJob Job to start sending for.
-     */
-    private void startSending(GridHadoopShuffleJob<UUID> shuffleJob) {
-        shuffleJob.startSending(ctx.kernalContext().gridName(),
-            new IgniteInClosure2X<UUID, GridHadoopShuffleMessage>() {
-                @Override public void applyx(UUID dest, GridHadoopShuffleMessage msg) throws IgniteCheckedException {
-                    send0(dest, msg);
-                }
-            }
-        );
-    }
-
-    /**
-     * Message received callback.
-     *
-     * @param src Sender node ID.
-     * @param msg Received message.
-     * @return {@code True}.
-     */
-    public boolean onMessageReceived(UUID src, GridHadoopMessage msg) {
-        if (msg instanceof GridHadoopShuffleMessage) {
-            GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
-
-            try {
-                job(m.jobId()).onShuffleMessage(m);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
-            }
-
-            try {
-                // Reply with ack.
-                send0(src, new GridHadoopShuffleAck(m.id(), m.jobId()));
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
-            }
-        }
-        else if (msg instanceof GridHadoopShuffleAck) {
-            GridHadoopShuffleAck m = (GridHadoopShuffleAck)msg;
-
-            try {
-                job(m.jobId()).onShuffleAck(m);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
-            }
-        }
-        else
-            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
-                ", msg=" + msg + ']');
-
-        return true;
-    }
-
-    /**
-     * @param taskCtx Task info.
-     * @return Output.
-     */
-    public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        return job(taskCtx.taskInfo().jobId()).output(taskCtx);
-    }
-
-    /**
-     * @param taskCtx Task info.
-     * @return Input.
-     */
-    public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        return job(taskCtx.taskInfo().jobId()).input(taskCtx);
-    }
-
-    /**
-     * @param jobId Job id.
-     */
-    public void jobFinished(GridHadoopJobId jobId) {
-        GridHadoopShuffleJob job = jobs.remove(jobId);
-
-        if (job != null) {
-            try {
-                job.close();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close job: " + jobId, e);
-            }
-        }
-    }
-
-    /**
-     * Flushes all the outputs for the given job to remote nodes.
-     *
-     * @param jobId Job ID.
-     * @return Future.
-     */
-    public IgniteInternalFuture<?> flush(GridHadoopJobId jobId) {
-        GridHadoopShuffleJob job = jobs.get(jobId);
-
-        if (job == null)
-            return new GridFinishedFutureEx<>();
-
-        try {
-            return job.flush();
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFutureEx<>(e);
-        }
-    }
-
-    /**
-     * @return Memory.
-     */
-    public GridUnsafeMemory memory() {
-        return mem;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
deleted file mode 100644
index a8a52a9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleAck.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Acknowledgement message.
- */
-public class GridHadoopShuffleAck implements GridHadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    @GridToStringInclude
-    private long msgId;
-
-    /** */
-    @GridToStringInclude
-    private GridHadoopJobId jobId;
-
-    /**
-     *
-     */
-    public GridHadoopShuffleAck() {
-        // No-op.
-    }
-
-    /**
-     * @param msgId Message ID.
-     */
-    public GridHadoopShuffleAck(long msgId, GridHadoopJobId jobId) {
-        assert jobId != null;
-
-        this.msgId = msgId;
-        this.jobId = jobId;
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public GridHadoopJobId jobId() {
-        return jobId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new GridHadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopShuffleAck.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java
deleted file mode 100644
index 545c1b8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleJob.java
+++ /dev/null
@@ -1,593 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.thread.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- * Shuffle job.
- */
-public class GridHadoopShuffleJob<T> implements AutoCloseable {
-    /** */
-    private static final int MSG_BUF_SIZE = 128 * 1024;
-
-    /** */
-    private final GridHadoopJob job;
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /** */
-    private final boolean needPartitioner;
-
-    /** Collection of task contexts for each reduce task. */
-    private final Map<Integer, GridHadoopTaskContext> reducersCtx = new HashMap<>();
-
-    /** Reducers addresses. */
-    private T[] reduceAddrs;
-
-    /** Local reducers address. */
-    private final T locReduceAddr;
-
-    /** */
-    private final GridHadoopShuffleMessage[] msgs;
-
-    /** */
-    private final AtomicReferenceArray<GridHadoopMultimap> maps;
-
-    /** */
-    private volatile IgniteInClosure2X<T, GridHadoopShuffleMessage> io;
-
-    /** */
-    protected ConcurrentMap<Long, IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>> sentMsgs =
-        new ConcurrentHashMap<>();
-
-    /** */
-    private volatile GridWorker snd;
-
-    /** Latch for remote addresses waiting. */
-    private final CountDownLatch ioInitLatch = new CountDownLatch(1);
-
-    /** Finished flag. Set on flush or close. */
-    private volatile boolean flushed;
-
-    /** */
-    private final IgniteLogger log;
-
-    /**
-     * @param locReduceAddr Local reducer address.
-     * @param log Logger.
-     * @param job Job.
-     * @param mem Memory.
-     * @param totalReducerCnt Amount of reducers in the Job.
-     * @param locReducers Reducers will work on current node.
-     * @throws IgniteCheckedException If error.
-     */
-    public GridHadoopShuffleJob(T locReduceAddr, IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem,
-        int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
-        this.locReduceAddr = locReduceAddr;
-        this.job = job;
-        this.mem = mem;
-        this.log = log.getLogger(GridHadoopShuffleJob.class);
-
-        if (!F.isEmpty(locReducers)) {
-            for (int rdc : locReducers) {
-                GridHadoopTaskInfo taskInfo = new GridHadoopTaskInfo(GridHadoopTaskType.REDUCE, job.id(), rdc, 0, null);
-
-                reducersCtx.put(rdc, job.getTaskContext(taskInfo));
-            }
-        }
-
-        needPartitioner = totalReducerCnt > 1;
-
-        maps = new AtomicReferenceArray<>(totalReducerCnt);
-        msgs = new GridHadoopShuffleMessage[totalReducerCnt];
-    }
-
-    /**
-     * @param reduceAddrs Addresses of reducers.
-     * @return {@code True} if addresses were initialized by this call.
-     */
-    public boolean initializeReduceAddresses(T[] reduceAddrs) {
-        if (this.reduceAddrs == null) {
-            this.reduceAddrs = reduceAddrs;
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @return {@code True} if reducers addresses were initialized.
-     */
-    public boolean reducersInitialized() {
-        return reduceAddrs != null;
-    }
-
-    /**
-     * @param gridName Grid name.
-     * @param io IO Closure for sending messages.
-     */
-    @SuppressWarnings("BusyWait")
-    public void startSending(String gridName, IgniteInClosure2X<T, GridHadoopShuffleMessage> io) {
-        assert snd == null;
-        assert io != null;
-
-        this.io = io;
-
-        if (!flushed) {
-            snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
-                @Override protected void body() throws InterruptedException {
-                    try {
-                        while (!isCancelled()) {
-                            Thread.sleep(5);
-
-                            collectUpdatesAndSend(false);
-                        }
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-            };
-
-            new IgniteThread(snd).start();
-        }
-
-        ioInitLatch.countDown();
-    }
-
-    /**
-     * @param maps Maps.
-     * @param idx Index.
-     * @return Map.
-     */
-    private GridHadoopMultimap getOrCreateMap(AtomicReferenceArray<GridHadoopMultimap> maps, int idx) {
-        GridHadoopMultimap map = maps.get(idx);
-
-        if (map == null) { // Create new map.
-            map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
-                new GridHadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
-                new GridHadoopSkipList(job.info(), mem);
-
-            if (!maps.compareAndSet(idx, null, map)) {
-                map.close();
-
-                return maps.get(idx);
-            }
-        }
-
-        return map;
-    }
-
-    /**
-     * @param msg Message.
-     * @throws IgniteCheckedException Exception.
-     */
-    public void onShuffleMessage(GridHadoopShuffleMessage msg) throws IgniteCheckedException {
-        assert msg.buffer() != null;
-        assert msg.offset() > 0;
-
-        GridHadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
-
-        GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
-
-        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
-
-        GridHadoopMultimap map = getOrCreateMap(maps, msg.reducer());
-
-        // Add data from message to the map.
-        try (GridHadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
-            final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
-            final UnsafeValue val = new UnsafeValue(msg.buffer());
-
-            msg.visit(new GridHadoopShuffleMessage.Visitor() {
-                /** */
-                private GridHadoopMultimap.Key key;
-
-                @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
-                    dataInput.bytes(buf, off, off + len);
-
-                    key = adder.addKey(dataInput, key);
-                }
-
-                @Override public void onValue(byte[] buf, int off, int len) {
-                    val.off = off;
-                    val.size = len;
-
-                    key.add(val);
-                }
-            });
-        }
-    }
-
-    /**
-     * @param ack Shuffle ack.
-     */
-    @SuppressWarnings("ConstantConditions")
-    public void onShuffleAck(GridHadoopShuffleAck ack) {
-        IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup = sentMsgs.get(ack.id());
-
-        if (tup != null)
-            tup.get2().onDone();
-        else
-            log.warning("Received shuffle ack for not registered shuffle id: " + ack);
-    }
-
-    /**
-     * Unsafe value.
-     */
-    private static class UnsafeValue implements GridHadoopMultimap.Value {
-        /** */
-        private final byte[] buf;
-
-        /** */
-        private int off;
-
-        /** */
-        private int size;
-
-        /**
-         * @param buf Buffer.
-         */
-        private UnsafeValue(byte[] buf) {
-            assert buf != null;
-
-            this.buf = buf;
-        }
-
-        /** */
-        @Override public int size() {
-            return size;
-        }
-
-        /** */
-        @Override public void copyTo(long ptr) {
-            UNSAFE.copyMemory(buf, BYTE_ARR_OFF + off, null, ptr, size);
-        }
-    }
-
-    /**
-     * Sends map updates to remote reducers.
-     */
-    private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
-        for (int i = 0; i < maps.length(); i++) {
-            GridHadoopMultimap map = maps.get(i);
-
-            if (map == null || locReduceAddr.equals(reduceAddrs[i]))
-                continue; // Skip empty map and local node.
-
-            if (msgs[i] == null)
-                msgs[i] = new GridHadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
-
-            final int idx = i;
-
-            map.visit(false, new GridHadoopMultimap.Visitor() {
-                /** */
-                private long keyPtr;
-
-                /** */
-                private int keySize;
-
-                /** */
-                private boolean keyAdded;
-
-                /** {@inheritDoc} */
-                @Override public void onKey(long keyPtr, int keySize) {
-                    this.keyPtr = keyPtr;
-                    this.keySize = keySize;
-
-                    keyAdded = false;
-                }
-
-                private boolean tryAdd(long valPtr, int valSize) {
-                    GridHadoopShuffleMessage msg = msgs[idx];
-
-                    if (!keyAdded) { // Add key and value.
-                        int size = keySize + valSize;
-
-                        if (!msg.available(size, false))
-                            return false;
-
-                        msg.addKey(keyPtr, keySize);
-                        msg.addValue(valPtr, valSize);
-
-                        keyAdded = true;
-
-                        return true;
-                    }
-
-                    if (!msg.available(valSize, true))
-                        return false;
-
-                    msg.addValue(valPtr, valSize);
-
-                    return true;
-                }
-
-                /** {@inheritDoc} */
-                @Override public void onValue(long valPtr, int valSize) {
-                    if (tryAdd(valPtr, valSize))
-                        return;
-
-                    send(idx, keySize + valSize);
-
-                    keyAdded = false;
-
-                    if (!tryAdd(valPtr, valSize))
-                        throw new IllegalStateException();
-                }
-            });
-
-            if (flush && msgs[i].offset() != 0)
-                send(i, 0);
-        }
-    }
-
-    /**
-     * @param idx Index of message.
-     * @param newBufMinSize Min new buffer size.
-     */
-    private void send(final int idx, int newBufMinSize) {
-        final GridFutureAdapterEx<?> fut = new GridFutureAdapterEx<>();
-
-        GridHadoopShuffleMessage msg = msgs[idx];
-
-        final long msgId = msg.id();
-
-        IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> old = sentMsgs.putIfAbsent(msgId,
-            new IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>>(msg, fut));
-
-        assert old == null;
-
-        try {
-            io.apply(reduceAddrs[idx], msg);
-        }
-        catch (GridClosureException e) {
-            fut.onDone(U.unwrap(e));
-        }
-
-        fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> f) {
-                try {
-                    f.get();
-
-                    // Clean up the future from map only if there was no exception.
-                    // Otherwise flush() should fail.
-                    sentMsgs.remove(msgId);
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to send message.", e);
-                }
-            }
-        });
-
-        msgs[idx] = newBufMinSize == 0 ? null : new GridHadoopShuffleMessage(job.id(), idx,
-            Math.max(MSG_BUF_SIZE, newBufMinSize));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteCheckedException {
-        if (snd != null) {
-            snd.cancel();
-
-            try {
-                snd.join();
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-
-        close(maps);
-    }
-
-    /**
-     * @param maps Maps.
-     */
-    private void close(AtomicReferenceArray<GridHadoopMultimap> maps) {
-        for (int i = 0; i < maps.length(); i++) {
-            GridHadoopMultimap map = maps.get(i);
-
-            if (map != null)
-                map.close();
-        }
-    }
-
-    /**
-     * @return Future.
-     */
-    @SuppressWarnings("unchecked")
-    public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
-
-        flushed = true;
-
-        if (maps.length() == 0)
-            return new GridFinishedFutureEx<>();
-
-        U.await(ioInitLatch);
-
-        GridWorker snd0 = snd;
-
-        if (snd0 != null) {
-            if (log.isDebugEnabled())
-                log.debug("Cancelling sender thread.");
-
-            snd0.cancel();
-
-            try {
-                snd0.join();
-
-                if (log.isDebugEnabled())
-                    log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-
-        collectUpdatesAndSend(true); // With flush.
-
-        if (log.isDebugEnabled())
-            log.debug("Finished sending collected updates to remote reducers: " + job.id());
-
-        GridCompoundFuture fut = new GridCompoundFuture<>();
-
-        for (IgniteBiTuple<GridHadoopShuffleMessage, GridFutureAdapterEx<?>> tup : sentMsgs.values())
-            fut.add(tup.get2());
-
-        fut.markInitialized();
-
-        if (log.isDebugEnabled())
-            log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
-
-        return fut;
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Output.
-     * @throws IgniteCheckedException If failed.
-     */
-    public GridHadoopTaskOutput output(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        switch (taskCtx.taskInfo().type()) {
-            case MAP:
-                assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined.";
-
-            case COMBINE:
-                return new PartitionedOutput(taskCtx);
-
-            default:
-                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
-        }
-    }
-
-    /**
-     * @param taskCtx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public GridHadoopTaskInput input(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        switch (taskCtx.taskInfo().type()) {
-            case REDUCE:
-                int reducer = taskCtx.taskInfo().taskNumber();
-
-                GridHadoopMultimap m = maps.get(reducer);
-
-                if (m != null)
-                    return m.input(taskCtx);
-
-                return new GridHadoopTaskInput() { // Empty input.
-                    @Override public boolean next() {
-                        return false;
-                    }
-
-                    @Override public Object key() {
-                        throw new IllegalStateException();
-                    }
-
-                    @Override public Iterator<?> values() {
-                        throw new IllegalStateException();
-                    }
-
-                    @Override public void close() {
-                        // No-op.
-                    }
-                };
-
-            default:
-                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
-        }
-    }
-
-    /**
-     * Partitioned output.
-     */
-    private class PartitionedOutput implements GridHadoopTaskOutput {
-        /** */
-        private final GridHadoopTaskOutput[] adders = new GridHadoopTaskOutput[maps.length()];
-
-        /** */
-        private GridHadoopPartitioner partitioner;
-
-        /** */
-        private final GridHadoopTaskContext taskCtx;
-
-        /**
-         * Constructor.
-         * @param taskCtx Task context.
-         */
-        private PartitionedOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-            this.taskCtx = taskCtx;
-
-            if (needPartitioner)
-                partitioner = taskCtx.partitioner();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Object key, Object val) throws IgniteCheckedException {
-            int part = 0;
-
-            if (partitioner != null) {
-                part = partitioner.partition(key, val, adders.length);
-
-                if (part < 0 || part >= adders.length)
-                    throw new IgniteCheckedException("Invalid partition: " + part);
-            }
-
-            GridHadoopTaskOutput out = adders[part];
-
-            if (out == null)
-                adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
-
-            out.write(key, val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteCheckedException {
-            for (GridHadoopTaskOutput adder : adders) {
-                if (adder != null)
-                    adder.close();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java
deleted file mode 100644
index 24ebc0c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/GridHadoopShuffleMessage.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.message.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- * Shuffle message.
- */
-public class GridHadoopShuffleMessage implements GridHadoopMessage {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private static final AtomicLong ids = new AtomicLong();
-
-    /** */
-    private static final byte MARKER_KEY = (byte)17;
-
-    /** */
-    private static final byte MARKER_VALUE = (byte)31;
-
-    /** */
-    @GridToStringInclude
-    private long msgId;
-
-    /** */
-    @GridToStringInclude
-    private GridHadoopJobId jobId;
-
-    /** */
-    @GridToStringInclude
-    private int reducer;
-
-    /** */
-    private byte[] buf;
-
-    /** */
-    @GridToStringInclude
-    private int off;
-
-    /**
-     *
-     */
-    public GridHadoopShuffleMessage() {
-        // No-op.
-    }
-
-    /**
-     * @param size Size.
-     */
-    public GridHadoopShuffleMessage(GridHadoopJobId jobId, int reducer, int size) {
-        assert jobId != null;
-
-        buf = new byte[size];
-
-        this.jobId = jobId;
-        this.reducer = reducer;
-
-        msgId = ids.incrementAndGet();
-    }
-
-    /**
-     * @return Message ID.
-     */
-    public long id() {
-        return msgId;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public GridHadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @return Reducer.
-     */
-    public int reducer() {
-        return reducer;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public byte[] buffer() {
-        return buf;
-    }
-
-    /**
-     * @return Offset.
-     */
-    public int offset() {
-        return off;
-    }
-
-    /**
-     * @param size Size.
-     * @param valOnly Only value wll be added.
-     * @return {@code true} If this message can fit additional data of this size
-     */
-    public boolean available(int size, boolean valOnly) {
-        size += valOnly ? 5 : 10;
-
-        if (off + size > buf.length) {
-            if (off == 0) { // Resize if requested size is too big.
-                buf = new byte[size];
-
-                return true;
-            }
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * @param keyPtr Key pointer.
-     * @param keySize Key size.
-     */
-    public void addKey(long keyPtr, int keySize) {
-        add(MARKER_KEY, keyPtr, keySize);
-    }
-
-    /**
-     * @param valPtr Value pointer.
-     * @param valSize Value size.
-     */
-    public void addValue(long valPtr, int valSize) {
-        add(MARKER_VALUE, valPtr, valSize);
-    }
-
-    /**
-     * @param marker Marker.
-     * @param ptr Pointer.
-     * @param size Size.
-     */
-    private void add(byte marker, long ptr, int size) {
-        buf[off++] = marker;
-
-        UNSAFE.putInt(buf, BYTE_ARR_OFF + off, size);
-
-        off += 4;
-
-        UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF + off, size);
-
-        off += size;
-    }
-
-    /**
-     * @param v Visitor.
-     */
-    public void visit(Visitor v) throws IgniteCheckedException {
-        for (int i = 0; i < off;) {
-            byte marker = buf[i++];
-
-            int size = UNSAFE.getInt(buf, BYTE_ARR_OFF + i);
-
-            i += 4;
-
-            if (marker == MARKER_VALUE)
-                v.onValue(buf, i, size);
-            else if (marker == MARKER_KEY)
-                v.onKey(buf, i, size);
-            else
-                throw new IllegalStateException();
-
-            i += size;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        jobId.writeExternal(out);
-        out.writeLong(msgId);
-        out.writeInt(reducer);
-        out.writeInt(off);
-        U.writeByteArray(out, buf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        jobId = new GridHadoopJobId();
-
-        jobId.readExternal(in);
-        msgId = in.readLong();
-        reducer = in.readInt();
-        off = in.readInt();
-        buf = U.readByteArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopShuffleMessage.class, this);
-    }
-
-    /**
-     * Visitor.
-     */
-    public static interface Visitor {
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException;
-
-        /**
-         * @param buf Buffer.
-         * @param off Offset.
-         * @param len Length.
-         */
-        public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException;
-    }
-}