You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:35 UTC

[19/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (3).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
deleted file mode 100644
index 2a38684..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1OutputCollector.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Hadoop output collector: writes to the direct record writer when present, otherwise to the task context output.
- */
-public class GridHadoopV1OutputCollector implements OutputCollector {
-    /** Job configuration. */
-    private final JobConf jobConf;
-
-    /** Task context. */
-    private final GridHadoopTaskContext taskCtx;
-
-    /** Optional direct writer; {@code null} when direct write is off. */
-    private final RecordWriter writer;
-
-    /** Task attempt. */
-    private final TaskAttemptID attempt;
-
-    /**
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag; if set, the attempt ID is stored in {@code jobConf} and a record writer is opened.
-     * @param fileName File name passed to the output format when the record writer is opened.
-     * @throws IOException In case of IO exception.
-     */
-    GridHadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite,
-        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        this.jobConf = jobConf;
-        this.taskCtx = taskCtx;
-        this.attempt = attempt;
-
-        if (directWrite) {
-            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
-
-            OutputFormat outFormat = jobConf.getOutputFormat();
-
-            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
-        }
-        else
-            writer = null;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void collect(Object key, Object val) throws IOException {
-        if (writer != null)
-            writer.write(key, val);
-        else {
-            try {
-                taskCtx.output().write(key, val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /**
-     * Closes the direct writer, if there is one.
-     *
-     * @throws IOException In case of IO exception.
-     */
-    public void closeWriter() throws IOException {
-        if (writer != null)
-            writer.close(Reporter.NULL);
-    }
-
-    /**
-     * Sets up the task through the job's output committer; does nothing without a direct writer.
-     *
-     * @throws IOException If failed.
-     */
-    public void setup() throws IOException {
-        if (writer != null)
-            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
-    }
-
-    /**
-     * Commits the task through the job's output committer if it needs a commit; does nothing without a direct writer.
-     *
-     * @throws IOException If failed.
-     */
-    public void commit() throws IOException {
-        if (writer != null) {
-            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
-
-            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
-
-            if (outputCommitter.needsTaskCommit(taskCtx))
-                outputCommitter.commitTask(taskCtx);
-        }
-    }
-
-    /**
-     * Aborts the task through the job's output committer; does nothing without a direct writer. IO errors are ignored.
-     */
-    public void abort() {
-        try {
-            if (writer != null)
-                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
-        }
-        catch (IOException ignore) {
-            // No-op: the exception is ignored.
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
deleted file mode 100644
index 688ccef..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-
-/**
- * Hadoop partitioner adapter for v1 API; delegates to a Hadoop {@link Partitioner} instance.
- */
-public class GridHadoopV1Partitioner implements GridHadoopPartitioner {
-    /** Partitioner instance. */
-    private Partitioner<Object, Object> part;
-
-    /**
-     * @param cls Hadoop partitioner class; instantiated through {@link ReflectionUtils}.
-     * @param conf Job configuration passed to the instantiation call.
-     */
-    public GridHadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
-        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key, Object val, int parts) {
-        return part.getPartition(key, val, parts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
deleted file mode 100644
index 3aca637..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1ReduceTask.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-/**
- * Hadoop reduce task implementation for v1 API. Runs the job's reducer or its combiner, depending on the flag.
- */
-public class GridHadoopV1ReduceTask extends GridHadoopV1Task {
-    /** {@code True} if reduce, {@code false} if combine. */
-    private final boolean reduce;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     * @param reduce {@code True} if reduce, {@code false} if combine.
-     */
-    public GridHadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
-        super(taskInfo);
-
-        this.reduce = reduce;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        GridHadoopJob job = taskCtx.job();
-
-        GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
-        JobConf jobConf = ctx.jobConf();
-
-        GridHadoopTaskInput input = taskCtx.input();
-
-        GridHadoopV1OutputCollector collector = null;
-
-        try {
-            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
-
-            Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
-                jobConf);
-
-            assert reducer != null;
-
-            try {
-                try {
-                    while (input.next()) {
-                        if (isCancelled())
-                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
-
-                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
-                    }
-                }
-                finally {
-                    reducer.close();
-                }
-            }
-            finally {
-                collector.closeWriter();
-            }
-
-            collector.commit();
-        }
-        catch (Exception e) {
-            if (collector != null)
-                collector.abort();
-
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
deleted file mode 100644
index 791ccdc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Reporter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-/**
- * Hadoop reporter implementation for v1 API. Counters come from the task context; status, progress and input split are stubs.
- */
-public class GridHadoopV1Reporter implements Reporter {
-    /** Context. */
-    private final GridHadoopTaskContext ctx;
-
-    /**
-     * Creates new instance.
-     *
-     * @param ctx Context used to look up counters.
-     */
-    public GridHadoopV1Reporter(GridHadoopTaskContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setStatus(String status) {
-        // TODO: the status is currently discarded.
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(Enum<?> name) {
-        return getCounter(name.getDeclaringClass().getName(), name.name());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Counters.Counter getCounter(String grp, String name) {
-        return new GridHadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(Enum<?> key, long amount) {
-        getCounter(key).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void incrCounter(String grp, String cntr, long amount) {
-        getCounter(grp, cntr).increment(amount);
-    }
-
-    /** {@inheritDoc} */
-    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
-        throw new UnsupportedOperationException("reporter has no input"); // TODO
-    }
-
-    /** {@inheritDoc} */
-    @Override public float getProgress() {
-        return 0.5f; // TODO: constant placeholder, real progress is not tracked.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void progress() {
-        // TODO: progress notifications are currently ignored.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
deleted file mode 100644
index c7dc3fd..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1SetupTask.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-
-import java.io.*;
-
-/**
- * Hadoop setup task implementation for v1 API. Checks output specs and sets up the job via the output committer.
- */
-public class GridHadoopV1SetupTask extends GridHadoopV1Task {
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    public GridHadoopV1SetupTask(GridHadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-        GridHadoopV2TaskContext ctx = (GridHadoopV2TaskContext)taskCtx;
-
-        try {
-            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
-
-            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
-
-            if (committer != null)
-                committer.setupJob(ctx.jobContext());
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
deleted file mode 100644
index 0e1fb44..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Splitter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop API v1 splitter: converts the splits of a v1 input format into {@link GridHadoopInputSplit}s.
- */
-public class GridHadoopV1Splitter {
-    /** Empty hosts array, used when no hosts are given. */
-    private static final String[] EMPTY_HOSTS = {};
-
-    /**
-     * @param jobConf Job configuration; its input format produces the native splits.
-     * @return Collection of mapped splits: file blocks for {@link FileSplit}s, wrapped splits otherwise.
-     * @throws IgniteCheckedException If mapping failed.
-     */
-    public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
-        try {
-            InputFormat<?, ?> format = jobConf.getInputFormat();
-
-            assert format != null;
-
-            InputSplit[] splits = format.getSplits(jobConf, 0);
-
-            Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length);
-
-            for (int i = 0; i < splits.length; i++) {
-                InputSplit nativeSplit = splits[i];
-
-                if (nativeSplit instanceof FileSplit) {
-                    FileSplit s = (FileSplit)nativeSplit;
-
-                    res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
-                }
-                else
-                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
-            }
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * @param clsName Input split class name.
-     * @param in Input stream the split fields are read from.
-     * @param hosts Optional hosts; {@code null} is replaced with an empty array.
-     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
-     * @throws IgniteCheckedException If failed.
-     */
-    @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
-        @Nullable String[] hosts) throws IgniteCheckedException {
-        if (!FileSplit.class.getName().equals(clsName))
-            return null;
-
-        FileSplit split = U.newInstance(FileSplit.class);
-
-        try {
-            split.readFields(in);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        if (hosts == null)
-            hosts = EMPTY_HOSTS;
-
-        return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
deleted file mode 100644
index 305bc4e..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/GridHadoopV1Task.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v1;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.v2.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.text.*;
-
-/**
- * Extended Hadoop v1 task that tracks a cancellation flag and creates output collectors honoring it.
- */
-public abstract class GridHadoopV1Task extends GridHadoopTask {
-    /** Indicates that this task is to be cancelled. */
-    private volatile boolean cancelled;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    protected GridHadoopV1Task(GridHadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /**
-     * Gets file name for that task result: {@code "part-"} plus the task number with at least 5 integer digits.
-     *
-     * @return File name.
-     */
-    public String fileName() {
-        NumberFormat numFormat = NumberFormat.getInstance();
-
-        numFormat.setMinimumIntegerDigits(5);
-        numFormat.setGroupingUsed(false);
-
-        return "part-" + numFormat.format(info().taskNumber());
-    }
-
-    /**
-     * Creates and sets up a collector that throws {@link HadoopTaskCancelledException} once the task is cancelled.
-     * @param jobConf Job configuration.
-     * @param taskCtx Task context.
-     * @param directWrite Direct write flag.
-     * @param fileName File name.
-     * @param attempt Attempt of task.
-     * @return Collector.
-     * @throws IOException In case of IO exception.
-     */
-    protected GridHadoopV1OutputCollector collector(JobConf jobConf, GridHadoopV2TaskContext taskCtx,
-        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
-        GridHadoopV1OutputCollector collector = new GridHadoopV1OutputCollector(jobConf, taskCtx, directWrite,
-            fileName, attempt) {
-            /** {@inheritDoc} */
-            @Override public void collect(Object key, Object val) throws IOException {
-                if (cancelled)
-                    throw new HadoopTaskCancelledException("Task cancelled.");
-
-                super.collect(key, val);
-            }
-        };
-
-        collector.setup();
-
-        return collector;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        cancelled = true;
-    }
-
-    /** @return {@code True} if task is cancelled. */
-    public boolean isCancelled() {
-        return cancelled;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
new file mode 100644
index 0000000..85f08be
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop cleanup task implementation for v1 API. Commits or aborts the job through the output committer.
+ */
+public class HadoopV1CleanupTask extends HadoopV1Task {
+    /** Abort flag. */
+    private final boolean abort;
+
+    /**
+     * @param taskInfo Task info.
+     * @param abort Abort flag; if {@code true} the job is aborted with {@code FAILED} state, otherwise it is committed.
+     */
+    public HadoopV1CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
+        super(taskInfo);
+
+        this.abort = abort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobContext jobCtx = ctx.jobContext();
+
+        try {
+            OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
+
+            if (abort)
+                committer.abortJob(jobCtx, JobStatus.State.FAILED);
+            else
+                committer.commitJob(jobCtx);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
new file mode 100644
index 0000000..609297b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
+
+/**
+ * Hadoop counter implementation for v1 API, delegating to a {@link HadoopLongCounter}. Serialization is unsupported.
+ */
+public class HadoopV1Counter extends Counters.Counter {
+    /** Delegate. */
+    private final HadoopLongCounter cntr;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntr Delegate counter.
+     */
+    public HadoopV1Counter(HadoopLongCounter cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op: the display name is always the counter name.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return cntr.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getValue() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setValue(long val) {
+        cntr.value(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void increment(long incr) {
+        cntr.increment(incr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String makeEscapedCompactString() {
+        return toEscapedCompactString(new HadoopV2Counter(cntr));
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public boolean contentEquals(Counters.Counter cntr) {
+        return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCounter() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getUnderlyingCounter() {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
new file mode 100644
index 0000000..51856d6
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop map task implementation for v1 API.
+ */
+public class HadoopV1MapTask extends HadoopV1Task {
+    /** Empty hosts array passed to {@link FileSplit}s created from file blocks. */
+    private static final String[] EMPTY_HOSTS = new String[0];
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV1MapTask(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        GridHadoopJob job = taskCtx.job();
+
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobConf jobConf = ctx.jobConf();
+
+        InputFormat inFormat = jobConf.getInputFormat();
+
+        GridHadoopInputSplit split = info().inputSplit();
+
+        InputSplit nativeSplit;
+
+        if (split instanceof GridHadoopFileBlock) {
+            GridHadoopFileBlock block = (GridHadoopFileBlock)split;
+
+            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+        }
+        else
+            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+
+        assert nativeSplit != null;
+
+        Reporter reporter = new HadoopV1Reporter(taskCtx);
+
+        HadoopV1OutputCollector collector = null;
+
+        try {
+            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
+                fileName(), ctx.attemptId());
+
+            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+
+            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+
+            Object key = reader.createKey();
+            Object val = reader.createValue();
+
+            assert mapper != null;
+
+            try {
+                try {
+                    while (reader.next(key, val)) {
+                        if (isCancelled())
+                            throw new HadoopTaskCancelledException("Map task cancelled.");
+
+                        mapper.map(key, val, collector, reporter);
+                    }
+                }
+                finally {
+                    mapper.close();
+                }
+            }
+            finally {
+                collector.closeWriter();
+            }
+
+            collector.commit();
+        }
+        catch (Exception e) {
+            if (collector != null)
+                collector.abort();
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
new file mode 100644
index 0000000..ac23bb3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1OutputCollector.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Hadoop output collector for v1 API.
+ */
+public class HadoopV1OutputCollector implements OutputCollector {
+    /** Job configuration. */
+    private final JobConf jobConf;
+
+    /** Task context. */
+    private final GridHadoopTaskContext taskCtx;
+
+    /** Optional direct writer, {@code null} when direct write is disabled. */
+    private final RecordWriter writer;
+
+    /** Task attempt. */
+    private final TaskAttemptID attempt;
+
+    /**
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @throws IOException In case of IO exception.
+     */
+    HadoopV1OutputCollector(JobConf jobConf, GridHadoopTaskContext taskCtx, boolean directWrite,
+        @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        this.jobConf = jobConf;
+        this.taskCtx = taskCtx;
+        this.attempt = attempt;
+
+        if (directWrite) {
+            jobConf.set("mapreduce.task.attempt.id", attempt.toString());
+
+            OutputFormat outFormat = jobConf.getOutputFormat();
+
+            writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
+        }
+        else
+            writer = null;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void collect(Object key, Object val) throws IOException {
+        if (writer != null)
+            writer.write(key, val);
+        else {
+            try {
+                taskCtx.output().write(key, val);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    /**
+     * Closes the direct writer, if there is one.
+     *
+     * @throws IOException In case of IO exception.
+     */
+    public void closeWriter() throws IOException {
+        if (writer != null)
+            writer.close(Reporter.NULL);
+    }
+
+    /**
+     * Sets up the task through the output committer; does nothing without a direct writer.
+     *
+     * @throws IOException If failed.
+     */
+    public void setup() throws IOException {
+        if (writer != null)
+            jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
+    }
+
+    /**
+     * Commits the task through the output committer if it needs a commit; does nothing without a direct writer.
+     *
+     * @throws IOException If failed.
+     */
+    public void commit() throws IOException {
+        if (writer != null) {
+            OutputCommitter outputCommitter = jobConf.getOutputCommitter();
+
+            TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
+
+            if (outputCommitter.needsTaskCommit(taskCtx))
+                outputCommitter.commitTask(taskCtx);
+        }
+    }
+
+    /**
+     * Aborts the task through the output committer; an {@link IOException} raised by the abort is ignored.
+     */
+    public void abort() {
+        try {
+            if (writer != null)
+                jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
+        }
+        catch (IOException ignore) {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
new file mode 100644
index 0000000..36fdd55
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Partitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+
+/**
+ * Hadoop partitioner adapter for v1 API.
+ */
+public class HadoopV1Partitioner implements GridHadoopPartitioner {
+    /** Partitioner instance. */
+    private Partitioner<Object, Object> part;
+
+    /**
+     * @param cls Hadoop partitioner class, instantiated through {@link ReflectionUtils}.
+     * @param conf Job configuration.
+     */
+    public HadoopV1Partitioner(Class<? extends Partitioner> cls, Configuration conf) {
+        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key, Object val, int parts) {
+        return part.getPartition(key, val, parts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
new file mode 100644
index 0000000..b5c6bfa
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1ReduceTask.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+/**
+ * Hadoop reduce task implementation for v1 API; also runs the combiner when {@code reduce} is {@code false}.
+ */
+public class HadoopV1ReduceTask extends HadoopV1Task {
+    /** {@code True} if reduce, {@code false} if combine. */
+    private final boolean reduce;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     * @param reduce {@code True} if reduce, {@code false} if combine.
+     */
+    public HadoopV1ReduceTask(GridHadoopTaskInfo taskInfo, boolean reduce) {
+        super(taskInfo);
+
+        this.reduce = reduce;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        GridHadoopJob job = taskCtx.job();
+
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobConf jobConf = ctx.jobConf();
+
+        GridHadoopTaskInput input = taskCtx.input();
+
+        HadoopV1OutputCollector collector = null;
+
+        try {
+            collector = collector(jobConf, ctx, reduce || !job.info().hasReducer(), fileName(), ctx.attemptId());
+
+            Reducer reducer = ReflectionUtils.newInstance(reduce ? jobConf.getReducerClass() : jobConf.getCombinerClass(),
+                jobConf);
+
+            assert reducer != null;
+
+            try {
+                try {
+                    while (input.next()) {
+                        if (isCancelled())
+                            throw new HadoopTaskCancelledException("Reduce task cancelled.");
+
+                        reducer.reduce(input.key(), input.values(), collector, Reporter.NULL);
+                    }
+                }
+                finally {
+                    reducer.close();
+                }
+            }
+            finally {
+                collector.closeWriter();
+            }
+
+            collector.commit();
+        }
+        catch (Exception e) {
+            if (collector != null)
+                collector.abort();
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
new file mode 100644
index 0000000..db4e159
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Reporter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+
+/**
+ * Hadoop reporter implementation for v1 API.
+ */
+public class HadoopV1Reporter implements Reporter {
+    /** Context. */
+    private final GridHadoopTaskContext ctx;
+
+    /**
+     * Creates new instance.
+     *
+     * @param ctx Context.
+     */
+    public HadoopV1Reporter(GridHadoopTaskContext ctx) {
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStatus(String status) {
+        // TODO: status reporting is not implemented, the status is ignored.
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(Enum<?> name) {
+        return getCounter(name.getDeclaringClass().getName(), name.name());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counters.Counter getCounter(String grp, String name) {
+        return new HadoopV1Counter(ctx.counter(grp, name, HadoopLongCounter.class));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(Enum<?> key, long amount) {
+        getCounter(key).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void incrCounter(String grp, String cntr, long amount) {
+        getCounter(grp, cntr).increment(amount);
+    }
+
+    /** {@inheritDoc} */
+    @Override public InputSplit getInputSplit() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException("reporter has no input"); // TODO
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getProgress() {
+        return 0.5f; // TODO: constant placeholder, actual progress is not tracked.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void progress() {
+        // TODO: progress notifications are not implemented, the call is ignored.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
new file mode 100644
index 0000000..c427774
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1SetupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+
+import java.io.*;
+
+/**
+ * Hadoop setup task implementation for v1 API: checks output specs and sets up the job via the output committer.
+ */
+public class HadoopV1SetupTask extends HadoopV1Task {
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV1SetupTask(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        try {
+            ctx.jobConf().getOutputFormat().checkOutputSpecs(null, ctx.jobConf());
+
+            OutputCommitter committer = ctx.jobConf().getOutputCommitter();
+
+            if (committer != null)
+                committer.setupJob(ctx.jobContext());
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
new file mode 100644
index 0000000..0d89082
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Splitter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop API v1 splitter.
+ */
+public class HadoopV1Splitter {
+    /** Empty hosts array used when no hosts are provided. */
+    private static final String[] EMPTY_HOSTS = {};
+
+    /**
+     * @param jobConf Job configuration.
+     * @return Collection of mapped splits.
+     * @throws IgniteCheckedException If mapping failed.
+     */
+    public static Collection<GridHadoopInputSplit> splitJob(JobConf jobConf) throws IgniteCheckedException {
+        try {
+            InputFormat<?, ?> format = jobConf.getInputFormat();
+
+            assert format != null;
+
+            InputSplit[] splits = format.getSplits(jobConf, 0);
+
+            Collection<GridHadoopInputSplit> res = new ArrayList<>(splits.length);
+
+            for (int i = 0; i < splits.length; i++) {
+                InputSplit nativeSplit = splits[i];
+
+                if (nativeSplit instanceof FileSplit) {
+                    FileSplit s = (FileSplit)nativeSplit;
+
+                    res.add(new GridHadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength()));
+                }
+                else
+                    res.add(HadoopUtils.wrapSplit(i, nativeSplit, nativeSplit.getLocations()));
+            }
+
+            return res;
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * @param clsName Input split class name.
+     * @param in Input stream.
+     * @param hosts Optional hosts, an empty array is used if {@code null}.
+     * @return File block or {@code null} if it is not a {@link FileSplit} instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public static GridHadoopFileBlock readFileBlock(String clsName, FSDataInputStream in,
+        @Nullable String[] hosts) throws IgniteCheckedException {
+        if (!FileSplit.class.getName().equals(clsName))
+            return null;
+
+        FileSplit split = U.newInstance(FileSplit.class);
+
+        try {
+            split.readFields(in);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        if (hosts == null)
+            hosts = EMPTY_HOSTS;
+
+        return new GridHadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
new file mode 100644
index 0000000..71a259c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Task.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.mapred.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.text.*;
+
+/**
+ * Extended Hadoop v1 task.
+ */
+public abstract class HadoopV1Task extends GridHadoopTask {
+    /** Indicates that this task is to be cancelled. */
+    private volatile boolean cancelled;
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    protected HadoopV1Task(GridHadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /**
+     * Gets file name for this task result: {@code part-} followed by the task number padded to at least 5 digits.
+     *
+     * @return File name.
+     */
+    public String fileName() {
+        NumberFormat numFormat = NumberFormat.getInstance();
+
+        numFormat.setMinimumIntegerDigits(5);
+        numFormat.setGroupingUsed(false);
+
+        return "part-" + numFormat.format(info().taskNumber());
+    }
+
+    /**
+     * Creates and sets up an output collector that throws from {@code collect()} once this task is cancelled.
+     * @param jobConf Job configuration.
+     * @param taskCtx Task context.
+     * @param directWrite Direct write flag.
+     * @param fileName File name.
+     * @param attempt Attempt of task.
+     * @return Collector.
+     * @throws IOException In case of IO exception.
+     */
+    protected HadoopV1OutputCollector collector(JobConf jobConf, HadoopV2TaskContext taskCtx,
+        boolean directWrite, @Nullable String fileName, TaskAttemptID attempt) throws IOException {
+        HadoopV1OutputCollector collector = new HadoopV1OutputCollector(jobConf, taskCtx, directWrite,
+            fileName, attempt) {
+            /** {@inheritDoc} */
+            @Override public void collect(Object key, Object val) throws IOException {
+                if (cancelled)
+                    throw new HadoopTaskCancelledException("Task cancelled.");
+
+                super.collect(key, val);
+            }
+        };
+
+        collector.setup();
+
+        return collector;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        cancelled = true;
+    }
+
+    /** @return {@code True} if this task is cancelled. */
+    public boolean isCancelled() {
+        return cancelled;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
deleted file mode 100644
index 36b40a2..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopExternalSplit.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Split serialized in external file.
- */
-public class GridHadoopExternalSplit extends GridHadoopInputSplit {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long off;
-
-    /**
-     * For {@link Externalizable}.
-     */
-    public GridHadoopExternalSplit() {
-        // No-op.
-    }
-
-    /**
-     * @param hosts Hosts.
-     * @param off Offset of this split in external file.
-     */
-    public GridHadoopExternalSplit(String[] hosts, long off) {
-        assert off >= 0 : off;
-        assert hosts != null;
-
-        this.hosts = hosts;
-        this.off = off;
-    }
-
-    /**
-     * @return Offset of this input split in external file.
-     */
-    public long offset() {
-        return off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(off);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        off = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopExternalSplit that = (GridHadoopExternalSplit) o;
-
-        return off == that.off;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return (int)(off ^ (off >>> 32));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
deleted file mode 100644
index 5ef4759..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopNativeCodeLoader.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.classification.*;
-import org.apache.hadoop.conf.*;
-
-/**
- * A fake helper standing in for a loader of the native hadoop code (i.e. libhadoop.so); every method is a fixed stub.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class GridHadoopNativeCodeLoader {
-    /**
-     * Check if native-hadoop code is loaded for this platform.
-     *
-     * @return <code>true</code> if native-hadoop is loaded, else <code>false</code>;
-     *         this fake implementation always returns <code>false</code>.
-     */
-    public static boolean isNativeCodeLoaded() {
-        return false;
-    }
-
-    /**
-     * Reports whether this build supports snappy; this fake implementation always returns <code>false</code>.
-     */
-    public static boolean buildSupportsSnappy() {
-        return false;
-    }
-
-    /**
-     * @return Never returns normally: this fake implementation always throws {@link IllegalStateException}.
-     */
-    public static String getLibraryName() {
-        throw new IllegalStateException();
-    }
-
-    /**
-     * Tells whether native hadoop libraries, if present, can be used for this job.
-     * @param conf configuration (ignored by this fake implementation)
-     *
-     * @return <code>true</code> if native hadoop libraries, if present, can be
-     *         used for this job; this fake implementation always returns <code>false</code>.
-     */
-    public boolean getLoadNativeLibraries(Configuration conf) {
-        return false;
-    }
-
-    /**
-     * Set if native hadoop libraries, if present, can be used for this job (a no-op in this fake implementation).
-     *
-     * @param conf configuration (ignored)
-     * @param loadNativeLibraries can native hadoop libraries be loaded (ignored)
-     */
-    public void setLoadNativeLibraries(Configuration conf, boolean loadNativeLibraries) {
-        // No-op.
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
deleted file mode 100644
index 0f38548..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSerializationWrapper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.io.serializer.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Adapts an external {@link Serialization} (its serializer and deserializer) to {@link GridHadoopSerialization}.
- */
-public class GridHadoopSerializationWrapper<T> implements GridHadoopSerialization {
-    /** External serializer, used by {@code write}. */
-    private final Serializer<T> serializer;
-
-    /** External deserializer, used by {@code read}. */
-    private final Deserializer<T> deserializer;
-
-    /** Data output for current write operation; reset to {@code null} only when the write completes normally. */
-    private OutputStream currOut;
-
-    /** Data input for current read operation; reset to {@code null} only when the read completes normally. */
-    private InputStream currIn;
-
-    /** Stream the serializer is opened on once, in the constructor; forwards every write to {@code currOut}. */
-    private final OutputStream outStream = new OutputStream() {
-        /** {@inheritDoc} Delegates to the current output. */
-        @Override public void write(int b) throws IOException {
-            currOut.write(b);
-        }
-
-        /** {@inheritDoc} Delegates to the current output. */
-        @Override public void write(byte[] b, int off, int len) throws IOException {
-            currOut.write(b, off, len);
-        }
-    };
-
-    /** Stream the deserializer is opened on once, in the constructor; forwards every read to {@code currIn}. */
-    private final InputStream inStream = new InputStream() {
-        /** {@inheritDoc} Delegates to the current input. */
-        @Override public int read() throws IOException {
-            return currIn.read();
-        }
-
-        /** {@inheritDoc} Delegates to the current input. */
-        @Override public int read(byte[] b, int off, int len) throws IOException {
-            return currIn.read(b, off, len);
-        }
-    };
-
-    /**
-     * @param serialization External serialization that supplies the serializer and deserializer to wrap.
-     * @param cls The class to serialize; must not be {@code null} (enforced by assertion only).
-     */
-    public GridHadoopSerializationWrapper(Serialization<T> serialization, Class<T> cls) throws IgniteCheckedException {
-        assert cls != null;
-
-        serializer = serialization.getSerializer(cls);
-        deserializer = serialization.getDeserializer(cls);
-
-        try {
-            serializer.open(outStream);
-            deserializer.open(inStream);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} {@code out} is cast to {@link OutputStream}, so it must also be one; IO errors are rethrown wrapped. */
-    @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException {
-        assert out != null;
-        assert obj != null;
-
-        try {
-            currOut = (OutputStream)out;
-
-            serializer.serialize((T)obj);
-
-            currOut = null;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} {@code in} is cast to {@link InputStream}, so it must also be one; {@code obj} goes to the deserializer as-is. */
-    @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException {
-        assert in != null;
-
-        try {
-            currIn = (InputStream)in;
-
-            T res = deserializer.deserialize((T) obj);
-
-            currIn = null;
-
-            return res;
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} Closes the serializer, then the deserializer (not reached if the serializer's close throws). */
-    @Override public void close() throws IgniteCheckedException {
-        try {
-            serializer.close();
-            deserializer.close();
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
deleted file mode 100644
index 48558fc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopShutdownHookManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Fake manager for shutdown hooks: it only keeps registered hooks in a set; nothing in this class runs them.
- */
-public class GridHadoopShutdownHookManager {
-    /** The single instance, returned by {@link #get()}. */
-    private static final GridHadoopShutdownHookManager MGR = new GridHadoopShutdownHookManager();
-
-    /**
-     * Returns the <code>ShutdownHookManager</code> singleton.
-     *
-     * @return <code>ShutdownHookManager</code> singleton.
-     */
-    public static GridHadoopShutdownHookManager get() {
-        return MGR;
-    }
-
-    /** Registered hooks, held in a synchronized hash set; priorities are not stored. */
-    private Set<Runnable> hooks = Collections.synchronizedSet(new HashSet<Runnable>());
-
-    /** Shutdown flag; created as {@code false} and never modified within this class. */
-    private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
-
-    /**
-     * Private constructor: the only instance is the singleton.
-     */
-    private GridHadoopShutdownHookManager() {
-        // No-op.
-    }
-
-    /**
-     * Registers a shutdownHook. The priority is accepted but ignored:
-     * this fake manager only stores the hook and never runs it.
-     * Throws {@link IllegalArgumentException} if the hook is {@code null}.
-     *
-     * @param shutdownHook shutdownHook <code>Runnable</code>, must not be {@code null}.
-     * @param priority priority of the shutdownHook (unused).
-     */
-    public void addShutdownHook(Runnable shutdownHook, int priority) {
-        if (shutdownHook == null)
-            throw new IllegalArgumentException("shutdownHook cannot be NULL");
-
-        hooks.add(shutdownHook);
-    }
-
-    /**
-     * Removes a shutdownHook.
-     *
-     * @param shutdownHook shutdownHook to remove.
-     * @return TRUE if the shutdownHook was registered and removed,
-     * FALSE otherwise.
-     */
-    public boolean removeShutdownHook(Runnable shutdownHook) {
-        return hooks.remove(shutdownHook);
-    }
-
-    /**
-     * Indicates if a shutdownHook is registered or not.
-     *
-     * @param shutdownHook shutdownHook to check if registered.
-     * @return TRUE/FALSE depending on whether the shutdownHook is registered.
-     */
-    public boolean hasShutdownHook(Runnable shutdownHook) {
-        return hooks.contains(shutdownHook);
-    }
-
-    /**
-     * Indicates if shutdown is in progress or not.
-     *
-     * @return TRUE if the shutdown is in progress, otherwise FALSE; the flag is never set within this class.
-     */
-    public boolean isShutdownInProgress() {
-        return shutdownInProgress.get();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
deleted file mode 100644
index 57edfa9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopSplitWrapper.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * The wrapper for native hadoop input splits; it carries the split as a class name plus a byte array.
- *
- * Warning!! This class must not depend on any Hadoop classes directly or indirectly.
- */
-public class GridHadoopSplitWrapper extends GridHadoopInputSplit {
-    /** Serialization version. */
-    private static final long serialVersionUID = 0L;
-
-    /** Native hadoop input split in serialized form. */
-    private byte[] bytes;
-
-    /** Class name (presumably of the native split serialized in {@code bytes} - confirm against callers). */
-    private String clsName;
-
-    /** Internal ID; the only field compared by {@code equals} and used by {@code hashCode}. */
-    private int id;
-
-    /**
-     * Creates an empty split wrapper; {@code readExternal} fills in the ID, class name and bytes.
-     */
-    public GridHadoopSplitWrapper() {
-        // No-op.
-    }
-
-    /**
-     * Creates new split wrapper. The non-null requirements below are enforced by assertions only.
-     *
-     * @param id Split ID.
-     * @param clsName Class name; must not be {@code null}.
-     * @param bytes Serialized class; must not be {@code null}. The array is stored as-is, not copied.
-     * @param hosts Hosts where split is located; must not be {@code null}.
-     */
-    public GridHadoopSplitWrapper(int id, String clsName, byte[] bytes, String[] hosts) {
-        assert hosts != null;
-        assert clsName != null;
-        assert bytes != null;
-
-        this.hosts = hosts;
-        this.id = id;
-
-        this.clsName = clsName;
-        this.bytes = bytes;
-    }
-
-    /** {@inheritDoc} Writes the ID, the class name and the bytes, in that order; {@code hosts} is not written here. */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeInt(id);
-
-        out.writeUTF(clsName);
-        U.writeByteArray(out, bytes);
-    }
-
-    /**
-     * @return Class name, as passed to the constructor or read by {@code readExternal}.
-     */
-    public String className() {
-        return clsName;
-    }
-
-    /**
-     * @return Serialized split bytes (the internal array itself, not a copy).
-     */
-    public byte[] bytes() {
-        return bytes;
-    }
-
-    /** {@inheritDoc} Reads the ID, the class name and the bytes, in the order {@code writeExternal} writes them. */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        id = in.readInt();
-
-        clsName = in.readUTF();
-        bytes = U.readByteArray(in);
-    }
-
-    /** {@inheritDoc} Two wrappers of exactly the same class are equal when their IDs match; nothing else is compared. */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopSplitWrapper that = (GridHadoopSplitWrapper)o;
-
-        return id == that.id;
-    }
-
-    /** {@inheritDoc} The hash code is the ID itself. */
-    @Override public int hashCode() {
-        return id;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
deleted file mode 100644
index 38be3da..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2CleanupTask.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-
-import java.io.*;
-
-/**
- * Hadoop cleanup task: commits the job, or aborts it when created with the abort flag set.
- */
-public class GridHadoopV2CleanupTask extends GridHadoopV2Task {
-    /** Abort flag: {@code true} aborts the job with {@code FAILED} state, {@code false} commits it. */
-    private final boolean abort;
-
-    /**
-     * @param taskInfo Task info, passed to the superclass constructor.
-     * @param abort Abort flag: {@code true} to abort the job, {@code false} to commit it.
-     */
-    public GridHadoopV2CleanupTask(GridHadoopTaskInfo taskInfo, boolean abort) {
-        super(taskInfo);
-
-        this.abort = abort;
-    }
-
-    /** {@inheritDoc} Aborts or commits via the output format's committer, if any; on interruption re-sets the interrupt flag. */
-    @SuppressWarnings("ConstantConditions")
-    @Override public void run0(GridHadoopV2TaskContext taskCtx) throws IgniteCheckedException {
-        JobContextImpl jobCtx = taskCtx.jobContext();
-
-        try {
-            OutputFormat outputFormat = getOutputFormat(jobCtx);
-
-            OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext());
-
-            if (committer != null) {
-                if (abort)
-                    committer.abortJob(jobCtx, JobStatus.State.FAILED);
-                else
-                    committer.commitJob(jobCtx);
-            }
-        }
-        catch (ClassNotFoundException | IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
deleted file mode 100644
index 9964d91..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/GridHadoopV2Context.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.*;
-import org.apache.hadoop.mapreduce.task.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Hadoop context implementation for v2 API. It provides IO operations for hadoop tasks (map and reduce contexts).
- */
-public class GridHadoopV2Context extends JobContextImpl implements MapContext, ReduceContext {
-    /** Input reader overriding the GridHadoopTaskContext input; {@code null} until {@code reader(RecordReader)} is called. */
-    private RecordReader reader;
-
-    /** Output writer overriding the GridHadoopTaskContext output; {@code null} until {@code writer(RecordWriter)} is called. */
-    private RecordWriter writer;
-
-    /** Output is provided by executor environment. */
-    private final GridHadoopTaskOutput output;
-
-    /** Input is provided by executor environment. */
-    private final GridHadoopTaskInput input;
-
-    /** Unique identifier for a task attempt. */
-    private final TaskAttemptID taskAttemptID;
-
-    /** Indicates that this task is to be cancelled; set by {@code cancel()} and checked before each IO call. */
-    private volatile boolean cancelled;
-
-    /** Input split; converted lazily and cached by {@code getInputSplit()}. */
-    private InputSplit inputSplit;
-
-    /** Task context this Hadoop context was created from. */
-    private final GridHadoopTaskContext ctx;
-
-    /** Last status message passed to {@code setStatus(String)}. */
-    private String status;
-
-    /**
-     * @param ctx Context for IO operations; supplies the job configuration, job ID, attempt ID, task input and output.
-     */
-    public GridHadoopV2Context(GridHadoopV2TaskContext ctx) {
-        super(ctx.jobConf(), ctx.jobContext().getJobID());
-
-        taskAttemptID = ctx.attemptId();
-
-        conf.set("mapreduce.job.id", taskAttemptID.getJobID().toString());
-        conf.set("mapreduce.task.id", taskAttemptID.getTaskID().toString());
-
-        output = ctx.output();
-        input = ctx.input();
-
-        this.ctx = ctx;
-    }
-
-    /** {@inheritDoc} Converted from the task's split on first use and cached; {@code null} if the task has no split. */
-    @Override public InputSplit getInputSplit() {
-        if (inputSplit == null) {
-            GridHadoopInputSplit split = ctx.taskInfo().inputSplit();
-
-            if (split == null)
-                return null;
-
-            if (split instanceof GridHadoopFileBlock) {
-                GridHadoopFileBlock fileBlock = (GridHadoopFileBlock)split;
-
-                inputSplit = new FileSplit(new Path(fileBlock.file()), fileBlock.start(), fileBlock.length(), null);
-            }
-            else if (split instanceof GridHadoopExternalSplit)
-                throw new UnsupportedOperationException(); // TODO
-            else if (split instanceof GridHadoopSplitWrapper)
-                inputSplit = (InputSplit) HadoopUtils.unwrapSplit((GridHadoopSplitWrapper) split);
-            else
-                throw new IllegalStateException();
-        }
-
-        return inputSplit;
-    }
-
-    /** {@inheritDoc} Throws if the task was cancelled; reads from the overriding reader, which must have been set. */
-    @Override public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        return reader.nextKeyValue();
-    }
-
-    /** {@inheritDoc} Uses the overriding reader when one is set, otherwise the key of the task input. */
-    @Override public Object getCurrentKey() throws IOException, InterruptedException {
-        if (reader != null)
-            return reader.getCurrentKey();
-
-        return input.key();
-    }
-
-    /** {@inheritDoc} Reads from the overriding reader only; unlike {@code getCurrentKey()} there is no fallback to the task input. */
-    @Override public Object getCurrentValue() throws IOException, InterruptedException {
-        return reader.getCurrentValue();
-    }
-
-    /** {@inheritDoc} Throws if the task was cancelled; writes to the overriding writer if set, otherwise to the task output. */
-    @SuppressWarnings("unchecked")
-    @Override public void write(Object key, Object val) throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        if (writer != null)
-            writer.write(key, val);
-        else {
-            try {
-                output.write(key, val);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} Not supported here: always throws {@link UnsupportedOperationException}. */
-    @Override public OutputCommitter getOutputCommitter() {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public TaskAttemptID getTaskAttemptID() {
-        return taskAttemptID;
-    }
-
-    /** {@inheritDoc} The message is only stored in this object. */
-    @Override public void setStatus(String msg) {
-        status = msg;
-    }
-
-    /** {@inheritDoc} Returns the last message given to {@code setStatus(String)}, or {@code null} if none was set. */
-    @Override public String getStatus() {
-        return status;
-    }
-
-    /** {@inheritDoc} Currently a constant placeholder value; real progress is not tracked. */
-    @Override public float getProgress() {
-        return 0.5f; // TODO
-    }
-
-    /** {@inheritDoc} The group name is the enum's declaring class name, the counter name is the constant's name. */
-    @Override public Counter getCounter(Enum<?> cntrName) {
-        return getCounter(cntrName.getDeclaringClass().getName(), cntrName.name());
-    }
-
-    /** {@inheritDoc} Each call wraps the task context's long counter for the given names in a new adapter. */
-    @Override public Counter getCounter(String grpName, String cntrName) {
-        return new GridHadoopV2Counter(ctx.counter(grpName, cntrName, HadoopLongCounter.class));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void progress() {
-        // No-op.
-    }
-
-    /**
-     * Overrides default input data reader.
-     *
-     * @param reader New reader; used by {@code nextKeyValue()}, {@code getCurrentKey()} and {@code getCurrentValue()}.
-     */
-    public void reader(RecordReader reader) {
-        this.reader = reader;
-    }
-
-    /** {@inheritDoc} Throws if the task was cancelled; otherwise advances the task input. */
-    @Override public boolean nextKey() throws IOException, InterruptedException {
-        if (cancelled)
-            throw new HadoopTaskCancelledException("Task cancelled.");
-
-        return input.next();
-    }
-
-    /** {@inheritDoc} Every {@code iterator()} call on the result returns the task input's values iterator. */
-    @Override public Iterable getValues() throws IOException, InterruptedException {
-        return new Iterable() {
-            @Override public Iterator iterator() {
-                return input.values();
-            }
-        };
-    }
-
-    /**
-     * @return Overridden output data writer, or {@code null} if none was set.
-     */
-    public RecordWriter writer() {
-        return writer;
-    }
-
-    /**
-     * Overrides default output data writer.
-     *
-     * @param writer New writer; when set, {@code write(Object, Object)} uses it instead of the task output.
-     */
-    public void writer(RecordWriter writer) {
-        this.writer = writer;
-    }
-
-    /**
-     * Cancels the task by stopping the IO: later nextKeyValue(), nextKey() and write() calls throw.
-     */
-    public void cancel() {
-        cancelled = true;
-    }
-}