You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:33 UTC

[17/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (3).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
new file mode 100644
index 0000000..0a2af6d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2CleanupTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.*;
+
+import java.io.*;
+
+/**
+ * Hadoop cleanup task (commits or aborts job).
+ */
+public class HadoopV2CleanupTask extends HadoopV2Task {
+    /** Abort flag. */
+    private final boolean abort;
+
+    /**
+     * @param taskInfo Task info.
+     * @param abort Abort flag.
+     */
+    public HadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
+        super(taskInfo);
+
+        this.abort = abort;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
+
+            if (committer != null) {
+                if (abort)
+                    committer.abortJob(jobCtx, JobStatus.State.FAILED);
+                else
+                    committer.commitJob(jobCtx);
+            }
+        }
+        catch (ClassNotFoundException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
new file mode 100644
index 0000000..a4b5eca
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Context.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.mapreduce.task.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks.
+ */
+public class HadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
+    /** Input reader to overriding of GridHadoopTaskContext input. */
+    private RecordReader reader;
+
+    /** Output writer to overriding of GridHadoopTaskContext output. */
+    private RecordWriter writer;
+
+    /** Output is provided by executor environment. */
+    private final GridHadoopTaskOutput output;
+
+    /** Input is provided by executor environment. */
+    private final GridHadoopTaskInput input;
+
+    /** Unique identifier for a task attempt. */
+    private final TaskAttemptID taskAttemptID;
+
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /** Input split. */
+    private InputSplit inputSplit;
+
+    /** */
+    private final GridHadoopTaskContext ctx;
+
+    /** */
+    private String status;
+
+    /**
+     * @param ctx Context for IO operations.
+     */
+    public HadoopV2Context(HadoopV2TaskContext ctx) {
+        super(ctx.jobConf(), ctx.jobContext().getJobID());
+
+        taskAttemptID = ctx.attemptId();
+
+        conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
+        conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
+
+        output = ctx.output();
+        input = ctx.input();
+
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputSplit getInputSplit() {
+        if (inputSplit == null) {
+            GridHadoopInputSplit split = ctx.taskInfo().inputSplit();
+
+            if (split == null)
+                return null;
+
+            if (split instanceof GridHadoopFileBlock) {
+                GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split;
+
+                inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
+            }
+            else if (split instanceof HadoopExternalSplit)
+                throw new UnsupportedOperationException(); // TODO
+            else if (split instanceof HadoopSplitWrapper)
+                inputSplit = (InputSplit) HadoopUtils.unwrapSplit((HadoopSplitWrapper) split);
+            else
+                throw new IllegalStateException();
+        }
+
+        return inputSplit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        return reader.nextKeyValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getCurrentKey() throws IOException, InterruptedException {
+        if (reader != null)
+            return reader.getCurrentKey();
+
+        return input.key();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object getCurrentValue() throws IOException, InterruptedException {
+        return reader.getCurrentValue();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void write(Object key, Object val) throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        if (writer != null)
+            writer.write(key, val);
+        else {
+            try {
+                output.write(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public OutputCommitter getOutputCommitter() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public TaskAttemptID getTaskAttemptID() {
+        return taskAttemptID;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStatus(String msg) {
+        status = msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStatus() {
+        return status;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0.5f; // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getCounter(Enum<?> cntrName) {
+        return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getCounter(String grpName, String cntrName) {
+        return new HadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void progress() {
+        // No-op.
+    }
+
+    /**
+     * Overrides default input data reader.
+     *
+     * @param reader New reader.
+     */
+    public void reader(RecordReader reader) {
+        this.reader = reader;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean nextKey() throws IOException, InterruptedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        return input.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Iterable getValues() throws IOException, InterruptedException {
+        return new Iterable() {
+            @Override public Iterator iterator() {
+                return input.values();
+            }
+        };
+    }
+
+    /**
+     * @return Overridden output data writer.
+     */
+    public RecordWriter writer() {
+        return writer;
+    }
+
+    /**
+     * Overrides default output data writer.
+     *
+     * @param writer New writer.
+     */
+    public void writer(RecordWriter writer) {
+        this.writer = writer;
+    }
+
+    /**
+     * Cancels the task by stop the IO.
+     */
+    public void cancel() {
+        cancelled = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
new file mode 100644
index 0000000..96ede0d
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Counter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+import java.io.*;
+
+/**
+ * Adapter from own counter implementation into Hadoop API Counter od version 2.0.
+ */
+public class HadoopV2Counter implements Counter {
+    /** Delegate. */
+    private final HadoopLongCounter cntr;
+
+    /**
+     * Creates new instance with given delegate.
+     *
+     * @param cntr Internal counter.
+     */
+    public HadoopV2Counter(HadoopLongCounter cntr) {
+        assert cntr != null : "counter must be non-null";
+
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return cntr.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getValue() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setValue(long val) {
+        cntr.value(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void increment(long incr) {
+        cntr.increment(incr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getUnderlyingCounter() {
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
new file mode 100644
index 0000000..47535e8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.split.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.*;
+import java.util.Queue;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Hadoop job implementation for v2 API.
+ */
+public class HadoopV2Job implements GridHadoopJob {
+    /** */
+    private final JobConf jobConf;
+
+    /** */
+    private final JobContextImpl jobCtx;
+
+    /** Hadoop job ID. */
+    private final GridHadoopJobId jobId;
+
+    /** Job info. */
+    protected GridHadoopJobInfo jobInfo;
+
+    /** */
+    private final JobID hadoopJobID;
+
+    /** */
+    private final HadoopV2JobResourceManager rsrcMgr;
+
+    /** */
+    private final ConcurrentMap<T2<GridHadoopTaskType, Integer>, GridFutureAdapter<GridHadoopTaskContext>> ctxs =
+        new ConcurrentHashMap8<>();
+
+    /** Pooling task context class and thus class loading environment. */
+    private final Queue<Class<?>> taskCtxClsPool = new ConcurrentLinkedQueue<>();
+
+    /** Local node ID */
+    private UUID locNodeId;
+
+    /** Serialized JobConf. */
+    private volatile byte[] jobConfData;
+
+    /**
+     * @param jobId Job ID.
+     * @param jobInfo Job info.
+     * @param log Logger.
+     */
+    public HadoopV2Job(GridHadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log) {
+        assert jobId != null;
+        assert jobInfo != null;
+
+        this.jobId = jobId;
+        this.jobInfo = jobInfo;
+
+        hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId());
+
+        HadoopClassLoader clsLdr = (HadoopClassLoader)getClass().getClassLoader();
+
+        // Before create JobConf instance we should set new context class loader.
+        Thread.currentThread().setContextClassLoader(clsLdr);
+
+        jobConf = new JobConf();
+
+        HadoopFileSystemsUtils.setupFileSystems(jobConf);
+
+        Thread.currentThread().setContextClassLoader(null);
+
+        for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
+            jobConf.set(e.getKey(), e.getValue());
+
+        jobCtx = new JobContextImpl(jobConf, hadoopJobID);
+
+        rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopJobId id() {
+        return jobId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopJobInfo info() {
+        return jobInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridHadoopInputSplit> input() throws IgniteCheckedException {
+        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+
+        try {
+            String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
+
+            if (jobDirPath == null) { // Probably job was submitted not by hadoop client.
+                // Assume that we have needed classes and try to generate input splits ourself.
+                if (jobConf.getUseNewMapper())
+                    return HadoopV2Splitter.splitJob(jobCtx);
+                else
+                    return HadoopV1Splitter.splitJob(jobConf);
+            }
+
+            Path jobDir = new Path(jobDirPath);
+
+            try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf)) {
+                JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
+                    jobDir);
+
+                if (F.isEmpty(metaInfos))
+                    throw new IgniteCheckedException("No input splits found.");
+
+                Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
+
+                try (FSDataInputStream in = fs.open(splitsFile)) {
+                    Collection<GridHadoopInputSplit> res = new ArrayList<>(metaInfos.length);
+
+                    for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
+                        long off = metaInfo.getStartOffset();
+
+                        String[] hosts = metaInfo.getLocations();
+
+                        in.seek(off);
+
+                        String clsName = Text.readString(in);
+
+                        GridHadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts);
+
+                        if (block == null)
+                            block = HadoopV2Splitter.readFileBlock(clsName, in, hosts);
+
+                        res.add(block != null ? block : new HadoopExternalSplit(hosts, off));
+                    }
+
+                    return res;
+                }
+            }
+            catch (Throwable e) {
+                throw transformException(e);
+            }
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopTaskContext getTaskContext(GridHadoopTaskInfo info) throws IgniteCheckedException {
+        T2<GridHadoopTaskType, Integer> locTaskId = new T2<>(info.type(),  info.taskNumber());
+
+        GridFutureAdapter<GridHadoopTaskContext> fut = ctxs.get(locTaskId);
+
+        if (fut != null)
+            return fut.get();
+
+        GridFutureAdapter<GridHadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>());
+
+        if (old != null)
+            return old.get();
+
+        Class<?> cls = taskCtxClsPool.poll();
+
+        try {
+            if (cls == null) {
+                // If there is no pooled class, then load new one.
+                HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath());
+
+                cls = ldr.loadClass(HadoopV2TaskContext.class.getName());
+            }
+
+            Constructor<?> ctr = cls.getConstructor(GridHadoopTaskInfo.class, GridHadoopJob.class,
+                GridHadoopJobId.class, UUID.class, DataInput.class);
+
+            if (jobConfData == null)
+                synchronized(jobConf) {
+                    if (jobConfData == null) {
+                        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+                        jobConf.write(new DataOutputStream(buf));
+
+                        jobConfData = buf.toByteArray();
+                    }
+                }
+
+            GridHadoopTaskContext res = (GridHadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId,
+                new DataInputStream(new ByteArrayInputStream(jobConfData)));
+
+            fut.onDone(res);
+
+            return res;
+        }
+        catch (Throwable e) {
+            IgniteCheckedException te = transformException(e);
+
+            fut.onDone(te);
+
+            throw te;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException {
+        this.locNodeId = locNodeId;
+
+        Thread.currentThread().setContextClassLoader(jobConf.getClassLoader());
+
+        try {
+            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId));
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void dispose(boolean external) throws IgniteCheckedException {
+        if (rsrcMgr != null && !external) {
+            File jobLocDir = jobLocalDir(locNodeId, jobId);
+
+            if (jobLocDir.exists())
+                U.delete(jobLocDir);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
+        rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment(GridHadoopTaskInfo info) throws IgniteCheckedException {
+        GridHadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get();
+
+        taskCtxClsPool.offer(ctx.getClass());
+
+        File locDir = taskLocalDir(locNodeId, info);
+
+        if (locDir.exists())
+            U.delete(locDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupStagingDirectory() {
+        if (rsrcMgr != null)
+            rsrcMgr.cleanupStagingDirectory();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
new file mode 100644
index 0000000..04481bb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.util.*;
+
+/**
+ * Provides all resources are needed to the job execution. Downloads the main jar, the configuration and additional
+ * files are needed to be placed on local files system.
+ */
+public class HadoopV2JobResourceManager {
+    /** Hadoop job context. */
+    private final JobContextImpl ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Job ID. */
+    private final GridHadoopJobId jobId;
+
+    /** Class path list. */
+    private URL[] clsPath;
+
+    /** Set of local resources. */
+    private final Collection<File> rsrcSet = new HashSet<>();
+
+    /** Staging directory to delivery job jar and config to the work nodes. */
+    private Path stagingDir;
+
+    /**
+     * Creates new instance.
+     * @param jobId Job ID.
+     * @param ctx Hadoop job context.
+     * @param log Logger.
+     */
+    public HadoopV2JobResourceManager(GridHadoopJobId jobId, JobContextImpl ctx, IgniteLogger log) {
+        this.jobId = jobId;
+        this.ctx = ctx;
+        this.log = log.getLogger(HadoopV2JobResourceManager.class);
+    }
+
+    /**
+     * Set working directory in local file system.
+     *
+     * @param dir Working directory.
+     * @throws IOException If fails.
+     */
+    private void setLocalFSWorkingDirectory(File dir) throws IOException {
+        JobConf cfg = ctx.getJobConf();
+
+        Thread.currentThread().setContextClassLoader(cfg.getClassLoader());
+
+        try {
+            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath());
+
+            if(!cfg.getBoolean("fs.file.impl.disable.cache", false))
+                FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath()));
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /**
+     * Prepare job resources. Resolve the classpath list and download it if needed.
+     *
+     * @param download {@code true} If need to download resources.
+     * @param jobLocDir Work directory for the job.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException {
+        try {
+            if (jobLocDir.exists())
+                throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath());
+
+            JobConf cfg = ctx.getJobConf();
+
+            String mrDir = cfg.get("mapreduce.job.dir");
+
+            if (mrDir != null) {
+                stagingDir = new Path(new URI(mrDir));
+
+                if (download) {
+                    FileSystem fs = FileSystem.get(stagingDir.toUri(), cfg);
+
+                    if (!fs.exists(stagingDir))
+                        throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " +
+                            stagingDir);
+
+                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
+                        throw new IgniteCheckedException("Failed to copy job submission directory contents to local file system " +
+                            "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + ", jobId=" + jobId + ']');
+                }
+
+                File jarJobFile = new File(jobLocDir, "job.jar");
+
+                Collection<URL> clsPathUrls = new ArrayList<>();
+
+                clsPathUrls.add(jarJobFile.toURI().toURL());
+
+                rsrcSet.add(jarJobFile);
+                rsrcSet.add(new File(jobLocDir, "job.xml"));
+
+                processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES);
+                processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES);
+                processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null);
+                processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null);
+
+                if (!clsPathUrls.isEmpty()) {
+                    clsPath = new URL[clsPathUrls.size()];
+
+                    clsPathUrls.toArray(clsPath);
+                }
+            }
+            else if (!jobLocDir.mkdirs())
+                throw new IgniteCheckedException("Failed to create local job directory: " + jobLocDir.getAbsolutePath());
+
+            setLocalFSWorkingDirectory(jobLocDir);
+        }
+        catch (URISyntaxException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Process list of resources.
+     *
+     * @param jobLocDir Job working directory.
+     * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} to process resources.
+     * @param download {@code true}, if need to download. Process class path only else.
+     * @param extract {@code true}, if need to extract archive.
+     * @param clsPathUrls Collection to add resource as classpath resource.
+     * @param rsrcNameProp Property for resource name array setting.
+     * @throws IOException If failed.
+     */
+    private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract,
+        @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException {
+        if (F.isEmptyOrNulls(files))
+            return;
+
+        Collection<String> res = new ArrayList<>();
+
+        for (Object pathObj : files) {
+            String locName = null;
+            Path srcPath;
+
+            if (pathObj instanceof URI) {
+                URI uri = (URI)pathObj;
+
+                locName = uri.getFragment();
+
+                srcPath = new Path(uri);
+            }
+            else
+                srcPath = (Path)pathObj;
+
+            if (locName == null)
+                locName = srcPath.getName();
+
+            File dstPath = new File(jobLocDir.getAbsolutePath(), locName);
+
+            res.add(locName);
+
+            rsrcSet.add(dstPath);
+
+            if (clsPathUrls != null)
+                clsPathUrls.add(dstPath.toURI().toURL());
+
+            if (!download)
+                continue;
+
+            JobConf cfg = ctx.getJobConf();
+
+            FileSystem dstFs = FileSystem.getLocal(cfg);
+
+            FileSystem srcFs = srcPath.getFileSystem(cfg);
+
+            if (extract) {
+                File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives");
+
+                if (!archivesPath.exists() && !archivesPath.mkdir())
+                    throw new IOException("Failed to create directory " +
+                         "[path=" + archivesPath + ", jobId=" + jobId + ']');
+
+                File archiveFile = new File(archivesPath, locName);
+
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg);
+
+                String archiveNameLC = archiveFile.getName().toLowerCase();
+
+                if (archiveNameLC.endsWith(".jar"))
+                    RunJar.unJar(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".zip"))
+                    FileUtil.unZip(archiveFile, dstPath);
+                else if (archiveNameLC.endsWith(".tar.gz") ||
+                    archiveNameLC.endsWith(".tgz") ||
+                    archiveNameLC.endsWith(".tar"))
+                    FileUtil.unTar(archiveFile, dstPath);
+                else
+                    throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']');
+            }
+            else
+                FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg);
+        }
+
+        if (!res.isEmpty() && rsrcNameProp != null)
+            ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()]));
+    }
+
+    /**
+     * Prepares working directory for the task.
+     *
+     * <ul>
+     *     <li>Creates working directory.</li>
+     *     <li>Creates symbolic links to all job resources in working directory.</li>
+     * </ul>
+     *
+     * @param path Path to working directory of the task.
+     * @throws IgniteCheckedException If fails.
+     */
+    public void prepareTaskWorkDir(File path) throws IgniteCheckedException {
+        try {
+            if (path.exists())
+                throw new IOException("Task local directory already exists: " + path);
+
+            if (!path.mkdir())
+                throw new IOException("Failed to create directory: " + path);
+
+            for (File resource : rsrcSet) {
+                File symLink = new File(path, resource.getName());
+
+                try {
+                    Files.createSymbolicLink(symLink.toPath(), resource.toPath());
+                }
+                catch (IOException e) {
+                    String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\".";
+
+                    if (U.isWindows() && e instanceof FileSystemException)
+                        msg += "\n\nAbility to create symbolic links is required!\n" +
+                                "On Windows platform you have to grant permission 'Create symbolic links'\n" +
+                                "to your user or run the Accelerator as Administrator.\n";
+
+                    throw new IOException(msg, e);
+                }
+            }
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Unable to prepare local working directory for the task " +
+                 "[jobId=" + jobId + ", path=" + path+ ']', e);
+        }
+    }
+
+    /**
+     * Cleans up job staging directory.
+     */
+    public void cleanupStagingDirectory() {
+        try {
+            if (stagingDir != null)
+                stagingDir.getFileSystem(ctx.getJobConf()).delete(stagingDir, true);
+        }
+        catch (Exception e) {
+            log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e);
+        }
+    }
+
+    /**
+     * Returns array of class path for current job.
+     *
+     * @return Class path collection.
+     */
+    @Nullable public URL[] classPath() {
+        return clsPath;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
new file mode 100644
index 0000000..afa203f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.map.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Hadoop map task implementation for v2 API.
+ */
+public class HadoopV2MapTask extends HadoopV2Task {
+    /**
+     * @param taskInfo Task info.
+     */
+    public HadoopV2MapTask(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        GridHadoopInputSplit split = info().inputSplit();
+
+        InputSplit nativeSplit;
+
+        if (split instanceof GridHadoopFileBlock) {
+            GridHadoopFileBlock block = (GridHadoopFileBlock)split;
+
+            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), null);
+        }
+        else
+            nativeSplit = (InputSplit)taskCtx.getNativeSplit(split);
+
+        assert nativeSplit != null;
+
+        OutputFormat outputFormat = null;
+        Exception err = null;
+
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
+                hadoopContext().getConfiguration());
+
+            RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext());
+
+            reader.initialize(nativeSplit, hadoopContext());
+
+            hadoopContext().reader(reader);
+
+            GridHadoopJobInfo jobInfo = taskCtx.job().info();
+
+            outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx);
+
+            Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration());
+
+            try {
+                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            err = e;
+
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            err = e;
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (err != null)
+                abort(outputFormat);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
new file mode 100644
index 0000000..83e713b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Hadoop partitioner adapter for v2 API.
+ */
+public class HadoopV2Partitioner implements GridHadoopPartitioner {
+    /** Partitioner instance. */
+    private Partitioner<Object, Object> part;
+
+    /**
+     * @param cls Hadoop partitioner class.
+     * @param conf Job configuration.
+     */
+    public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) {
+        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key, Object val, int parts) {
+        return part.getPartition(key, val, parts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
new file mode 100644
index 0000000..66ff542
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.reduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.*;
+
+/**
+ * Hadoop reduce task implementation for v2 API.
+ */
+public class HadoopV2ReduceTask extends HadoopV2Task {
+    /** {@code True} if reduce, {@code false} if combine. */
+    private final boolean reduce;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     * @param reduce {@code True} if reduce, {@code false} if combine.
+     */
+    public HadoopV2ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
+        super(taskInfo);
+
+        this.reduce = reduce;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions", "unchecked"})
+    @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        OutputFormat outputFormat = null;
+        Exception err = null;
+
+        JobContextImpl jobCtx = taskCtx.jobContext();
+
+        try {
+            outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null;
+
+            Reducer reducer = ReflectionUtils.newInstance(reduce ? jobCtx.getReducerClass() : jobCtx.getCombinerClass(),
+                jobCtx.getConfiguration());
+
+            try {
+                reducer.run(new WrappedReducer().getReducerContext(hadoopContext()));
+            }
+            finally {
+                closeWriter();
+            }
+
+            commit(outputFormat);
+        }
+        catch (InterruptedException e) {
+            err = e;
+
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+        catch (Exception e) {
+            err = e;
+
+            throw new IgniteCheckedException(e);
+        }
+        finally {
+            if (err != null)
+                abort(outputFormat);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
new file mode 100644
index 0000000..d0ac792
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task (prepares job).
+ */
+public class HadoopV2SetupTask extends HadoopV2Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo task info.
+     */
+    public HadoopV2SetupTask(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ConstantConditions")
+    @Override protected void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException {
+        try {
+            JobContextImpl jobCtx = taskCtx.jobContext();
+
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            outputFormat.checkOutputSpecs(jobCtx);
+
+            OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
+
+            if (committer != null)
+                committer.setupJob(jobCtx);
+        }
+        catch (ClassNotFoundException | IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
new file mode 100644
index 0000000..d524994
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop API v2 splitter.
+ */
+public class HadoopV2Splitter {
+    /** */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * @param ctx Job context.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<GridHadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration());
+
+            assert format != null;
+
+            List<InputSplit> splits = format.getSplits(ctx);
+
+            Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.size());
+
+            int id = 0;
+
+            for (InputSplit nativeSplit : splits) {
+                if (nativeSplit instanceof FileSplit) {
+                    FileSplit s = (FileSplit)nativeSplit;
+
+                    res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+                }
+                else
+                    res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations()));
+
+                id++;
+            }
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException(e);
+        }
+    }
+
+    /**
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static GridHadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts)
+        throws IgniteCheckedException {
+        if (!FileSplit.class.getName().equals(clsName))
+            return null;
+
+        FileSplit split = new FileSplit();
+
+        try {
+            split.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        if (hosts == null)
+            hosts = EMPTY_HOSTS;
+
+        return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
new file mode 100644
index 0000000..04c76ee
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Extended Hadoop v2 task.
+ */
+public abstract class HadoopV2Task extends GridHadoopTask {
+    /** Hadoop context. */
+    private HadoopV2Context hadoopCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV2Task(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        hadoopCtx = new HadoopV2Context(ctx);
+
+        run0(ctx);
+    }
+
+    /**
+     * Internal task routine.
+     *
+     * @param taskCtx Task context.
+     * @throws IgniteCheckedException
+     */
+    protected abstract void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException;
+
+    /**
+     * @return hadoop context.
+     */
+    protected HadoopV2Context hadoopContext() {
+        return hadoopCtx;
+    }
+
+    /**
+     * Create and configure an OutputFormat instance.
+     *
+     * @param jobCtx Job context.
+     * @return Instance of OutputFormat is specified in job configuration.
+     * @throws ClassNotFoundException If specified class not found.
+     */
+    protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException {
+        return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration());
+    }
+
+    /**
+     * Put write into Hadoop context and return associated output format instance.
+     *
+     * @param jobCtx Job context.
+     * @return Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected OutputFormat prepareWriter(JobContext jobCtx)
+        throws IgniteCheckedException, InterruptedException {
+        try {
+            OutputFormat outputFormat = getOutputFormat(jobCtx);
+
+            assert outputFormat != null;
+
+            OutputCommitter outCommitter = outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outCommitter != null)
+                outCommitter.setupTask(hadoopCtx);
+
+            RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx);
+
+            hadoopCtx.writer(writer);
+
+            return outputFormat;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Closes writer.
+     *
+     * @throws Exception If fails and logger hasn't been specified.
+     */
+    protected void closeWriter() throws Exception {
+        RecordWriter writer = hadoopCtx.writer();
+
+        if (writer != null)
+            writer.close(hadoopCtx);
+    }
+
+    /**
+     * Setup task.
+     *
+     * @param outputFormat Output format.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Commit task.
+     *
+     * @param outputFormat Output format.
+     * @throws IgniteCheckedException In case of Grid exception.
+     * @throws IOException In case of IO exception.
+     * @throws InterruptedException In case of interrupt.
+     */
+    protected void commit(@Nullable OutputFormat outputFormat) throws IgniteCheckedException, IOException, InterruptedException {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            OutputCommitter outputCommitter = outputFormat.getOutputCommitter(hadoopCtx);
+
+            if (outputCommitter.needsTaskCommit(hadoopCtx))
+                outputCommitter.commitTask(hadoopCtx);
+        }
+    }
+
+    /**
+     * Abort task.
+     *
+     * @param outputFormat Output format.
+     */
+    protected void abort(@Nullable OutputFormat outputFormat) {
+        if (hadoopCtx.writer() != null) {
+            assert outputFormat != null;
+
+            try {
+                outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx);
+            }
+            catch (IOException ignore) {
+                // Ignore.
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        hadoopCtx.cancel();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
new file mode 100644
index 0000000..65f6629
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.serializer.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.fs.*;
+import org.apache.ignite.internal.processors.hadoop.v1.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*;
+
+/**
+ * Context for task execution.
+ */
+public class HadoopV2TaskContext extends GridHadoopTaskContext {
+    /** */
+    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
+
+    /**
+     * Check for combiner grouping support (available since Hadoop 2.3).
+     */
+    static {
+        boolean ok;
+
+        try {
+            JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
+
+            ok = true;
+        }
+        catch (NoSuchMethodException ignore) {
+            ok = false;
+        }
+
+        COMBINE_KEY_GROUPING_SUPPORTED = ok;
+    }
+
+    /** Flag is set if new context-object code is used for running the mapper. */
+    private final boolean useNewMapper;
+
+    /** Flag is set if new context-object code is used for running the reducer. */
+    private final boolean useNewReducer;
+
+    /** Flag is set if new context-object code is used for running the combiner. */
+    private final boolean useNewCombiner;
+
+    /** */
+    private final JobContextImpl jobCtx;
+
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+
+    /** Current task. */
+    private volatile GridHadoopTask task;
+
+    /** Local node ID */
+    private UUID locNodeId;
+
+    /** Counters for task. */
+    private final GridHadoopCounters cntrs = new HadoopCountersImpl();
+
+    /**
+     * @param taskInfo Task info.
+     * @param job Job.
+     * @param jobId Job ID.
+     * @param locNodeId Local node ID.
+     * @param jobConfDataInput DataInput for read JobConf.
+     */
+    public HadoopV2TaskContext(GridHadoopTaskInfo taskInfo, GridHadoopJob job, GridHadoopJobId jobId,
+        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException {
+        super(taskInfo, job);
+        this.locNodeId = locNodeId;
+
+        // Before create JobConf instance we should set new context class loader.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+        try {
+            JobConf jobConf = new JobConf();
+
+            try {
+                jobConf.readFields(jobConfDataInput);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException(e);
+            }
+
+            // For map-reduce jobs prefer local writes.
+            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
+
+            jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId()));
+
+            useNewMapper = jobConf.getUseNewMapper();
+            useNewReducer = jobConf.getUseNewReducer();
+            useNewCombiner = jobConf.getCombinerClass() == null;
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        return cntrs.counter(grp, name, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopCounters counters() {
+        return cntrs;
+    }
+
+    /**
+     * Creates appropriate task from current task info.
+     *
+     * @return Task.
+     */
+    private GridHadoopTask createTask() {
+        boolean isAbort = taskInfo().type() == GridHadoopTaskType.ABORT;
+
+        switch (taskInfo().type()) {
+            case SETUP:
+                return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo());
+
+            case MAP:
+                return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo());
+
+            case REDUCE:
+                return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) :
+                    new HadoopV1ReduceTask(taskInfo(), true);
+
+            case COMBINE:
+                return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) :
+                    new HadoopV1ReduceTask(taskInfo(), false);
+
+            case COMMIT:
+            case ABORT:
+                return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) :
+                    new HadoopV1CleanupTask(taskInfo(), isAbort);
+
+            default:
+                return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run() throws IgniteCheckedException {
+        try {
+            Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+
+            try {
+                task = createTask();
+            }
+            catch (Throwable e) {
+                throw transformException(e);
+            }
+
+            if (cancelled)
+                throw new HadoopTaskCancelledException("Task cancelled.");
+
+            try {
+                task.run(this);
+            }
+            catch (Throwable e) {
+                throw transformException(e);
+            }
+        }
+        finally {
+            task = null;
+
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+
+        GridHadoopTask t = task;
+
+        if (t != null)
+            t.cancel();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareTaskEnvironment() throws IgniteCheckedException {
+        File locDir;
+
+        switch(taskInfo().type()) {
+            case MAP:
+            case REDUCE:
+                job().prepareTaskEnvironment(taskInfo());
+
+                locDir = taskLocalDir(locNodeId, taskInfo());
+
+                break;
+
+            default:
+                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
+        }
+
+        Thread.currentThread().setContextClassLoader(jobConf().getClassLoader());
+
+        try {
+            FileSystem fs = FileSystem.get(jobConf());
+
+            HadoopFileSystemsUtils.setUser(fs, jobConf().getUser());
+
+            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
+
+            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
+        }
+        catch (Throwable e) {
+            throw transformException(e);
+        }
+        finally {
+            Thread.currentThread().setContextClassLoader(null);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cleanupTaskEnvironment() throws IgniteCheckedException {
+        job().cleanupTaskEnvironment(taskInfo());
+    }
+
+    /**
+     * Creates Hadoop attempt ID.
+     *
+     * @return Attempt ID.
+     */
+    public TaskAttemptID attemptId() {
+        TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber());
+
+        return new TaskAttemptID(tid, taskInfo().attempt());
+    }
+
+    /**
+     * @param type Task type.
+     * @return Hadoop task type.
+     */
+    private TaskType taskType(GridHadoopTaskType type) {
+        switch (type) {
+            case SETUP:
+                return TaskType.JOB_SETUP;
+            case MAP:
+            case COMBINE:
+                return TaskType.MAP;
+
+            case REDUCE:
+                return TaskType.REDUCE;
+
+            case COMMIT:
+            case ABORT:
+                return TaskType.JOB_CLEANUP;
+
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Gets job configuration of the task.
+     *
+     * @return Job configuration.
+     */
+    public JobConf jobConf() {
+        return jobCtx.getJobConf();
+    }
+
+    /**
+     * Gets job context of the task.
+     *
+     * @return Job context.
+     */
+    public JobContextImpl jobContext() {
+        return jobCtx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopPartitioner partitioner() throws IgniteCheckedException {
+        Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null);
+
+        if (partClsOld != null)
+            return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf());
+
+        try {
+            return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf());
+        }
+        catch (ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Gets serializer for specified class.
+     *
+     * @param cls Class.
+     * @param jobConf Job configuration.
+     * @return Appropriate serializer.
+     */
+    @SuppressWarnings("unchecked")
+    private GridHadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
+        A.notNull(cls, "cls");
+
+        SerializationFactory factory = new SerializationFactory(jobConf);
+
+        Serialization<?> serialization = factory.getSerialization(cls);
+
+        if (serialization == null)
+            throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());
+
+        if (serialization.getClass() == WritableSerialization.class)
+            return new HadoopWritableSerialization((Class<? extends Writable>)cls);
+
+        return new HadoopSerializationWrapper(serialization, cls);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopSerialization keySerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridHadoopSerialization valueSerialization() throws IgniteCheckedException {
+        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> sortComparator() {
+        return (Comparator<Object>)jobCtx.getSortComparator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Comparator<Object> groupComparator() {
+        Comparator<?> res;
+
+        switch (taskInfo().type()) {
+            case COMBINE:
+                res = COMBINE_KEY_GROUPING_SUPPORTED ?
+                    jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator();
+
+                break;
+
+            case REDUCE:
+                res = jobContext().getGroupingComparator();
+
+                break;
+
+            default:
+                return null;
+        }
+
+        if (res != null && res.getClass() != sortComparator().getClass())
+            return (Comparator<Object>)res;
+
+        return null;
+    }
+
+    /**
+     * @param split Split.
+     * @return Native Hadoop split.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    public Object getNativeSplit(GridHadoopInputSplit split) throws IgniteCheckedException {
+        if (split instanceof HadoopExternalSplit)
+            return readExternalSplit((HadoopExternalSplit)split);
+
+        if (split instanceof HadoopSplitWrapper)
+            return unwrapSplit((HadoopSplitWrapper)split);
+
+        throw new IllegalStateException("Unknown split: " + split);
+    }
+
+    /**
+     * @param split External split.
+     * @return Native input split.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException {
+        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
+
+        try (FileSystem fs = FileSystem.get(jobDir.toUri(), jobConf());
+            FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
+
+            in.seek(split.offset());
+
+            String clsName = Text.readString(in);
+
+            Class<?> cls = jobConf().getClassByName(clsName);
+
+            assert cls != null;
+
+            Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls);
+
+            Deserializer deserializer = serialization.getDeserializer(cls);
+
+            deserializer.open(in);
+
+            Object res = deserializer.deserialize(null);
+
+            deserializer.close();
+
+            assert res != null;
+
+            return res;
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
new file mode 100644
index 0000000..cf47e6f
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v2;
+
+import org.apache.hadoop.io.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Optimized serialization for Hadoop {@link Writable} types.
+ */
+public class HadoopWritableSerialization implements GridHadoopSerialization {
+    /** */
+    private final Class<? extends Writable> cls;
+
+    /**
+     * @param cls Class.
+     */
+    public HadoopWritableSerialization(Class<? extends Writable> cls) {
+        assert cls != null;
+
+        this.cls = cls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
+        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass();
+
+        try {
+            ((Writable)obj).write(out);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
+        Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
+
+        try {
+            w.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return w;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
deleted file mode 100644
index 79b9965..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSerializationWrapperSelfTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Test of wrapper of the native serialization.
- */
-public class GridHadoopSerializationWrapperSelfTest extends GridCommonAbstractTest {
-    /**
-     * Tests read/write of IntWritable via native WritableSerialization.
-     * @throws Exception If fails.
-     */
-    public void testIntWritableSerialization() throws Exception {
-        GridHadoopSerialization ser = new GridHadoopSerializationWrapper(new WritableSerialization(), IntWritable.class);
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
-        DataOutput out = new DataOutputStream(buf);
-
-        ser.write(out, new IntWritable(3));
-        ser.write(out, new IntWritable(-5));
-
-        assertEquals("[0, 0, 0, 3, -1, -1, -1, -5]", Arrays.toString(buf.toByteArray()));
-
-        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
-        assertEquals(3, ((IntWritable)ser.read(in, null)).get());
-        assertEquals(-5, ((IntWritable)ser.read(in, null)).get());
-    }
-
-    /**
-     * Tests read/write of Integer via native JavaleSerialization.
-     * @throws Exception If fails.
-     */
-    public void testIntJavaSerialization() throws Exception {
-        GridHadoopSerialization ser = new GridHadoopSerializationWrapper(new JavaSerialization(), Integer.class);
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
-        DataOutput out = new DataOutputStream(buf);
-
-        ser.write(out, 3);
-        ser.write(out, -5);
-        ser.close();
-
-        DataInput in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
-        assertEquals(3, ((Integer)ser.read(in, null)).intValue());
-        assertEquals(-5, ((Integer)ser.read(in, null)).intValue());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java
deleted file mode 100644
index c086719..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/GridHadoopSplitWrapperSelfTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.apache.ignite.testframework.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Self test of {@link GridHadoopSplitWrapper}.
- */
-public class GridHadoopSplitWrapperSelfTest extends GridHadoopAbstractSelfTest {
-    /**
-     * Tests serialization of wrapper and the wrapped native split.
-     * @throws Exception If fails.
-     */
-    public void testSerialization() throws Exception {
-        FileSplit nativeSplit = new FileSplit(new Path("/path/to/file"), 100, 500, new String[]{"host1", "host2"});
-
-        assertEquals("/path/to/file:100+500", nativeSplit.toString());
-
-        GridHadoopSplitWrapper split = HadoopUtils.wrapSplit(10, nativeSplit, nativeSplit.getLocations());
-
-        assertEquals("[host1, host2]", Arrays.toString(split.hosts()));
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-
-        ObjectOutput out = new ObjectOutputStream(buf);
-
-        out.writeObject(split);
-
-        ObjectInput in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()));
-
-        final GridHadoopSplitWrapper res = (GridHadoopSplitWrapper)in.readObject();
-
-        assertEquals("/path/to/file:100+500", HadoopUtils.unwrapSplit(res).toString());
-
-        GridTestUtils.assertThrows(log, new Callable<Object>() {
-            @Override public Object call() throws Exception {
-                res.hosts();
-
-                return null;
-            }
-        }, AssertionError.class, null);
-    }
-
-
-}