You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/21 14:54:24 UTC

[92/92] [abbrv] ignite git commit: Moved tasks to "hadoop" module.

Moved tasks to "hadoop" module.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e740e07
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e740e07
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e740e07

Branch: refs/heads/ignite-3949
Commit: 7e740e07a999012d459843249674031454bdba5a
Parents: 9e42695
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 21 17:38:15 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 21 17:38:15 2016 +0300

----------------------------------------------------------------------
 .../hadoop/impl/proto/HadoopClientProtocol.java |   6 +
 .../proto/HadoopProtocolJobCountersTask.java    |  46 -------
 .../impl/proto/HadoopProtocolJobStatusTask.java |  82 -------------
 .../impl/proto/HadoopProtocolKillJobTask.java   |  46 -------
 .../proto/HadoopProtocolNextTaskIdTask.java     |  36 ------
 .../impl/proto/HadoopProtocolSubmitJobTask.java |  59 ---------
 .../impl/proto/HadoopProtocolTaskAdapter.java   | 120 -------------------
 .../impl/proto/HadoopProtocolTaskArguments.java |  84 -------------
 .../proto/HadoopProtocolJobCountersTask.java    |  46 +++++++
 .../proto/HadoopProtocolJobStatusTask.java      |  82 +++++++++++++
 .../hadoop/proto/HadoopProtocolKillJobTask.java |  46 +++++++
 .../proto/HadoopProtocolNextTaskIdTask.java     |  36 ++++++
 .../proto/HadoopProtocolSubmitJobTask.java      |  59 +++++++++
 .../hadoop/proto/HadoopProtocolTaskAdapter.java | 120 +++++++++++++++++++
 .../proto/HadoopProtocolTaskArguments.java      |  84 +++++++++++++
 15 files changed, 479 insertions(+), 473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
index 9be4a66..ed8beb0 100644
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopClientProtocol.java
@@ -53,6 +53,12 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceCounters;
 import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobCountersTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobStatusTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolKillJobTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolNextTaskIdTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolSubmitJobTask;
