Posted to commits@ignite.apache.org by ak...@apache.org on 2015/03/05 10:06:43 UTC

[32/51] incubator-ignite git commit: IGNITE-386: Squashed changes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
new file mode 100644
index 0000000..e0c5916
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.hadoop.mapreduce.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopClassLoader.*;
+
+/**
+ * Hadoop processor.
+ */
+public class HadoopProcessor extends HadoopProcessorAdapter {
+    /** Job ID counter. */
+    private final AtomicInteger idCtr = new AtomicInteger();
+
+    /** Hadoop context. */
+    @GridToStringExclude
+    private HadoopContext hctx;
+
+    /** Hadoop facade for public API. */
+    @GridToStringExclude
+    private Hadoop hadoop;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public HadoopProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.isDaemon())
+            return;
+
+        HadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
+
+        if (cfg == null)
+            cfg = new HadoopConfiguration();
+        else
+            cfg = new HadoopConfiguration(cfg);
+
+        initializeDefaults(cfg);
+
+        validate(cfg);
+
+        if (hadoopHome() != null)
+            U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
+
+        boolean ok = false;
+
+        try { // Check for Hadoop installation.
+            hadoopUrls();
+
+            ok = true;
+        }
+        catch (IgniteCheckedException e) {
+            U.quietAndWarn(log, e.getMessage());
+        }
+
+        if (ok) {
+            hctx = new HadoopContext(
+                ctx,
+                cfg,
+                new HadoopJobTracker(),
+                cfg.isExternalExecution() ? new HadoopExternalTaskExecutor() : new HadoopEmbeddedTaskExecutor(),
+                new HadoopShuffle());
+
+            for (HadoopComponent c : hctx.components())
+                c.start(hctx);
+
+            hadoop = new HadoopImpl(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProcessor.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        super.stop(cancel);
+
+        if (hctx == null)
+            return;
+
+        List<HadoopComponent> components = hctx.components();
+
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+
+            c.stop(cancel);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        if (hctx == null)
+            return;
+
+        for (HadoopComponent c : hctx.components())
+            c.onKernalStart();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        super.onKernalStop(cancel);
+
+        if (hctx == null)
+            return;
+
+        List<HadoopComponent> components = hctx.components();
+
+        for (ListIterator<HadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
+            HadoopComponent c = it.previous();
+
+            c.onKernalStop(cancel);
+        }
+    }
+
+    /**
+     * Gets Hadoop context.
+     *
+     * @return Hadoop context.
+     */
+    public HadoopContext context() {
+        return hctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Hadoop hadoop() {
+        if (hadoop == null)
+            throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
+                "is HADOOP_HOME environment variable set?)");
+
+        return hadoop;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopConfiguration config() {
+        return hctx.configuration();
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobId nextJobId() {
+        return new HadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> submit(HadoopJobId jobId, HadoopJobInfo jobInfo) {
+        return hctx.jobTracker().submit(jobId, jobInfo);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus status(HadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().status(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopCounters counters(HadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().jobCounters(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<?> finishFuture(HadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().finishFuture(jobId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean kill(HadoopJobId jobId) throws IgniteCheckedException {
+        return hctx.jobTracker().killJob(jobId);
+    }
+
+    /**
+     * Initializes default Hadoop configuration.
+     *
+     * @param cfg Hadoop configuration.
+     */
+    private void initializeDefaults(HadoopConfiguration cfg) {
+        if (cfg.getMapReducePlanner() == null)
+            cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
+    }
+
+    /**
+     * Validates Grid and Hadoop configuration for correctness.
+     *
+     * @param hadoopCfg Hadoop configuration.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void validate(HadoopConfiguration hadoopCfg) throws IgniteCheckedException {
+        if (ctx.config().isPeerClassLoadingEnabled())
+            throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
+                "GridConfiguration.setPeerClassLoadingEnabled()).");
+    }
+}
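
As a rough usage sketch: the processor above is driven entirely by the HadoopConfiguration it pulls from the kernal context in start(). A public IgniteConfiguration.setHadoopConfiguration(...) setter is assumed here from the ctx.config().getHadoopConfiguration() call; treat this as a sketch under that assumption, not as part of the commit:

    IgniteConfiguration igniteCfg = new IgniteConfiguration();

    HadoopConfiguration hadoopCfg = new HadoopConfiguration();
    hadoopCfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner()); // Same default initializeDefaults() applies.

    igniteCfg.setHadoopConfiguration(hadoopCfg);
    igniteCfg.setPeerClassLoadingEnabled(false); // validate() rejects peer class loading.

    try (Ignite ignite = Ignition.start(igniteCfg)) {
        // HadoopProcessor.start() runs as part of node startup;
        // hadoop() becomes available once all components have started.
    }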

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
new file mode 100644
index 0000000..35df5da
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopSetup.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+import java.text.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.IgniteVersionUtils.*;
+
+/**
+ * Setup tool to configure Hadoop client.
+ */
+public class HadoopSetup {
+    /** */
+    public static final String WINUTILS_EXE = "winutils.exe";
+
+    /** */
+    private static final FilenameFilter IGNITE_JARS = new FilenameFilter() {
+        @Override public boolean accept(File dir, String name) {
+            return name.startsWith("ignite-") && name.endsWith(".jar");
+        }
+    };
+
+    /**
+     * The main method.
+     *
+     * @param ignore Command line arguments (ignored).
+     */
+    public static void main(String[] ignore) {
+        X.println(
+            "   __________  ________________ ",
+            "  /  _/ ___/ |/ /  _/_  __/ __/ ",
+            " _/ // (_ /    // /  / / / _/   ",
+            "/___/\\___/_/|_/___/ /_/ /___/  ",
+            "                for Apache Hadoop        ",
+            "  ");
+
+        println("Version " + ACK_VER_STR);
+
+        configureHadoop();
+    }
+
+    /**
+     * This operation prepares a clean, unpacked Hadoop distribution to work as a client with Ignite-Hadoop.
+     * It performs these operations:
+     * <ul>
+     *     <li>Checks that the HADOOP_HOME environment variable is set.</li>
+     *     <li>Tries to resolve HADOOP_COMMON_HOME or evaluates it relative to HADOOP_HOME.</li>
+     *     <li>On Windows, checks that winutils.exe exists and tries to work around known restrictions.</li>
+     *     <li>On Windows, checks CMD scripts for invalid new line characters.</li>
+     *     <li>Scans the Hadoop lib directory for Ignite JARs and creates symbolic links for any that are missing.</li>
+     * </ul>
+     */
+    private static void configureHadoop() {
+        String igniteHome = U.getIgniteHome();
+
+        println("IGNITE_HOME is set to '" + igniteHome + "'.");
+
+        checkIgniteHome(igniteHome);
+
+        String homeVar = "HADOOP_HOME";
+        String hadoopHome = System.getenv(homeVar);
+
+        if (F.isEmpty(hadoopHome)) {
+            homeVar = "HADOOP_PREFIX";
+            hadoopHome = System.getenv(homeVar);
+        }
+
+        if (F.isEmpty(hadoopHome))
+            exit("Neither HADOOP_HOME nor HADOOP_PREFIX environment variable is set. Please set one of them to a " +
+                "valid Hadoop installation directory and run setup tool again.", null);
+
+        hadoopHome = hadoopHome.replaceAll("\"", "");
+
+        println(homeVar + " is set to '" + hadoopHome + "'.");
+
+        String hiveHome = System.getenv("HIVE_HOME");
+
+        if (!F.isEmpty(hiveHome)) {
+            hiveHome = hiveHome.replaceAll("\"", "");
+
+            println("HIVE_HOME is set to '" + hiveHome + "'.");
+        }
+
+        File hadoopDir = new File(hadoopHome);
+
+        if (!hadoopDir.exists())
+            exit("Hadoop installation folder does not exist.", null);
+
+        if (!hadoopDir.isDirectory())
+            exit("HADOOP_HOME must point to a directory.", null);
+
+        if (!hadoopDir.canRead())
+            exit("Hadoop installation folder can not be read. Please check permissions.", null);
+
+        File hadoopCommonDir;
+
+        String hadoopCommonHome = System.getenv("HADOOP_COMMON_HOME");
+
+        if (F.isEmpty(hadoopCommonHome)) {
+            hadoopCommonDir = new File(hadoopDir, "share/hadoop/common");
+
+            println("HADOOP_COMMON_HOME is not set, will use '" + hadoopCommonDir.getPath() + "'.");
+        }
+        else {
+            println("HADOOP_COMMON_HOME is set to '" + hadoopCommonHome + "'.");
+
+            hadoopCommonDir = new File(hadoopCommonHome);
+        }
+
+        if (!hadoopCommonDir.canRead())
+            exit("Failed to read Hadoop common dir in '" + hadoopCommonHome + "'.", null);
+
+        File hadoopCommonLibDir = new File(hadoopCommonDir, "lib");
+
+        if (!hadoopCommonLibDir.canRead())
+            exit("Failed to read Hadoop 'lib' folder in '" + hadoopCommonLibDir.getPath() + "'.", null);
+
+        if (U.isWindows()) {
+            checkJavaPathSpaces();
+
+            File hadoopBinDir = new File(hadoopDir, "bin");
+
+            if (!hadoopBinDir.canRead())
+                exit("Failed to read subdirectory 'bin' in HADOOP_HOME.", null);
+
+            File winutilsFile = new File(hadoopBinDir, WINUTILS_EXE);
+
+            if (!winutilsFile.exists()) {
+                if (ask("File '" + WINUTILS_EXE + "' does not exist. " +
+                    "It may be replaced by a stub. Create it?")) {
+                    println("Creating file stub '" + winutilsFile.getAbsolutePath() + "'.");
+
+                    boolean ok = false;
+
+                    try {
+                        ok = winutilsFile.createNewFile();
+                    }
+                    catch (IOException ignore) {
+                        // No-op.
+                    }
+
+                    if (!ok)
+                        exit("Failed to create '" + WINUTILS_EXE + "' file. Please check permissions.", null);
+                }
+                else
+                    println("Ok. But Hadoop client probably will not work on Windows this way...");
+            }
+
+            processCmdFiles(hadoopDir, "bin", "sbin", "libexec");
+        }
+
+        File igniteLibs = new File(new File(igniteHome), "libs");
+
+        if (!igniteLibs.exists())
+            exit("Ignite 'libs' folder is not found.", null);
+
+        Collection<File> jarFiles = new ArrayList<>();
+
+        addJarsInFolder(jarFiles, igniteLibs);
+        addJarsInFolder(jarFiles, new File(igniteLibs, "ignite-hadoop"));
+
+        boolean jarsLinksCorrect = true;
+
+        for (File file : jarFiles) {
+            File link = new File(hadoopCommonLibDir, file.getName());
+
+            jarsLinksCorrect &= isJarLinkCorrect(link, file);
+
+            if (!jarsLinksCorrect)
+                break;
+        }
+
+        if (!jarsLinksCorrect) {
+            if (ask("Ignite JAR files are not found in Hadoop 'lib' directory. " +
+                "Create appropriate symbolic links?")) {
+                File[] oldIgniteJarFiles = hadoopCommonLibDir.listFiles(IGNITE_JARS);
+
+                if (oldIgniteJarFiles.length > 0 && ask("The Hadoop 'lib' directory contains JARs from other Ignite " +
+                    "installation. They must be deleted to continue. Continue?")) {
+                    for (File file : oldIgniteJarFiles) {
+                        println("Deleting file '" + file.getAbsolutePath() + "'.");
+
+                        if (!file.delete())
+                            exit("Failed to delete file '" + file.getPath() + "'.", null);
+                    }
+                }
+
+                for (File file : jarFiles) {
+                    File targetFile = new File(hadoopCommonLibDir, file.getName());
+
+                    try {
+                        println("Creating symbolic link '" + targetFile.getAbsolutePath() + "'.");
+
+                        Files.createSymbolicLink(targetFile.toPath(), file.toPath());
+                    }
+                    catch (IOException e) {
+                        if (U.isWindows()) {
+                            warn("Ability to create symbolic links is required!");
+                            warn("On Windows platform you have to grant permission 'Create symbolic links'");
+                            warn("to your user or run the Accelerator as Administrator.");
+                        }
+
+                        exit("Creating symbolic link failed! Check permissions.", e);
+                    }
+                }
+            }
+            else
+                println("Ok. But Hadoop client will not be able to talk to Ignite cluster without those JARs in classpath...");
+        }
+
+        File hadoopEtc = new File(hadoopDir, "etc" + File.separator + "hadoop");
+
+        File igniteDocs = new File(igniteHome, "docs");
+
+        if (!igniteDocs.canRead())
+            exit("Failed to read Ignite 'docs' folder at '" + igniteDocs.getAbsolutePath() + "'.", null);
+
+        if (hadoopEtc.canWrite()) { // TODO Bigtop
+            if (ask("Replace 'core-site.xml' and 'mapred-site.xml' files with preconfigured templates " +
+                "(existing files will be backed up)?")) {
+                replaceWithBackup(new File(igniteDocs, "core-site.ignite.xml"), new File(hadoopEtc, "core-site.xml"));
+
+                replaceWithBackup(new File(igniteDocs, "mapred-site.ignite.xml"), new File(hadoopEtc, "mapred-site.xml"));
+            }
+            else
+                println("Ok. You can configure them later, the templates are available at Ignite's 'docs' directory...");
+        }
+
+        if (!F.isEmpty(hiveHome)) {
+            File hiveConfDir = new File(hiveHome + File.separator + "conf");
+
+            if (!hiveConfDir.canWrite())
+                warn("Can not write to '" + hiveConfDir.getAbsolutePath() + "'. To run Hive queries you have to " +
+                    "configure 'hive-site.xml' manually. The template is available at Ignite's 'docs' directory.");
+            else if (ask("Replace 'hive-site.xml' with preconfigured template (existing file will be backed up)?"))
+                replaceWithBackup(new File(igniteDocs, "hive-site.ignite.xml"), new File(hiveConfDir, "hive-site.xml"));
+            else
+                println("Ok. You can configure it later, the template is available at Ignite's 'docs' directory...");
+        }
+
+        println("Apache Hadoop setup is complete.");
+    }
+
+    /**
+     * @param jarFiles Jars.
+     * @param folder Folder.
+     */
+    private static void addJarsInFolder(Collection<File> jarFiles, File folder) {
+        if (!folder.exists())
+            exit("Folder '" + folder.getAbsolutePath() + "' is not found.", null);
+
+        File[] files = folder.listFiles(IGNITE_JARS);
+
+        if (files != null) // listFiles() returns null if the path is not a readable directory.
+            jarFiles.addAll(Arrays.asList(files));
+    }
+
+    /**
+     * Checks that JAVA_HOME does not contain space characters.
+     */
+    private static void checkJavaPathSpaces() {
+        String javaHome = System.getProperty("java.home");
+
+        if (javaHome.contains(" ")) {
+            warn("Java installation path contains space characters!");
+            warn("Hadoop client will not be able to start using '" + javaHome + "'.");
+            warn("Please install JRE to path which does not contain spaces and point JAVA_HOME to that installation.");
+        }
+    }
+
+    /**
+     * Checks Ignite home.
+     *
+     * @param igniteHome Ignite home.
+     */
+    private static void checkIgniteHome(String igniteHome) {
+        URL jarUrl = U.class.getProtectionDomain().getCodeSource().getLocation();
+
+        try {
+            Path jar = Paths.get(jarUrl.toURI());
+            Path igHome = Paths.get(igniteHome);
+
+            if (!jar.startsWith(igHome))
+                exit("Ignite JAR files are not under IGNITE_HOME.", null);
+        }
+        catch (Exception e) {
+            exit(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Replaces the target file with the source file, backing up the target first.
+     *
+     * @param from Source file.
+     * @param to Target file.
+     */
+    private static void replaceWithBackup(File from, File to) {
+        if (!from.canRead())
+            exit("Failed to read source file '" + from.getAbsolutePath() + "'.", null);
+
+        println("Replacing file '" + to.getAbsolutePath() + "'.");
+
+        try {
+            U.copy(from, renameToBak(to), true);
+        }
+        catch (IOException e) {
+            exit("Failed to replace file '" + to.getAbsolutePath() + "'.", e);
+        }
+    }
+
+    /**
+     * Renames the file to a timestamped '.bak' backup, if it exists.
+     *
+     * @param file File to back up.
+     * @return The original file handle (its path is now free to be rewritten).
+     */
+    private static File renameToBak(File file) {
+        DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
+
+        if (file.exists() && !file.renameTo(new File(file.getAbsolutePath() + "." + fmt.format(new Date()) + ".bak")))
+            exit("Failed to rename file '" + file.getPath() + "'.", null);
+
+        return file;
+    }
+
+    /**
+     * Checks if link is correct.
+     *
+     * @param link Symbolic link.
+     * @param correctTarget Correct link target.
+     * @return {@code true} If link target is correct.
+     */
+    private static boolean isJarLinkCorrect(File link, File correctTarget) {
+        if (!Files.isSymbolicLink(link.toPath()))
+            return false; // It is a real file or it does not exist.
+
+        Path target = null;
+
+        try {
+            target = Files.readSymbolicLink(link.toPath());
+        }
+        catch (IOException e) {
+            exit("Failed to read symbolic link: " + link.getAbsolutePath(), e);
+        }
+
+        return Files.exists(target) && target.toFile().equals(correctTarget);
+    }
+
+    /**
+     * Writes the question and reads the boolean answer from the console.
+     *
+     * @param question Question to write.
+     * @return {@code true} if user inputs 'Y' or 'y', {@code false} otherwise.
+     */
+    private static boolean ask(String question) {
+        X.println();
+        X.print(" <  " + question + " (Y/N): ");
+
+        String answer = null;
+
+        if (!F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_YES")))
+            answer = "Y";
+        else {
+            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
+
+            try {
+                answer = br.readLine();
+            }
+            catch (IOException e) {
+                exit("Failed to read answer: " + e.getMessage(), e);
+            }
+        }
+
+        if (answer != null && "Y".equals(answer.toUpperCase().trim())) {
+            X.println(" >  Yes.");
+
+            return true;
+        }
+        else {
+            X.println(" >  No.");
+
+            return false;
+        }
+    }
+
+    /**
+     * Exits with message.
+     *
+     * @param msg Exit message.
+     * @param e Exception to print when IGNITE_HADOOP_SETUP_DEBUG is set, or {@code null}.
+     */
+    private static void exit(String msg, Exception e) {
+        X.println("    ");
+        X.println("  # " + msg);
+        X.println("  # Setup failed, exiting... ");
+
+        if (e != null && !F.isEmpty(System.getenv("IGNITE_HADOOP_SETUP_DEBUG")))
+            e.printStackTrace();
+
+        System.exit(1);
+    }
+
+    /**
+     * Prints message.
+     *
+     * @param msg Message.
+     */
+    private static void println(String msg) {
+        X.println("  > " + msg);
+    }
+
+    /**
+     * Prints warning.
+     *
+     * @param msg Message.
+     */
+    private static void warn(String msg) {
+        X.println("  ! " + msg);
+    }
+
+    /**
+     * Checks that CMD files have valid MS Windows new line characters. If not, writes a question to the console and
+     * reads the answer. If it is 'Y', backs up the original files and corrects the invalid new line characters.
+     *
+     * @param rootDir Root directory to process.
+     * @param dirs Directories inside of the root to process.
+     */
+    private static void processCmdFiles(File rootDir, String... dirs) {
+        boolean answer = false;
+
+        for (String dir : dirs) {
+            File subDir = new File(rootDir, dir);
+
+            File[] cmdFiles = subDir.listFiles(new FilenameFilter() {
+                @Override public boolean accept(File dir, String name) {
+                    return name.toLowerCase().endsWith(".cmd");
+                }
+            });
+
+            if (cmdFiles == null) // The sub-directory may be absent in some distributions.
+                continue;
+
+            for (File file : cmdFiles) {
+                String content = null;
+
+                try (Scanner scanner = new Scanner(file).useDelimiter("\\Z")) {
+                    content = scanner.hasNext() ? scanner.next() : ""; // Guard against empty files.
+                }
+                catch (FileNotFoundException e) {
+                    exit("Failed to read file '" + file + "'.", e);
+                }
+
+                boolean invalid = false;
+
+                for (int i = 0; i < content.length(); i++) {
+                    if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r')) {
+                        invalid = true;
+
+                        break;
+                    }
+                }
+
+                if (invalid) {
+                    answer = answer || ask("One or more *.CMD files have invalid new line characters. Replace them?");
+
+                    if (!answer) {
+                        println("Ok. But Windows most probably will fail to execute them...");
+
+                        return;
+                    }
+
+                    println("Fixing newline characters in file '" + file.getAbsolutePath() + "'.");
+
+                    renameToBak(file);
+
+                    try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+                        for (int i = 0; i < content.length(); i++) {
+                            if (content.charAt(i) == '\n' && (i == 0 || content.charAt(i - 1) != '\r'))
+                                writer.write("\r");
+
+                            writer.write(content.charAt(i));
+                        }
+                    }
+                    catch (IOException e) {
+                        exit("Failed to write file '" + file.getPath() + "': " + e.getMessage(), e);
+                    }
+                }
+            }
+        }
+    }
+}
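
A hedged sketch of driving the tool unattended: ask() above auto-answers "Y" whenever the IGNITE_HADOOP_SETUP_YES environment variable is non-empty, so the setup can be scripted. The classpath below is an assumption for illustration (adjust to your layout), and error handling is omitted:

    // Launches HadoopSetup in a child JVM with auto-confirmation enabled.
    ProcessBuilder pb = new ProcessBuilder("java",
        "-cp", "libs/*", // Assumed classpath containing the Ignite JARs.
        "org.apache.ignite.internal.processors.hadoop.HadoopSetup");

    pb.environment().put("IGNITE_HADOOP_SETUP_YES", "1");

    int exitCode = pb.inheritIO().start().waitFor();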

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
new file mode 100644
index 0000000..bb3d1cc
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskCancelledException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Exception thrown when a task is cancelled.
+ */
+public class HadoopTaskCancelledException extends IgniteException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Exception message.
+     */
+    public HadoopTaskCancelledException(String msg) {
+        super(msg);
+    }
+}
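
A minimal sketch of the intended usage pattern (runTask() is a hypothetical task body, not part of this commit): because the exception is unchecked, task-executor code can tell cooperative cancellation apart from genuine failures:

    try {
        runTask(); // Hypothetical task body that may observe cancellation.
    }
    catch (HadoopTaskCancelledException e) {
        // Expected when the job is killed; release resources, do not treat as an error.
    }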

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
new file mode 100644
index 0000000..00be422
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.v2.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Hadoop utility methods.
+ */
+public class HadoopUtils {
+    /** Property to store timestamp of new job id request. */
+    public static final String REQ_NEW_JOBID_TS_PROPERTY = "ignite.job.requestNewIdTs";
+
+    /** Property to store timestamp of response of new job id request. */
+    public static final String RESPONSE_NEW_JOBID_TS_PROPERTY = "ignite.job.responseNewIdTs";
+
+    /** Property to store timestamp of job submission. */
+    public static final String JOB_SUBMISSION_START_TS_PROPERTY = "ignite.job.submissionStartTs";
+
+    /** Property to set custom writer of job statistics. */
+    public static final String JOB_COUNTER_WRITER_PROPERTY = "ignite.counters.writer";
+
+    /** Staging constant. */
+    private static final String STAGING_CONSTANT = ".staging";
+
+    /** Old mapper class attribute. */
+    private static final String OLD_MAP_CLASS_ATTR = "mapred.mapper.class";
+
+    /** Old reducer class attribute. */
+    private static final String OLD_REDUCE_CLASS_ATTR = "mapred.reducer.class";
+
+    /**
+     * Wraps native split.
+     *
+     * @param id Split ID.
+     * @param split Split.
+     * @param hosts Hosts.
+     * @return Wrapped split.
+     * @throws IOException If failed.
+     */
+    public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException {
+        ByteArrayOutputStream arr = new ByteArrayOutputStream();
+        ObjectOutput out = new ObjectOutputStream(arr);
+
+        assert split instanceof Writable;
+
+        ((Writable)split).write(out);
+
+        out.flush();
+
+        return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts);
+    }
+
+    /**
+     * Unwraps native split.
+     *
+     * @param o Wrapper.
+     * @return Split.
+     */
+    public static Object unwrapSplit(HadoopSplitWrapper o) {
+        try {
+            Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance();
+
+            w.readFields(new ObjectInputStream(new ByteArrayInputStream(o.bytes())));
+
+            return w;
+        }
+        catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Converts Ignite job status to Hadoop job status.
+     *
+     * @param status Ignite job status.
+     * @param conf Job configuration.
+     * @return Hadoop job status.
+     */
+    public static JobStatus status(HadoopJobStatus status, Configuration conf) {
+        JobID jobId = new JobID(status.jobId().globalId().toString(), status.jobId().localId());
+
+        float setupProgress = 0;
+        float mapProgress = 0;
+        float reduceProgress = 0;
+        float cleanupProgress = 0;
+
+        JobStatus.State state = JobStatus.State.RUNNING;
+
+        switch (status.jobPhase()) {
+            case PHASE_SETUP:
+                setupProgress = 0.42f;
+
+                break;
+
+            case PHASE_MAP:
+                setupProgress = 1;
+                mapProgress = 1f - status.pendingMapperCnt() / (float)status.totalMapperCnt();
+
+                break;
+
+            case PHASE_REDUCE:
+                assert status.totalReducerCnt() > 0;
+
+                setupProgress = 1;
+                mapProgress = 1;
+                reduceProgress = 1f - status.pendingReducerCnt() / (float)status.totalReducerCnt();
+
+                break;
+
+            case PHASE_CANCELLING:
+            case PHASE_COMPLETE:
+                if (!status.isFailed()) {
+                    setupProgress = 1;
+                    mapProgress = 1;
+                    reduceProgress = 1;
+                    cleanupProgress = 1;
+
+                    state = JobStatus.State.SUCCEEDED;
+                }
+                else
+                    state = JobStatus.State.FAILED;
+
+                break;
+
+            default:
+                assert false;
+        }
+
+        return new JobStatus(jobId, setupProgress, mapProgress, reduceProgress, cleanupProgress, state,
+            JobPriority.NORMAL, status.user(), status.jobName(), jobFile(conf, status.user(), jobId).toString(), "N/A");
+    }
+
+    /**
+     * Gets staging area directory.
+     *
+     * @param conf Configuration.
+     * @param usr User.
+     * @return Staging area directory.
+     */
+    public static Path stagingAreaDir(Configuration conf, String usr) {
+        return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR, MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+            + Path.SEPARATOR + usr + Path.SEPARATOR + STAGING_CONSTANT);
+    }
+
+    /**
+     * Gets job file.
+     *
+     * @param conf Configuration.
+     * @param usr User.
+     * @param jobId Job ID.
+     * @return Job file.
+     */
+    public static Path jobFile(Configuration conf, String usr, JobID jobId) {
+        return new Path(stagingAreaDir(conf, usr), jobId.toString() + Path.SEPARATOR + MRJobConfig.JOB_CONF_FILE);
+    }
+
+    /**
+     * Checks that the given attribute is not set in the configuration.
+     *
+     * @param cfg Configuration to check.
+     * @param attr Attribute name.
+     * @param msg Message for creation of exception.
+     * @throws IgniteCheckedException If attribute is set.
+     */
+    public static void ensureNotSet(Configuration cfg, String attr, String msg) throws IgniteCheckedException {
+        if (cfg.get(attr) != null)
+            throw new IgniteCheckedException(attr + " is incompatible with " + msg + " mode.");
+    }
+
+    /**
+     * Creates a JobInfo from the Hadoop configuration.
+     *
+     * @param cfg Hadoop configuration.
+     * @return Job info.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static HadoopDefaultJobInfo createJobInfo(Configuration cfg) throws IgniteCheckedException {
+        JobConf jobConf = new JobConf(cfg);
+
+        boolean hasCombiner = jobConf.get("mapred.combiner.class") != null
+                || jobConf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
+
+        int numReduces = jobConf.getNumReduceTasks();
+
+        jobConf.setBooleanIfUnset("mapred.mapper.new-api", jobConf.get(OLD_MAP_CLASS_ATTR) == null);
+
+        if (jobConf.getUseNewMapper()) {
+            String mode = "new map API";
+
+            ensureNotSet(jobConf, "mapred.input.format.class", mode);
+            ensureNotSet(jobConf, OLD_MAP_CLASS_ATTR, mode);
+
+            if (numReduces != 0)
+                ensureNotSet(jobConf, "mapred.partitioner.class", mode);
+            else
+                ensureNotSet(jobConf, "mapred.output.format.class", mode);
+        }
+        else {
+            String mode = "map compatibility";
+
+            ensureNotSet(jobConf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+            ensureNotSet(jobConf, MRJobConfig.MAP_CLASS_ATTR, mode);
+
+            if (numReduces != 0)
+                ensureNotSet(jobConf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
+            else
+                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+        }
+
+        if (numReduces != 0) {
+            jobConf.setBooleanIfUnset("mapred.reducer.new-api", jobConf.get(OLD_REDUCE_CLASS_ATTR) == null);
+
+            if (jobConf.getUseNewReducer()) {
+                String mode = "new reduce API";
+
+                ensureNotSet(jobConf, "mapred.output.format.class", mode);
+                ensureNotSet(jobConf, OLD_REDUCE_CLASS_ATTR, mode);
+            }
+            else {
+                String mode = "reduce compatibility";
+
+                ensureNotSet(jobConf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
+                ensureNotSet(jobConf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
+            }
+        }
+
+        Map<String, String> props = new HashMap<>();
+
+        for (Map.Entry<String, String> entry : jobConf)
+            props.put(entry.getKey(), entry.getValue());
+
+        return new HadoopDefaultJobInfo(jobConf.getJobName(), jobConf.getUser(), hasCombiner, numReduces, props);
+    }
+
+    /**
+     * Creates a new {@link IgniteCheckedException} with the original exception serialized into a string.
+     * This is needed to transfer the error outside the current class loader.
+     *
+     * @param e Original exception.
+     * @return New exception.
+     */
+    public static IgniteCheckedException transformException(Throwable e) {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+
+        e.printStackTrace(new PrintStream(os, true));
+
+        return new IgniteCheckedException(os.toString());
+    }
+
+    /**
+     * Returns work directory for job execution.
+     *
+     * @param locNodeId Local node ID.
+     * @param jobId Job ID.
+     * @return Working directory for job.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static File jobLocalDir(UUID locNodeId, HadoopJobId jobId) throws IgniteCheckedException {
+        return new File(new File(U.resolveWorkDirectory("hadoop", false), "node-" + locNodeId), "job_" + jobId);
+    }
+
+    /**
+     * Returns subdirectory of job working directory for task execution.
+     *
+     * @param locNodeId Local node ID.
+     * @param info Task info.
+     * @return Working directory for task.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static File taskLocalDir(UUID locNodeId, HadoopTaskInfo info) throws IgniteCheckedException {
+        File jobLocDir = jobLocalDir(locNodeId, info.jobId());
+
+        return new File(jobLocDir, info.type() + "_" + info.taskNumber() + "_" + info.attempt());
+    }
+
+    /**
+     * Private constructor to prevent instantiation of this utility class.
+     */
+    private HadoopUtils() {
+        // No-op.
+    }
+}
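
A round-trip sketch for wrapSplit()/unwrapSplit(), assuming a Writable FileSplit from the Hadoop mapreduce API (paths and hosts are illustrative; exception handling omitted). The wrapper carries the split as serialized bytes plus its class name, so it can cross the class-loader boundary between node and job classpaths:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    FileSplit split = new FileSplit(new Path("/data/part-00000"), 0, 1024, new String[] {"host1"});

    // Serialize the Writable split into a class-loader-agnostic wrapper...
    HadoopSplitWrapper wrapped = HadoopUtils.wrapSplit(0, split, new String[] {"host1"});

    // ...and reconstruct it by class name on the other side.
    FileSplit restored = (FileSplit)HadoopUtils.unwrapSplit(wrapped);

    assert restored.getPath().equals(split.getPath());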

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
deleted file mode 100644
index 4ef9e35..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/IgniteHadoopProcessor.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.processors.hadoop.planner.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopClassLoader.*;
-
-/**
- * Hadoop processor.
- */
-public class IgniteHadoopProcessor extends IgniteHadoopProcessorAdapter {
-    /** Job ID counter. */
-    private final AtomicInteger idCtr = new AtomicInteger();
-
-    /** Hadoop context. */
-    @GridToStringExclude
-    private GridHadoopContext hctx;
-
-    /** Hadoop facade for public API. */
-    @GridToStringExclude
-    private GridHadoop hadoop;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public IgniteHadoopProcessor(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.isDaemon())
-            return;
-
-        GridHadoopConfiguration cfg = ctx.config().getHadoopConfiguration();
-
-        if (cfg == null)
-            cfg = new GridHadoopConfiguration();
-        else
-            cfg = new GridHadoopConfiguration(cfg);
-
-        initializeDefaults(cfg);
-
-        validate(cfg);
-
-        if (hadoopHome() != null)
-            U.quietAndInfo(log, "HADOOP_HOME is set to " + hadoopHome());
-
-        boolean ok = false;
-
-        try { // Check for Hadoop installation.
-            hadoopUrls();
-
-            ok = true;
-        }
-        catch (IgniteCheckedException e) {
-            U.quietAndWarn(log, e.getMessage());
-        }
-
-        if (ok) {
-            hctx = new GridHadoopContext(
-                ctx,
-                cfg,
-                new GridHadoopJobTracker(),
-                cfg.isExternalExecution() ? new GridHadoopExternalTaskExecutor() : new GridHadoopEmbeddedTaskExecutor(),
-                new GridHadoopShuffle());
-
-
-            for (GridHadoopComponent c : hctx.components())
-                c.start(hctx);
-
-            hadoop = new GridHadoopImpl(this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteHadoopProcessor.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        super.stop(cancel);
-
-        if (hctx == null)
-            return;
-
-        List<GridHadoopComponent> components = hctx.components();
-
-        for (ListIterator<GridHadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            GridHadoopComponent c = it.previous();
-
-            c.stop(cancel);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        if (hctx == null)
-            return;
-
-        for (GridHadoopComponent c : hctx.components())
-            c.onKernalStart();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        super.onKernalStop(cancel);
-
-        if (hctx == null)
-            return;
-
-        List<GridHadoopComponent> components = hctx.components();
-
-        for (ListIterator<GridHadoopComponent> it = components.listIterator(components.size()); it.hasPrevious();) {
-            GridHadoopComponent c = it.previous();
-
-            c.onKernalStop(cancel);
-        }
-    }
-
-    /**
-     * Gets Hadoop context.
-     *
-     * @return Hadoop context.
-     */
-    public GridHadoopContext context() {
-        return hctx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoop hadoop() {
-        if (hadoop == null)
-            throw new IllegalStateException("Hadoop accelerator is disabled (Hadoop is not in classpath, " +
-                "is HADOOP_HOME environment variable set?)");
-
-        return hadoop;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopConfiguration config() {
-        return hctx.configuration();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobId nextJobId() {
-        return new GridHadoopJobId(ctx.localNodeId(), idCtr.incrementAndGet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> submit(GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
-        return hctx.jobTracker().submit(jobId, jobInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopJobStatus status(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().status(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridHadoopCounters counters(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().jobCounters(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> finishFuture(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().finishFuture(jobId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean kill(GridHadoopJobId jobId) throws IgniteCheckedException {
-        return hctx.jobTracker().killJob(jobId);
-    }
-
-    /**
-     * Initializes default hadoop configuration.
-     *
-     * @param cfg Hadoop configuration.
-     */
-    private void initializeDefaults(GridHadoopConfiguration cfg) {
-        if (cfg.getMapReducePlanner() == null)
-            cfg.setMapReducePlanner(new GridHadoopDefaultMapReducePlanner());
-    }
-
-    /**
-     * Validates Grid and Hadoop configuration for correctness.
-     *
-     * @param hadoopCfg Hadoop configuration.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void validate(GridHadoopConfiguration hadoopCfg) throws IgniteCheckedException {
-        if (ctx.config().isPeerClassLoadingEnabled())
-            throw new IgniteCheckedException("Peer class loading cannot be used with Hadoop (disable it using " +
-                "GridConfiguration.setPeerClassLoadingEnabled()).");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
deleted file mode 100644
index 9e46846..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCounterAdapter.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Default Hadoop counter implementation.
- */
-public abstract class GridHadoopCounterAdapter implements GridHadoopCounter, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Counter group name. */
-    private String grp;
-
-    /** Counter name. */
-    private String name;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    protected GridHadoopCounterAdapter() {
-        // No-op.
-    }
-
-    /**
-     * Creates new counter with given group and name.
-     *
-     * @param grp Counter group name.
-     * @param name Counter name.
-     */
-    protected GridHadoopCounterAdapter(String grp, String name) {
-        assert grp != null : "counter must have group";
-        assert name != null : "counter must have name";
-
-        this.grp = grp;
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public String group() {
-        return grp;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeUTF(grp);
-        out.writeUTF(name);
-        writeValue(out);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        grp = in.readUTF();
-        name = in.readUTF();
-        readValue(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopCounterAdapter cntr = (GridHadoopCounterAdapter)o;
-
-        if (!grp.equals(cntr.grp))
-            return false;
-        if (!name.equals(cntr.name))
-            return false;
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = grp.hashCode();
-        res = 31 * res + name.hashCode();
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopCounterAdapter.class, this);
-    }
-
-    /**
-     * Writes value of this counter to output.
-     *
-     * @param out Output.
-     * @throws IOException If failed.
-     */
-    protected abstract void writeValue(ObjectOutput out) throws IOException;
-
-    /**
-     * Read value of this counter from input.
-     *
-     * @param in Input.
-     * @throws IOException If failed.
-     */
-    protected abstract void readValue(ObjectInput in) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
deleted file mode 100644
index 92d54af..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopCountersImpl.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-
-import java.io.*;
-import java.lang.reflect.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Default in-memory counters store.
- */
-public class GridHadoopCountersImpl implements GridHadoopCounters, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private final ConcurrentMap<CounterKey, GridHadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
-
-    /**
-     * Default constructor. Creates new instance without counters.
-     */
-    public GridHadoopCountersImpl() {
-        // No-op.
-    }
-
-    /**
-     * Creates new instance that contain given counters.
-     *
-     * @param cntrs Counters to store.
-     */
-    public GridHadoopCountersImpl(Iterable<GridHadoopCounter> cntrs) {
-        addCounters(cntrs, true);
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param cntrs Counters to copy.
-     */
-    public GridHadoopCountersImpl(GridHadoopCounters cntrs) {
-        this(cntrs.all());
-    }
-
-    /**
-     * Creates counter instance.
-     *
-     * @param cls Class of the counter.
-     * @param grp Group name.
-     * @param name Counter name.
-     * @return Counter.
-     */
-    private <T extends GridHadoopCounter> T createCounter(Class<? extends GridHadoopCounter> cls, String grp,
-        String name) {
-        try {
-            Constructor constructor = cls.getConstructor(String.class, String.class);
-
-            return (T)constructor.newInstance(grp, name);
-        }
-        catch (Exception e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * Adds counters collection in addition to existing counters.
-     *
-     * @param cntrs Counters to add.
-     * @param cp Whether to copy counters or not.
-     */
-    private void addCounters(Iterable<GridHadoopCounter> cntrs, boolean cp) {
-        assert cntrs != null;
-
-        for (GridHadoopCounter cntr : cntrs) {
-            if (cp) {
-                GridHadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
-
-                cntrCp.merge(cntr);
-
-                cntr = cntrCp;
-            }
-
-            cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T extends GridHadoopCounter> T counter(String grp, String name, Class<T> cls) {
-        assert cls != null;
-
-        CounterKey mapKey = new CounterKey(cls, grp, name);
-
-        T cntr = (T)cntrsMap.get(mapKey);
-
-        if (cntr == null) {
-            cntr = createCounter(cls, grp, name);
-
-            T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
-
-            if (old != null)
-                return old;
-        }
-
-        return cntr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridHadoopCounter> all() {
-        return cntrsMap.values();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounters other) {
-        for (GridHadoopCounter counter : other.all())
-            counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeCollection(out, cntrsMap.values());
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        addCounters(U.<GridHadoopCounter>readCollection(in), false);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridHadoopCountersImpl counters = (GridHadoopCountersImpl)o;
-
-        return cntrsMap.equals(counters.cntrsMap);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return cntrsMap.hashCode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopCountersImpl.class, this, "counters", cntrsMap.values());
-    }
-
-    /**
-     * A tuple of counter identifier components, introduced for more readable code.
-     */
-    private static class CounterKey extends GridTuple3<Class<? extends GridHadoopCounter>, String, String> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**
-         * Constructor.
-         *
-         * @param cls Class of the counter.
-         * @param grp Group name.
-         * @param name Counter name.
-         */
-        private CounterKey(Class<? extends GridHadoopCounter> cls, String grp, String name) {
-            super(cls, grp, name);
-        }
-
-        /**
-         * Empty constructor required by {@link Externalizable}.
-         */
-        public CounterKey() {
-            // No-op.
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
deleted file mode 100644
index 55dcc4c..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopFSCounterWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Statistics writer implementation that writes info into any Hadoop file system.
- */
-public class GridHadoopFSCounterWriter implements GridHadoopCounterWriter {
-    /** */
-    public static final String PERFORMANCE_COUNTER_FILE_NAME = "performance";
-
-    /** */
-    private static final String DEFAULT_USER_NAME = "anonymous";
-
-    /** */
-    public static final String COUNTER_WRITER_DIR_PROPERTY = "ignite.counters.fswriter.directory";
-
-    /** */
-    private static final String USER_MACRO = "${USER}";
-
-    /** */
-    private static final String DEFAULT_COUNTER_WRITER_DIR = "/user/" + USER_MACRO;
-
-    /** {@inheritDoc} */
-    @Override public void write(GridHadoopJobInfo jobInfo, GridHadoopJobId jobId, GridHadoopCounters cntrs)
-        throws IgniteCheckedException {
-
-        Configuration hadoopCfg = new Configuration();
-
-        for (Map.Entry<String, String> e : ((GridHadoopDefaultJobInfo)jobInfo).properties().entrySet())
-            hadoopCfg.set(e.getKey(), e.getValue());
-
-        String user = jobInfo.user();
-
-        if (F.isEmpty(user))
-            user = DEFAULT_USER_NAME;
-
-        String dir = jobInfo.property(COUNTER_WRITER_DIR_PROPERTY);
-
-        if (dir == null)
-            dir = DEFAULT_COUNTER_WRITER_DIR;
-
-        Path jobStatPath = new Path(new Path(dir.replace(USER_MACRO, user)), jobId.toString());
-
-        GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, null);
-
-        try {
-            FileSystem fs = jobStatPath.getFileSystem(hadoopCfg);
-
-            fs.mkdirs(jobStatPath);
-
-            try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) {
-                for (T2<String, Long> evt : perfCntr.evts()) {
-                    out.print(evt.get1());
-                    out.print(':');
-                    out.println(evt.get2().toString());
-                }
-
-                out.flush();
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}

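For reference, the writer above emits a plain-text file named "performance" under <dir>/<jobId> (by default /user/<user>/<jobId>), one event:timestamp pair per line. A minimal sketch of parsing such a file back, assuming a local copy (the path is a placeholder, not part of this commit):

    import java.io.*;

    /** Parses the "event:timestamp" lines emitted by GridHadoopFSCounterWriter. */
    public class PerfStatsReader {
        public static void main(String[] args) throws IOException {
            // Placeholder path to a local copy of the "performance" file.
            try (BufferedReader rdr = new BufferedReader(new FileReader("performance"))) {
                for (String line; (line = rdr.readLine()) != null; ) {
                    // Event names contain spaces but no colons; the timestamp follows the last ':'.
                    int idx = line.lastIndexOf(':');

                    String evt = line.substring(0, idx);
                    long ts = Long.parseLong(line.substring(idx + 1));

                    System.out.println(ts + " -> " + evt);
                }
            }
        }
    }
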
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
deleted file mode 100644
index 67af49f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopLongCounter.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-
-import java.io.*;
-
-/**
- * Standard Hadoop counter to use via the original Hadoop API in Hadoop jobs.
- */
-public class GridHadoopLongCounter extends GridHadoopCounterAdapter {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** The counter value. */
-    private long val;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public GridHadoopLongCounter() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param grp Group name.
-     * @param name Counter name.
-     */
-    public GridHadoopLongCounter(String grp, String name) {
-        super(grp, name);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeValue(ObjectOutput out) throws IOException {
-        out.writeLong(val);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void readValue(ObjectInput in) throws IOException {
-        val = in.readLong();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounter cntr) {
-        val += ((GridHadoopLongCounter)cntr).val;
-    }
-
-    /**
-     * Gets current value of this counter.
-     *
-     * @return Current value.
-     */
-    public long value() {
-        return val;
-    }
-
-    /**
-     * Sets the current value to the given value.
-     *
-     * @param val Value to set.
-     */
-    public void value(long val) {
-        this.val = val;
-    }
-
-    /**
-     * Increments this counter by the given value.
-     *
-     * @param i Value to increase this counter by.
-     */
-    public void increment(long i) {
-        val += i;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
deleted file mode 100644
index d5ceebf..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/GridHadoopPerformanceCounter.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.counter;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*;
-
-/**
- * Counter for the job statistics accumulation.
- */
-public class GridHadoopPerformanceCounter extends GridHadoopCounterAdapter {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** The group name for this counter. */
-    private static final String GROUP_NAME = "SYSTEM";
-
-    /** The counter name for this counter. */
-    private static final String COUNTER_NAME = "PERFORMANCE";
-
-    /** Events collection. */
-    private Collection<T2<String,Long>> evts = new ArrayList<>();
-
-    /** Node id to insert into the event info. */
-    private UUID nodeId;
-
-    /** */
-    private int reducerNum;
-
-    /** */
-    private volatile Long firstShuffleMsg;
-
-    /** */
-    private volatile Long lastShuffleMsg;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public GridHadoopPerformanceCounter() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param grp Group name.
-     * @param name Counter name.
-     */
-    public GridHadoopPerformanceCounter(String grp, String name) {
-        super(grp, name);
-    }
-
-    /**
-     * Constructor to create an instance used as a helper.
-     *
-     * @param nodeId Id of the worker node.
-     */
-    public GridHadoopPerformanceCounter(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void writeValue(ObjectOutput out) throws IOException {
-        U.writeCollection(out, evts);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void readValue(ObjectInput in) throws IOException {
-        try {
-            evts = U.readCollection(in);
-        }
-        catch (ClassNotFoundException e) {
-            throw new IOException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void merge(GridHadoopCounter cntr) {
-        evts.addAll(((GridHadoopPerformanceCounter)cntr).evts);
-    }
-
-    /**
-     * Gets the events collection.
-     *
-     * @return Collection of events.
-     */
-    public Collection<T2<String, Long>> evts() {
-        return evts;
-    }
-
-    /**
-     * Generates a name that encodes the event information.
-     *
-     * @param info Task info.
-     * @param evtType The type of the event.
-     * @return String containing the necessary event information.
-     */
-    private String eventName(GridHadoopTaskInfo info, String evtType) {
-        return eventName(info.type().toString(), info.taskNumber(), evtType);
-    }
-
-    /**
-     * Generates a name that encodes the event information.
-     *
-     * @param taskType Task type.
-     * @param taskNum Number of the task.
-     * @param evtType The type of the event.
-     * @return String containing the necessary event information.
-     */
-    private String eventName(String taskType, int taskNum, String evtType) {
-        assert nodeId != null;
-
-        return taskType + " " + taskNum + " " + evtType + " " + nodeId;
-    }
-
-    /**
-     * Adds event of the task submission (task instance creation).
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskSubmit(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "submit"), ts));
-    }
-
-    /**
-     * Adds event of the task preparation.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskPrepare(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "prepare"), ts));
-    }
-
-    /**
-     * Adds event of the task finish.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskFinish(GridHadoopTaskInfo info, long ts) {
-        if (info.type() == GridHadoopTaskType.REDUCE && lastShuffleMsg != null) {
-            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "start"), firstShuffleMsg));
-            evts.add(new T2<>(eventName("SHUFFLE", reducerNum, "finish"), lastShuffleMsg));
-
-            lastShuffleMsg = null;
-        }
-
-        evts.add(new T2<>(eventName(info, "finish"), ts));
-    }
-
-    /**
-     * Adds event of the task start.
-     *
-     * @param info Task info.
-     * @param ts Timestamp of the event.
-     */
-    public void onTaskStart(GridHadoopTaskInfo info, long ts) {
-        evts.add(new T2<>(eventName(info, "start"), ts));
-    }
-
-    /**
-     * Adds event of the job preparation.
-     *
-     * @param ts Timestamp of the event.
-     */
-    public void onJobPrepare(long ts) {
-        assert nodeId != null;
-
-        evts.add(new T2<>("JOB prepare " + nodeId, ts));
-    }
-
-    /**
-     * Adds event of the job start.
-     *
-     * @param ts Timestamp of the event.
-     */
-    public void onJobStart(long ts) {
-        assert nodeId != null;
-
-        evts.add(new T2<>("JOB start " + nodeId, ts));
-    }
-
-    /**
-     * Adds client submission events from job info.
-     *
-     * @param info Job info.
-     */
-    public void clientSubmissionEvents(GridHadoopJobInfo info) {
-        assert nodeId != null;
-
-        addEventFromProperty("JOB requestId", info, REQ_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB responseId", info, RESPONSE_NEW_JOBID_TS_PROPERTY);
-        addEventFromProperty("JOB submit", info, JOB_SUBMISSION_START_TS_PROPERTY);
-    }
-
-    /**
-     * Adds event with timestamp from some property in job info.
-     *
-     * @param evt Event type and phase.
-     * @param info Job info.
-     * @param propName Property name to get timestamp.
-     */
-    private void addEventFromProperty(String evt, GridHadoopJobInfo info, String propName) {
-        String val = info.property(propName);
-
-        if (!F.isEmpty(val)) {
-            try {
-                evts.add(new T2<>(evt + " " + nodeId, Long.parseLong(val)));
-            }
-            catch (NumberFormatException e) {
-                throw new IllegalStateException("Invalid value '" + val + "' of property '" + propName + "'", e);
-            }
-        }
-    }
-
-    /**
-     * Registers shuffle message event.
-     *
-     * @param reducerNum Number of reducer that receives the data.
-     * @param ts Timestamp of the event.
-     */
-    public void onShuffleMessage(int reducerNum, long ts) {
-        this.reducerNum = reducerNum;
-
-        if (firstShuffleMsg == null)
-            firstShuffleMsg = ts;
-
-        lastShuffleMsg = ts;
-    }
-
-    /**
-     * Gets system predefined performance counter from the GridHadoopCounters object.
-     *
-     * @param cntrs GridHadoopCounters object.
-     * @param nodeId Node id for the methods that add events. May be null if those methods are not used.
-     * @return Predefined performance counter.
-     */
-    public static GridHadoopPerformanceCounter getCounter(GridHadoopCounters cntrs, @Nullable UUID nodeId) {
-        GridHadoopPerformanceCounter cntr = cntrs.counter(GROUP_NAME, COUNTER_NAME, GridHadoopPerformanceCounter.class);
-
-        if (nodeId != null)
-            cntr.nodeId(nodeId);
-
-        return cntr;
-    }
-
-    /**
-     * Sets the nodeId field.
-     *
-     * @param nodeId Node id.
-     */
-    private void nodeId(UUID nodeId) {
-        this.nodeId = nodeId;
-    }
-}

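The performance counter above is a shared per-job helper: callers obtain the single instance through getCounter() and append timestamped lifecycle events, which GridHadoopFSCounterWriter above then dumps to the statistics file. A minimal usage sketch (cntrs, taskInfo and locNodeId are assumed to be supplied by the surrounding job machinery):

    // Obtain (or lazily create) the shared performance counter for this job.
    GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(cntrs, locNodeId);

    // Record task lifecycle events with wall-clock timestamps.
    perfCntr.onTaskSubmit(taskInfo, System.currentTimeMillis());
    perfCntr.onTaskStart(taskInfo, System.currentTimeMillis());

    // ... task body runs ...

    perfCntr.onTaskFinish(taskInfo, System.currentTimeMillis());
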
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
new file mode 100644
index 0000000..c2ed5bb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCounterAdapter.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Default Hadoop counter implementation.
+ */
+public abstract class HadoopCounterAdapter implements HadoopCounter, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Counter group name. */
+    private String grp;
+
+    /** Counter name. */
+    private String name;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    protected HadoopCounterAdapter() {
+        // No-op.
+    }
+
+    /**
+     * Creates new counter with given group and name.
+     *
+     * @param grp Counter group name.
+     * @param name Counter name.
+     */
+    protected HadoopCounterAdapter(String grp, String name) {
+        assert grp != null : "counter must have group";
+        assert name != null : "counter must have name";
+
+        this.grp = grp;
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public String group() {
+        return grp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeUTF(grp);
+        out.writeUTF(name);
+        writeValue(out);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        grp = in.readUTF();
+        name = in.readUTF();
+        readValue(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopCounterAdapter cntr = (HadoopCounterAdapter)o;
+
+        if (!grp.equals(cntr.grp))
+            return false;
+        if (!name.equals(cntr.name))
+            return false;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int res = grp.hashCode();
+        res = 31 * res + name.hashCode();
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCounterAdapter.class, this);
+    }
+
+    /**
+     * Writes value of this counter to output.
+     *
+     * @param out Output.
+     * @throws IOException If failed.
+     */
+    protected abstract void writeValue(ObjectOutput out) throws IOException;
+
+    /**
+     * Reads value of this counter from input.
+     *
+     * @param in Input.
+     * @throws IOException If failed.
+     */
+    protected abstract void readValue(ObjectInput in) throws IOException;
+}

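Concrete counters extend this adapter by serializing a value and defining how two instances merge. A sketch of a hypothetical subclass (not part of this commit) illustrating the contract; note that both constructors are required, the no-arg one by Externalizable and the (group, name) one by the reflective creation in HadoopCountersImpl below:

    import java.io.*;

    /** Hypothetical counter that keeps the maximum of all reported values. */
    public class HadoopMaxCounter extends HadoopCounterAdapter {
        /** */
        private static final long serialVersionUID = 0L;

        /** The tracked maximum. */
        private long max = Long.MIN_VALUE;

        /**
         * Default constructor required by {@link Externalizable}.
         */
        public HadoopMaxCounter() {
            // No-op.
        }

        /**
         * @param grp Group name.
         * @param name Counter name.
         */
        public HadoopMaxCounter(String grp, String name) {
            super(grp, name);
        }

        /** {@inheritDoc} */
        @Override protected void writeValue(ObjectOutput out) throws IOException {
            out.writeLong(max);
        }

        /** {@inheritDoc} */
        @Override protected void readValue(ObjectInput in) throws IOException {
            max = in.readLong();
        }

        /** {@inheritDoc} */
        @Override public void merge(HadoopCounter cntr) {
            max = Math.max(max, ((HadoopMaxCounter)cntr).max);
        }

        /**
         * Reports a value to the counter.
         *
         * @param val Observed value.
         */
        public void update(long val) {
            max = Math.max(max, val);
        }
    }
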
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
new file mode 100644
index 0000000..78e1c26
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopCountersImpl.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jdk8.backport.*;
+
+import java.io.*;
+import java.lang.reflect.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Default in-memory counters store.
+ */
+public class HadoopCountersImpl implements HadoopCounters, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private final ConcurrentMap<CounterKey, HadoopCounter> cntrsMap = new ConcurrentHashMap8<>();
+
+    /**
+     * Default constructor. Creates new instance without counters.
+     */
+    public HadoopCountersImpl() {
+        // No-op.
+    }
+
+    /**
+     * Creates a new instance that contains the given counters.
+     *
+     * @param cntrs Counters to store.
+     */
+    public HadoopCountersImpl(Iterable<HadoopCounter> cntrs) {
+        addCounters(cntrs, true);
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param cntrs Counters to copy.
+     */
+    public HadoopCountersImpl(HadoopCounters cntrs) {
+        this(cntrs.all());
+    }
+
+    /**
+     * Creates counter instance.
+     *
+     * @param cls Class of the counter.
+     * @param grp Group name.
+     * @param name Counter name.
+     * @return Counter.
+     */
+    private <T extends HadoopCounter> T createCounter(Class<? extends HadoopCounter> cls, String grp,
+        String name) {
+        try {
+            Constructor constructor = cls.getConstructor(String.class, String.class);
+
+            return (T)constructor.newInstance(grp, name);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Adds a collection of counters to the existing counters.
+     *
+     * @param cntrs Counters to add.
+     * @param cp Whether to copy counters or not.
+     */
+    private void addCounters(Iterable<HadoopCounter> cntrs, boolean cp) {
+        assert cntrs != null;
+
+        for (HadoopCounter cntr : cntrs) {
+            if (cp) {
+                HadoopCounter cntrCp = createCounter(cntr.getClass(), cntr.group(), cntr.name());
+
+                cntrCp.merge(cntr);
+
+                cntr = cntrCp;
+            }
+
+            cntrsMap.put(new CounterKey(cntr.getClass(), cntr.group(), cntr.name()), cntr);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) {
+        assert cls != null;
+
+        CounterKey mapKey = new CounterKey(cls, grp, name);
+
+        T cntr = (T)cntrsMap.get(mapKey);
+
+        if (cntr == null) {
+            cntr = createCounter(cls, grp, name);
+
+            T old = (T)cntrsMap.putIfAbsent(mapKey, cntr);
+
+            if (old != null)
+                return old;
+        }
+
+        return cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<HadoopCounter> all() {
+        return cntrsMap.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(HadoopCounters other) {
+        for (HadoopCounter counter : other.all())
+            counter(counter.group(), counter.name(), counter.getClass()).merge(counter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeCollection(out, cntrsMap.values());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        addCounters(U.<HadoopCounter>readCollection(in), false);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        HadoopCountersImpl counters = (HadoopCountersImpl)o;
+
+        return cntrsMap.equals(counters.cntrsMap);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return cntrsMap.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopCountersImpl.class, this, "counters", cntrsMap.values());
+    }
+
+    /**
+     * A tuple of counter identifier components, introduced for more readable code.
+     */
+    private static class CounterKey extends GridTuple3<Class<? extends HadoopCounter>, String, String> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Constructor.
+         *
+         * @param cls Class of the counter.
+         * @param grp Group name.
+         * @param name Counter name.
+         */
+        private CounterKey(Class<? extends HadoopCounter> cls, String grp, String name) {
+            super(cls, grp, name);
+        }
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public CounterKey() {
+            // No-op.
+        }
+    }
+}

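counter(grp, name, cls) above creates counters lazily through the reflective (String, String) constructor, and the putIfAbsent call makes concurrent first accesses converge on a single shared instance. A short usage sketch, using the HadoopLongCounter added below (group and counter names are illustrative):

    HadoopCounters cntrs = new HadoopCountersImpl();

    // Created on first access; subsequent calls return the same instance.
    HadoopLongCounter written = cntrs.counter("FS", "BYTES_WRITTEN", HadoopLongCounter.class);

    written.increment(1024);

    assert cntrs.counter("FS", "BYTES_WRITTEN", HadoopLongCounter.class).value() == 1024;
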
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
new file mode 100644
index 0000000..ce86edb
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/counter/HadoopLongCounter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.counter;
+
+import java.io.*;
+
+/**
+ * Standard Hadoop counter to use via the original Hadoop API in Hadoop jobs.
+ */
+public class HadoopLongCounter extends HadoopCounterAdapter {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The counter value. */
+    private long val;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopLongCounter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param grp Group name.
+     * @param name Counter name.
+     */
+    public HadoopLongCounter(String grp, String name) {
+        super(grp, name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeValue(ObjectOutput out) throws IOException {
+        out.writeLong(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readValue(ObjectInput in) throws IOException {
+        val = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void merge(HadoopCounter cntr) {
+        val += ((HadoopLongCounter)cntr).val;
+    }
+
+    /**
+     * Gets current value of this counter.
+     *
+     * @return Current value.
+     */
+    public long value() {
+        return val;
+    }
+
+    /**
+     * Sets the current value to the given value.
+     *
+     * @param val Value to set.
+     */
+    public void value(long val) {
+        this.val = val;
+    }
+
+    /**
+     * Increments this counter by the given value.
+     *
+     * @param i Value to increase this counter by.
+     */
+    public void increment(long i) {
+        val += i;
+    }
+}
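
merge() above simply sums the values, so instances of the same logical counter accumulated on different nodes collapse into a single total when counter stores are merged. A quick sketch (group and counter names are illustrative):

    HadoopLongCounter a = new HadoopLongCounter("SYSTEM", "RECORDS");
    HadoopLongCounter b = new HadoopLongCounter("SYSTEM", "RECORDS");

    a.increment(3);
    b.increment(4);

    a.merge(b); // a now holds 3 + 4.

    assert a.value() == 7;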