+import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolTaskArguments;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobCountersTask.java
deleted file mode 100644
index ba372fb..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobCountersTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-
-/**
- * Task to get job counters.
- */
-public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop,
-        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
-
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-
-        assert nodeId != null;
-        assert id != null;
-
-        return hadoop.counters(new HadoopJobId(nodeId, id));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobStatusTask.java
deleted file mode 100644
index 7cb3542..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolJobStatusTask.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteInClosure;
-
-/**
- * Job status task.
- */
-public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Default poll delay */
-    private static final long DFLT_POLL_DELAY = 100L;
-
-    /** Attribute for held status. */
-    private static final String ATTR_HELD = "held";
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop,
-        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-        Long pollDelay = args.get(2);
-
-        assert nodeId != null;
-        assert id != null;
-
-        HadoopJobId jobId = new HadoopJobId(nodeId, id);
-
-        if (pollDelay == null)
-            pollDelay = DFLT_POLL_DELAY;
-
-        if (pollDelay > 0) {
-            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
-
-            if (fut != null) {
-                if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true))
-                    return hadoop.status(jobId);
-                else {
-                    fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> fut0) {
-                            jobCtx.callcc();
-                        }
-                    });
-
-                    jobCtx.setAttribute(ATTR_HELD, true);
-
-                    return jobCtx.holdcc(pollDelay);
-                }
-            }
-            else
-                return null;
-        }
-        else
-            return hadoop.status(jobId);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolKillJobTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolKillJobTask.java
deleted file mode 100644
index 1f8e7fc..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolKillJobTask.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-
-/**
- * Kill job task.
- */
-public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop,
-        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-
-        assert nodeId != null;
-        assert id != null;
-
-        HadoopJobId jobId = new HadoopJobId(nodeId, id);
-
-        return hadoop.kill(jobId);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolNextTaskIdTask.java
deleted file mode 100644
index 1f12eb2..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolNextTaskIdTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-
-/**
- * Task to get the next job ID.
- */
-public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop,
-        HadoopProtocolTaskArguments args) {
-        return hadoop.nextJobId();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolSubmitJobTask.java
deleted file mode 100644
index 1736ed5..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolSubmitJobTask.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
-
-/**
- * Submit job task.
- */
-public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
-        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
-        UUID nodeId = UUID.fromString(args.<String>get(0));
-        Integer id = args.get(1);
-        HadoopDefaultJobInfo info = args.get(2);
-
-        assert nodeId != null;
-        assert id != null;
-        assert info != null;
-
-        HadoopJobId jobId = new HadoopJobId(nodeId, id);
-
-        hadoop.submit(jobId, info);
-
-        HadoopJobStatus res = hadoop.status(jobId);
-
-        if (res == null) // Submission failed.
-            res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
-
-        return res;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskAdapter.java
deleted file mode 100644
index 1ab59a7..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskAdapter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeJobContext;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.JobContextResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop protocol task adapter.
- */
-public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
-    /** {@inheritDoc} */
-    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
-        @Nullable HadoopProtocolTaskArguments arg) {
-        return Collections.singletonMap(new Job(arg), subgrid.get(0));
-    }
-
-    /** {@inheritDoc} */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
-        return ComputeJobResultPolicy.REDUCE;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
-        if (!F.isEmpty(results)) {
-            ComputeJobResult res = results.get(0);
-
-            return res.getData();
-        }
-        else
-            return null;
-    }
-
-    /**
-     * Job wrapper.
-     */
-    private class Job implements ComputeJob {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        @SuppressWarnings("UnusedDeclaration")
-        @JobContextResource
-        private ComputeJobContext jobCtx;
-
-        /** Argument. */
-        private final HadoopProtocolTaskArguments args;
-
-        /**
-         * Constructor.
-         *
-         * @param args Job argument.
-         */
-        private Job(HadoopProtocolTaskArguments args) {
-            this.args = args;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void cancel() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Nullable @Override public Object execute() {
-            try {
-                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
-            }
-            catch (IgniteCheckedException e) {
-                throw U.convertException(e);
-            }
-        }
-    }
-
-    /**
-     * Run the task.
-     *
-     * @param jobCtx Job context.
-     * @param hadoop Hadoop facade.
-     * @param args Arguments.
-     * @return Job result.
-     * @throws IgniteCheckedException If failed.
-     */
-    public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
-        throws IgniteCheckedException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskArguments.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskArguments.java
deleted file mode 100644
index 13d226f..0000000
--- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/proto/HadoopProtocolTaskArguments.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl.proto;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Task arguments.
- */
-public class HadoopProtocolTaskArguments implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Arguments. */
-    private Object[] args;
-
-    /**
-     * {@link Externalizable} support.
-     */
-    public HadoopProtocolTaskArguments() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param args Arguments.
-     */
-    public HadoopProtocolTaskArguments(Object... args) {
-        this.args = args;
-    }
-
-    /**
-     * @param idx Argument index.
-     * @return Argument.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public <T> T get(int idx) {
-        return (args != null && args.length > idx) ? (T)args[idx] : null;
-    }
-
-    /**
-     * @return Size.
-     */
-    public int size() {
-        return args != null ? args.length : 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeArray(out, args);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        args = U.readArray(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopProtocolTaskArguments.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
new file mode 100644
index 0000000..8f0271c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobCountersTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+
+/**
+ * Protocol task that returns the counters of the job identified by the task arguments.
+ */
+public class HadoopProtocolJobCountersTask extends HadoopProtocolTaskAdapter<HadoopCounters> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} Argument 0 is the node ID as a UUID string, argument 1 is the integer job ID. */
+    @Override public HadoopCounters run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+
+        assert nodeId != null;
+        assert id != null;
+
+        return hadoop.counters(new HadoopJobId(nodeId, id));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
new file mode 100644
index 0000000..c08fe77
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolJobStatusTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ * Protocol task returning the status of a job, optionally deferring the reply via {@code holdcc}/{@code callcc}.
+ */
+public class HadoopProtocolJobStatusTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Poll delay used when argument 2 is absent; passed to {@code holdcc} (unit not shown here). */
+    private static final long DFLT_POLL_DELAY = 100L;
+
+    /** Job context attribute set to {@code true} right before this task calls {@code holdcc}. */
+    private static final String ATTR_HELD = "held";
+
+    /** {@inheritDoc} Arguments: 0 - node ID (UUID string), 1 - integer job ID, 2 - optional poll delay. */
+    @Override public HadoopJobStatus run(final ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+        Long pollDelay = args.get(2);
+
+        assert nodeId != null;
+        assert id != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        if (pollDelay == null)
+            pollDelay = DFLT_POLL_DELAY;
+
+        if (pollDelay > 0) {
+            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);
+
+            if (fut != null) {
+                if (fut.isDone() || F.eq(jobCtx.getAttribute(ATTR_HELD), true)) // Done, or held before.
+                    return hadoop.status(jobId);
+                else {
+                    fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> fut0) {
+                            jobCtx.callcc(); // Invoked by the finish future's listener.
+                        }
+                    });
+
+                    jobCtx.setAttribute(ATTR_HELD, true); // A later pass takes the status branch above.
+
+                    return jobCtx.holdcc(pollDelay);
+                }
+            }
+            else
+                return null; // No finish future for this job ID.
+        }
+        else
+            return hadoop.status(jobId); // Non-positive delay: answer immediately.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
new file mode 100644
index 0000000..0f65664
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolKillJobTask.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+
+/**
+ * Protocol task that asks the Hadoop facade to kill the job whose ID is built from arguments 0 and 1.
+ */
+public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0)); // Arg 0: node ID in UUID string form.
+        Integer id = args.get(1); // Arg 1: integer component of the job ID.
+
+        assert nodeId != null;
+        assert id != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        return hadoop.kill(jobId); // The result of Hadoop.kill() is returned unchanged.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
new file mode 100644
index 0000000..bde7821
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolNextTaskIdTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+
+/**
+ * Protocol task that returns the next job ID obtained from {@code Hadoop.nextJobId()}.
+ */
+public class HadoopProtocolNextTaskIdTask extends HadoopProtocolTaskAdapter<HadoopJobId> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) {
+        return hadoop.nextJobId(); // Neither the job context nor the arguments are used here.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
new file mode 100644
index 0000000..3eb819b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
+
+/**
+ * Protocol task that submits a job and returns its status, building a substitute status if none is found.
+ */
+public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0)); // Arg 0: node ID in UUID string form.
+        Integer id = args.get(1); // Arg 1: integer component of the job ID.
+        HadoopDefaultJobInfo info = args.get(2); // Arg 2: job info handed to submit().
+
+        assert nodeId != null;
+        assert id != null;
+        assert info != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        hadoop.submit(jobId, info); // The status is queried separately right after submission.
+
+        HadoopJobStatus res = hadoop.status(jobId);
+
+        if (res == null) // Submission failed.
+            res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
+
+        return res; // Either the reported status or the PHASE_CANCELLING substitute built above.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
new file mode 100644
index 0000000..c3227ae
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base for Hadoop protocol tasks: runs {@code run(...)} as one job on the first subgrid node and returns its result.
+ */
+public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable HadoopProtocolTaskArguments arg) {
+        // NOTE(review): subgrid.get(0) is not guarded against an empty subgrid - confirm callers guarantee a node.
+        return Collections.singletonMap(new Job(arg), subgrid.get(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        return ComputeJobResultPolicy.REDUCE; // Always REDUCE; the job result is not inspected here.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
+        if (!F.isEmpty(results)) {
+            ComputeJobResult res = results.get(0); // Only the first result is used.
+
+            return res.getData();
+        }
+        else
+            return null; // No results to reduce.
+    }
+
+    /**
+     * Compute job that delegates {@code execute()} to the enclosing task's {@code run(...)} method.
+     */
+    private class Job implements ComputeJob {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Ignite instance, annotated for resource injection. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Job context, annotated for resource injection. */
+        @SuppressWarnings("UnusedDeclaration")
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** Task arguments forwarded to {@code run(...)}. */
+        private final HadoopProtocolTaskArguments args;
+
+        /**
+         * Constructor.
+         *
+         * @param args Job argument; stored as-is and may be {@code null}.
+         */
+        private Job(HadoopProtocolTaskArguments args) {
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            try {
+                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args); // Cast to IgniteEx to reach hadoop().
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e); // Rethrown as whatever U.convertException() produces.
+            }
+        }
+    }
+
+    /**
+     * Executes the task-specific logic; called from the wrapper job's {@code execute()}.
+     *
+     * @param jobCtx Job context held by the wrapper job.
+     * @param hadoop Hadoop facade obtained from the Ignite instance.
+     * @param args Arguments given to {@code map(...)}.
+     * @return Job result.
+     * @throws IgniteCheckedException If failed; the wrapper job converts it with {@code U.convertException}.
+     */
+    public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
+        throws IgniteCheckedException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e740e07/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
new file mode 100644
index 0000000..e497454
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Externalizable holder for the positional arguments of a Hadoop protocol task.
+ */
+public class HadoopProtocolTaskArguments implements Externalizable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Positional arguments; may be {@code null} (the no-arg constructor leaves it unset). */
+    private Object[] args;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public HadoopProtocolTaskArguments() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param args Arguments; the array is stored by reference, not copied.
+     */
+    public HadoopProtocolTaskArguments(Object... args) {
+        this.args = args;
+    }
+
+    /**
+     * @param idx Zero-based argument index (negative values are not checked).
+     * @return Argument cast to {@code T} (unchecked), or {@code null} if there is no argument at {@code idx}.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <T> T get(int idx) {
+        return (args != null && args.length > idx) ? (T)args[idx] : null;
+    }
+
+    /**
+     * @return Number of arguments, or {@code 0} if the argument array is {@code null}.
+     */
+    public int size() {
+        return args != null ? args.length : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeArray(out, args); // Array serialization is delegated to U.writeArray().
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        args = U.readArray(in); // Counterpart of writeExternal(); replaces the current array.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProtocolTaskArguments.class, this);
+    }
+}
\ No newline at end of file