Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:08:30 UTC

[14/31] incubator-ignite git commit: # IGNITE-386: WIP on internal namings (2).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
deleted file mode 100644
index e1bf9b6..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopFileSystemsUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.ignite.hadoop.fs.v1.*;
-
-/**
- * Utilities for configuring file systems to support a separate working directory per thread.
- */
-public class GridHadoopFileSystemsUtils {
-    /** Name of the property that sets the initial working directory when a new local FS instance is created. */
-    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
-
-    /**
-     * Sets the user name and default working directory for the current thread if the file system supports it.
-     *
-     * @param fs File system.
-     * @param userName User name.
-     */
-    public static void setUser(FileSystem fs, String userName) {
-        if (fs instanceof IgniteHadoopFileSystem)
-            ((IgniteHadoopFileSystem)fs).setUser(userName);
-        else if (fs instanceof GridHadoopDistributedFileSystem)
-            ((GridHadoopDistributedFileSystem)fs).setUser(userName);
-    }
-
-    /**
-     * Sets up file system wrappers that support a separate working directory per thread.
-     *
-     * @param cfg Configuration to set up.
-     */
-    public static void setupFileSystems(Configuration cfg) {
-        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", GridHadoopLocalFileSystemV1.class.getName());
-        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
-                GridHadoopLocalFileSystemV2.class.getName());
-
-        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", GridHadoopDistributedFileSystem.class.getName());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
deleted file mode 100644
index 28834d4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.fs.*;
-
-import java.io.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV1 extends LocalFileSystem {
-    /**
-     * Creates new local file system.
-     */
-    public GridHadoopLocalFileSystemV1() {
-        super(new GridHadoopRawLocalFileSystem());
-    }
-
-    /** {@inheritDoc} */
-    @Override public File pathToFile(Path path) {
-        return ((GridHadoopRawLocalFileSystem)getRaw()).convert(path);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
deleted file mode 100644
index 62d7cea..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopLocalFileSystemV2.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.local.*;
-
-import java.io.*;
-import java.net.*;
-
-import static org.apache.hadoop.fs.FsConstants.*;
-
-/**
- * Local file system replacement for Hadoop jobs.
- */
-public class GridHadoopLocalFileSystemV2 extends ChecksumFs {
-    /**
-     * Creates new local file system.
-     *
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public GridHadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
-        super(new DelegateFS(cfg));
-    }
-
-    /**
-     * Creates new local file system.
-     *
-     * @param uri URI.
-     * @param cfg Configuration.
-     * @throws IOException If failed.
-     * @throws URISyntaxException If failed.
-     */
-    public GridHadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
-        this(cfg);
-    }
-
-    /**
-     * Delegate file system.
-     */
-    private static class DelegateFS extends DelegateToFileSystem {
-        /**
-         * Creates new local file system.
-         *
-         * @param cfg Configuration.
-         * @throws IOException If failed.
-         * @throws URISyntaxException If failed.
-         */
-        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
-            super(LOCAL_FS_URI, new GridHadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int getUriDefaultPort() {
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public FsServerDefaults getServerDefaults() throws IOException {
-            return LocalConfigKeys.getServerDefaults();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isValidName(String src) {
-            return true;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
deleted file mode 100644
index 29645f8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/GridHadoopRawLocalFileSystem.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.file.*;
-
-/**
- * Local file system implementation for Hadoop.
- */
-public class GridHadoopRawLocalFileSystem extends FileSystem {
-    /** Working directory for each thread. */
-    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
-        @Override protected Path initialValue() {
-            return getInitialWorkingDirectory();
-        }
-    };
-
-    /**
-     * Converts Hadoop path to local path.
-     *
-     * @param path Hadoop path.
-     * @return Local path.
-     */
-    File convert(Path path) {
-        checkPath(path);
-
-        if (path.isAbsolute())
-            return new File(path.toUri().getPath());
-
-        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getHomeDirectory() {
-        return makeQualified(new Path(System.getProperty("user.home")));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getInitialWorkingDirectory() {
-        File f = new File(System.getProperty("user.dir"));
-
-        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(URI uri, Configuration conf) throws IOException {
-        super.initialize(uri, conf);
-
-        setConf(conf);
-
-        String initWorkDir = conf.get(GridHadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
-
-        if (initWorkDir != null)
-            setWorkingDirectory(new Path(initWorkDir));
-    }
-
-    /** {@inheritDoc} */
-    @Override public URI getUri() {
-        return FsConstants.LOCAL_FS_URI;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-        return new FSDataInputStream(new InStream(checkExists(convert(f))));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
-        short replication, long blockSize, Progressable progress) throws IOException {
-        File file = convert(f);
-
-        if (!overwrite && !file.createNewFile())
-            throw new IOException("Failed to create new file: " + f.toUri());
-
-        return out(file, false, bufSize);
-    }
-
-    /**
-     * @param file File.
-     * @param append Append flag.
-     * @return Output stream.
-     * @throws IOException If failed.
-     */
-    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
-        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
-            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
-        return out(convert(f), true, bufSize);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean rename(Path src, Path dst) throws IOException {
-        return convert(src).renameTo(convert(dst));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean delete(Path f, boolean recursive) throws IOException {
-        File file = convert(f);
-
-        if (file.isDirectory() && !recursive)
-            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
-
-        return U.delete(file);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void setWorkingDirectory(Path dir) {
-        checkPath(dir);
-
-        workDir.set(fixRelativePart(dir));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getWorkingDirectory() {
-        return workDir.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-        if (f == null)
-            throw new IllegalArgumentException("mkdirs path arg is null");
-
-        Path parent = f.getParent();
-
-        File p2f = convert(f);
-
-        if (parent != null) {
-            File parent2f = convert(parent);
-
-            if (parent2f != null && parent2f.exists() && !parent2f.isDirectory())
-                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
-        }
-
-        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileStatus(Path f) throws IOException {
-        return fileStatus(checkExists(convert(f)));
-    }
-
-    /**
-     * @param file File.
-     * @return File status.
-     */
-    private FileStatus fileStatus(File file) throws IOException {
-        boolean dir = file.isDirectory();
-
-        java.nio.file.Path path = dir ? null : file.toPath();
-
-        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
-            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
-            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
-    }
-
-    /**
-     * @param file File.
-     * @return Same file.
-     * @throws FileNotFoundException If does not exist.
-     */
-    private static File checkExists(File file) throws FileNotFoundException {
-        if (!file.exists())
-            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
-
-        return file;
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus[] listStatus(Path f) throws IOException {
-        File file = convert(f);
-
-        if (checkExists(file).isFile())
-            return new FileStatus[] {fileStatus(file)};
-
-        File[] files = file.listFiles();
-
-        if (files == null)
-            throw new IOException("Failed to list files: " + f.toUri());
-
-        FileStatus[] res = new FileStatus[files.length];
-
-        for (int i = 0; i < res.length; i++)
-            res[i] = fileStatus(files[i]);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean supportsSymlinks() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
-        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
-    }
-
-    /** {@inheritDoc} */
-    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
-        return getFileStatus(getLinkTarget(f));
-    }
-
-    /** {@inheritDoc} */
-    @Override public Path getLinkTarget(Path f) throws IOException {
-        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
-
-        return new Path(file.toURI());
-    }
-
-    /**
-     * Input stream.
-     */
-    private static class InStream extends InputStream implements Seekable, PositionedReadable {
-        /** */
-        private final RandomAccessFile file;
-
-        /**
-         * @param f File.
-         * @throws IOException If failed.
-         */
-        public InStream(File f) throws IOException {
-            file = new RandomAccessFile(f, "r");
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read() throws IOException {
-            return file.read();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-            return file.read(b, off, len);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void close() throws IOException {
-            file.close();
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-            long pos0 = file.getFilePointer();
-
-            file.seek(pos);
-            int res = file.read(buf, off, len);
-
-            file.seek(pos0);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-            if (read(pos, buf, off, len) != len)
-                throw new EOFException("Failed to read fully [pos=" + pos + ", len=" + len + ']');
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readFully(long pos, byte[] buf) throws IOException {
-            readFully(pos, buf, 0, buf.length);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized void seek(long pos) throws IOException {
-            file.seek(pos);
-        }
-
-        /** {@inheritDoc} */
-        @Override public synchronized long getPos() throws IOException {
-            return file.getFilePointer();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean seekToNewSource(long targetPos) throws IOException {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
new file mode 100644
index 0000000..88c5899
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopDistributedFileSystem.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.mapreduce.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+
+/**
+ * HDFS wrapper that supports a separate working directory per thread.
+ */
+public class HadoopDistributedFileSystem extends DistributedFileSystem {
+    /** User name for each thread. */
+    private final ThreadLocal<String> userName = new ThreadLocal<String>() {
+        /** {@inheritDoc} */
+        @Override protected String initialValue() {
+            return DFLT_USER_NAME;
+        }
+    };
+
+    /** Working directory for each thread. */
+    private final ThreadLocal<Path> workingDir = new ThreadLocal<Path>() {
+        /** {@inheritDoc} */
+        @Override protected Path initialValue() {
+            return getHomeDirectory();
+        }
+    };
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        setUser(conf.get(MRJobConfig.USER_NAME, DFLT_USER_NAME));
+    }
+
+    /**
+     * Sets the user name and default working directory for the current thread.
+     *
+     * @param userName User name.
+     */
+    public void setUser(String userName) {
+        this.userName.set(userName);
+
+        setWorkingDirectory(getHomeDirectory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        Path path = new Path("/user/" + userName.get());
+
+        return path.makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWorkingDirectory(Path dir) {
+        Path fixedDir = fixRelativePart(dir);
+
+        String res = fixedDir.toUri().getPath();
+
+        if (!DFSUtil.isValidName(res))
+            throw new IllegalArgumentException("Invalid DFS directory name " + res);
+
+        workingDir.set(fixedDir);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getWorkingDirectory() {
+        return workingDir.get();
+    }
+}
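
The wrapper keeps both the user name and the working directory in thread-locals, so a single cached FileSystem instance can serve many job threads without them stepping on each other. A minimal sketch of the intended per-thread behavior, using the HadoopFileSystemsUtils helper introduced below and assuming an HDFS cluster reachable at hdfs://namenode:9000 (a hypothetical address):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;

import java.net.URI;

public class PerThreadUserSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Install the thread-aware wrappers, including HadoopDistributedFileSystem
        // for the "hdfs" scheme.
        HadoopFileSystemsUtils.setupFileSystems(conf);

        // Hypothetical cluster address; FileSystem.get() caches the instance.
        FileSystem fs = FileSystem.get(new URI("hdfs://namenode:9000"), conf);

        Thread t = new Thread(() -> {
            // Affects only this thread: user name plus home/working directory.
            HadoopFileSystemsUtils.setUser(fs, "alice");

            System.out.println(fs.getWorkingDirectory()); // .../user/alice
        });

        t.start();
        t.join();

        // The main thread still sees the default user's working directory.
        System.out.println(fs.getWorkingDirectory());
    }
}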

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
new file mode 100644
index 0000000..f3f51d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemsUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.ignite.hadoop.fs.v1.*;
+
+/**
+ * Utilities for configuring file systems to support a separate working directory per thread.
+ */
+public class HadoopFileSystemsUtils {
+    /** Name of the property that sets the initial working directory when a new local FS instance is created. */
+    public static final String LOC_FS_WORK_DIR_PROP = "fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".workDir";
+
+    /**
+     * Sets the user name and default working directory for the current thread if the file system supports it.
+     *
+     * @param fs File system.
+     * @param userName User name.
+     */
+    public static void setUser(FileSystem fs, String userName) {
+        if (fs instanceof IgniteHadoopFileSystem)
+            ((IgniteHadoopFileSystem)fs).setUser(userName);
+        else if (fs instanceof HadoopDistributedFileSystem)
+            ((HadoopDistributedFileSystem)fs).setUser(userName);
+    }
+
+    /**
+     * Sets up file system wrappers that support a separate working directory per thread.
+     *
+     * @param cfg Configuration to set up.
+     */
+    public static void setupFileSystems(Configuration cfg) {
+        cfg.set("fs." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl", HadoopLocalFileSystemV1.class.getName());
+        cfg.set("fs.AbstractFileSystem." + FsConstants.LOCAL_FS_URI.getScheme() + ".impl",
+                HadoopLocalFileSystemV2.class.getName());
+
+        cfg.set("fs." + HdfsConstants.HDFS_URI_SCHEME + ".impl", HadoopDistributedFileSystem.class.getName());
+    }
+}
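
Taken together, the three properties route the "file" scheme (both the FileSystem and AbstractFileSystem resolution paths) and the "hdfs" scheme to the thread-aware implementations. A minimal usage sketch, with /tmp/job-work as a hypothetical initial working directory:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;

import java.net.URI;

public class SetupSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Route "file" and "hdfs" schemes to the wrappers above.
        HadoopFileSystemsUtils.setupFileSystems(conf);

        // Seed the working directory that HadoopRawLocalFileSystem.initialize()
        // reads for the calling thread (hypothetical example value).
        conf.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, "/tmp/job-work");

        FileSystem lfs = FileSystem.get(URI.create("file:///"), conf);

        // Relative paths now resolve under the seeded per-thread directory.
        System.out.println(lfs.makeQualified(new Path("output")));
    }
}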

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
new file mode 100644
index 0000000..9cc5881
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV1.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV1 extends LocalFileSystem {
+    /**
+     * Creates new local file system.
+     */
+    public HadoopLocalFileSystemV1() {
+        super(new HadoopRawLocalFileSystem());
+    }
+
+    /** {@inheritDoc} */
+    @Override public File pathToFile(Path path) {
+        return ((HadoopRawLocalFileSystem)getRaw()).convert(path);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
new file mode 100644
index 0000000..15ddc5a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopLocalFileSystemV2.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.local.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.hadoop.fs.FsConstants.*;
+
+/**
+ * Local file system replacement for Hadoop jobs.
+ */
+public class HadoopLocalFileSystemV2 extends ChecksumFs {
+    /**
+     * Creates new local file system.
+     *
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(Configuration cfg) throws IOException, URISyntaxException {
+        super(new DelegateFS(cfg));
+    }
+
+    /**
+     * Creates new local file system.
+     *
+     * @param uri URI.
+     * @param cfg Configuration.
+     * @throws IOException If failed.
+     * @throws URISyntaxException If failed.
+     */
+    public HadoopLocalFileSystemV2(URI uri, Configuration cfg) throws IOException, URISyntaxException {
+        this(cfg);
+    }
+
+    /**
+     * Delegate file system.
+     */
+    private static class DelegateFS extends DelegateToFileSystem {
+        /**
+         * Creates new local file system.
+         *
+         * @param cfg Configuration.
+         * @throws IOException If failed.
+         * @throws URISyntaxException If failed.
+         */
+        public DelegateFS(Configuration cfg) throws IOException, URISyntaxException {
+            super(LOCAL_FS_URI, new HadoopRawLocalFileSystem(), cfg, LOCAL_FS_URI.getScheme(), false);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getUriDefaultPort() {
+            return -1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FsServerDefaults getServerDefaults() throws IOException {
+            return LocalConfigKeys.getServerDefaults();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isValidName(String src) {
+            return true;
+        }
+    }
+}
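
HadoopLocalFileSystemV1 above binds to the classic FileSystem API, while this V2 class binds to the AbstractFileSystem API used by FileContext; the "fs.AbstractFileSystem.file.impl" property set in setupFileSystems() is what makes FileContext load it. A minimal sketch of the FileContext side (the path is a hypothetical example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;

public class FileContextSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Makes FileContext instantiate HadoopLocalFileSystemV2 for "file:" URIs.
        HadoopFileSystemsUtils.setupFileSystems(conf);

        FileContext fc = FileContext.getLocalFSFileContext(conf);

        // Qualifies the path against the local AbstractFileSystem wrapper.
        System.out.println(fc.makeQualified(new Path("/tmp/fc-demo")));
    }
}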

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
new file mode 100644
index 0000000..e5ec3f7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopRawLocalFileSystem.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.fs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+
+/**
+ * Local file system implementation for Hadoop.
+ */
+public class HadoopRawLocalFileSystem extends FileSystem {
+    /** Working directory for each thread. */
+    private final ThreadLocal<Path> workDir = new ThreadLocal<Path>() {
+        @Override protected Path initialValue() {
+            return getInitialWorkingDirectory();
+        }
+    };
+
+    /**
+     * Converts Hadoop path to local path.
+     *
+     * @param path Hadoop path.
+     * @return Local path.
+     */
+    File convert(Path path) {
+        checkPath(path);
+
+        if (path.isAbsolute())
+            return new File(path.toUri().getPath());
+
+        return new File(getWorkingDirectory().toUri().getPath(), path.toUri().getPath());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getHomeDirectory() {
+        return makeQualified(new Path(System.getProperty("user.home")));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getInitialWorkingDirectory() {
+        File f = new File(System.getProperty("user.dir"));
+
+        return new Path(f.getAbsoluteFile().toURI()).makeQualified(getUri(), null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(URI uri, Configuration conf) throws IOException {
+        super.initialize(uri, conf);
+
+        setConf(conf);
+
+        String initWorkDir = conf.get(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP);
+
+        if (initWorkDir != null)
+            setWorkingDirectory(new Path(initWorkDir));
+    }
+
+    /** {@inheritDoc} */
+    @Override public URI getUri() {
+        return FsConstants.LOCAL_FS_URI;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return new FSDataInputStream(new InStream(checkExists(convert(f))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufSize,
+        short replication, long blockSize, Progressable progress) throws IOException {
+        File file = convert(f);
+
+        if (!overwrite && !file.createNewFile())
+            throw new IOException("Failed to create new file: " + f.toUri());
+
+        return out(file, false, bufSize);
+    }
+
+    /**
+     * @param file File.
+     * @param append Append flag.
+     * @return Output stream.
+     * @throws IOException If failed.
+     */
+    private FSDataOutputStream out(File file, boolean append, int bufSize) throws IOException {
+        return new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(file, append),
+            bufSize < 32 * 1024 ? 32 * 1024 : bufSize), new Statistics(getUri().getScheme()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public FSDataOutputStream append(Path f, int bufSize, Progressable progress) throws IOException {
+        return out(convert(f), true, bufSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean rename(Path src, Path dst) throws IOException {
+        return convert(src).renameTo(convert(dst));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean delete(Path f, boolean recursive) throws IOException {
+        File file = convert(f);
+
+        if (file.isDirectory() && !recursive)
+            throw new IOException("Failed to remove directory in non recursive mode: " + f.toUri());
+
+        return U.delete(file);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setWorkingDirectory(Path dir) {
+        checkPath(dir);
+
+        workDir.set(fixRelativePart(dir));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getWorkingDirectory() {
+        return workDir.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        if (f == null)
+            throw new IllegalArgumentException("mkdirs path arg is null");
+
+        Path parent = f.getParent();
+
+        File p2f = convert(f);
+
+        if (parent != null) {
+            File parent2f = convert(parent);
+
+            if (parent2f != null && parent2f.exists() && !parent2f.isDirectory())
+                throw new FileAlreadyExistsException("Parent path is not a directory: " + parent);
+        }
+
+        return (parent == null || mkdirs(parent)) && (p2f.mkdir() || p2f.isDirectory());
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileStatus(Path f) throws IOException {
+        return fileStatus(checkExists(convert(f)));
+    }
+
+    /**
+     * @param file File.
+     * @return File status.
+     */
+    private FileStatus fileStatus(File file) throws IOException {
+        boolean dir = file.isDirectory();
+
+        java.nio.file.Path path = dir ? null : file.toPath();
+
+        return new FileStatus(dir ? 0 : file.length(), dir, 1, 4 * 1024, file.lastModified(), file.lastModified(),
+            /*permission*/null, /*owner*/null, /*group*/null, dir ? null : Files.isSymbolicLink(path) ?
+            new Path(Files.readSymbolicLink(path).toUri()) : null, new Path(file.toURI()));
+    }
+
+    /**
+     * @param file File.
+     * @return Same file.
+     * @throws FileNotFoundException If does not exist.
+     */
+    private static File checkExists(File file) throws FileNotFoundException {
+        if (!file.exists())
+            throw new FileNotFoundException("File " + file.getAbsolutePath() + " does not exist.");
+
+        return file;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus[] listStatus(Path f) throws IOException {
+        File file = convert(f);
+
+        if (checkExists(file).isFile())
+            return new FileStatus[] {fileStatus(file)};
+
+        File[] files = file.listFiles();
+
+        if (files == null)
+            throw new IOException("Failed to list files: " + f.toUri());
+
+        FileStatus[] res = new FileStatus[files.length];
+
+        for (int i = 0; i < res.length; i++)
+            res[i] = fileStatus(files[i]);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean supportsSymlinks() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void createSymlink(Path target, Path link, boolean createParent) throws IOException {
+        Files.createSymbolicLink(convert(link).toPath(), convert(target).toPath());
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileStatus getFileLinkStatus(Path f) throws IOException {
+        return getFileStatus(getLinkTarget(f));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getLinkTarget(Path f) throws IOException {
+        File file = Files.readSymbolicLink(convert(f).toPath()).toFile();
+
+        return new Path(file.toURI());
+    }
+
+    /**
+     * Input stream.
+     */
+    private static class InStream extends InputStream implements Seekable, PositionedReadable {
+        /** */
+        private final RandomAccessFile file;
+
+        /**
+         * @param f File.
+         * @throws IOException If failed.
+         */
+        public InStream(File f) throws IOException {
+            file = new RandomAccessFile(f, "r");
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read() throws IOException {
+            return file.read();
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
+            return file.read(b, off, len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void close() throws IOException {
+            file.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
+            long pos0 = file.getFilePointer();
+
+            file.seek(pos);
+            int res = file.read(buf, off, len);
+
+            file.seek(pos0);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFully(long pos, byte[] buf, int off, int len) throws IOException {
+            if (read(pos, buf, off, len) != len)
+                throw new EOFException("Failed to read fully [pos=" + pos + ", len=" + len + ']');
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readFully(long pos, byte[] buf) throws IOException {
+            readFully(pos, buf, 0, buf.length);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void seek(long pos) throws IOException {
+            file.seek(pos);
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized long getPos() throws IOException {
+            return file.getFilePointer();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean seekToNewSource(long targetPos) throws IOException {
+            return false;
+        }
+    }
+}
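
The workDir thread-local above is what lets concurrent Hadoop tasks share one FileSystem object while resolving relative paths independently. A minimal sketch of that isolation (it must live in the same package, since convert(Path) is package-private; directory names are hypothetical):

package org.apache.ignite.internal.processors.hadoop.fs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class WorkDirSketch {
    public static void main(String[] args) throws Exception {
        HadoopRawLocalFileSystem fs = new HadoopRawLocalFileSystem();

        fs.initialize(fs.getUri(), new Configuration());

        Runnable task = () -> {
            // Each thread gets its own working directory on the shared instance.
            fs.setWorkingDirectory(new Path("/tmp/" + Thread.currentThread().getName()));

            // Relative paths resolve under this thread's directory only.
            System.out.println(fs.convert(new Path("part-00000")));
        };

        Thread a = new Thread(task, "task-a");
        Thread b = new Thread(task, "task-b");

        a.start(); b.start();
        a.join(); b.join();
    }
}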

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
deleted file mode 100644
index b124312..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/GridHadoopJobMetadata.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class GridHadoopJobMetadata implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    private GridHadoopJobId jobId;
-
-    /** Job info. */
-    private GridHadoopJobInfo jobInfo;
-
-    /** Node submitted job. */
-    private UUID submitNodeId;
-
-    /** Map-reduce plan. */
-    private GridHadoopMapReducePlan mrPlan;
-
-    /** Pending splits for which mapper should be executed. */
-    private Map<GridHadoopInputSplit, Integer> pendingSplits;
-
-    /** Pending reducers. */
-    private Collection<Integer> pendingReducers;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private Map<Integer, GridHadoopProcessDescriptor> reducersAddrs;
-
-    /** Job phase. */
-    private GridHadoopJobPhase phase = PHASE_SETUP;
-
-    /** Fail cause. */
-    @GridToStringExclude
-    private Throwable failCause;
-
-    /** Version. */
-    private long ver;
-
-    /** Job counters. */
-    private GridHadoopCounters counters = new GridHadoopCountersImpl();
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridHadoopJobMetadata() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param submitNodeId Submit node ID.
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     */
-    public GridHadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.submitNodeId = submitNodeId;
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param src Metadata to copy.
-     */
-    public GridHadoopJobMetadata(GridHadoopJobMetadata src) {
-        // Make sure to preserve alphabetic order.
-        counters = src.counters;
-        failCause = src.failCause;
-        jobId = src.jobId;
-        jobInfo = src.jobInfo;
-        mrPlan = src.mrPlan;
-        pendingSplits = src.pendingSplits;
-        pendingReducers = src.pendingReducers;
-        phase = src.phase;
-        reducersAddrs = src.reducersAddrs;
-        submitNodeId = src.submitNodeId;
-        ver = src.ver + 1;
-    }
-
-    /**
-     * @return Submit node ID.
-     */
-    public UUID submitNodeId() {
-        return submitNodeId;
-    }
-
-    /**
-     * @param phase Job phase.
-     */
-    public void phase(GridHadoopJobPhase phase) {
-        this.phase = phase;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public GridHadoopJobPhase phase() {
-        return phase;
-    }
-
-    /**
-     * Gets reducers addresses for external execution.
-     *
-     * @return Reducers addresses.
-     */
-    public Map<Integer, GridHadoopProcessDescriptor> reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /**
-     * Sets reducers addresses for external execution.
-     *
-     * @param reducersAddrs Map of addresses.
-     */
-    public void reducersAddresses(Map<Integer, GridHadoopProcessDescriptor> reducersAddrs) {
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * Sets collection of pending splits.
-     *
-     * @param pendingSplits Collection of pending splits.
-     */
-    public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
-        this.pendingSplits = pendingSplits;
-    }
-
-    /**
-     * Gets collection of pending splits.
-     *
-     * @return Collection of pending splits.
-     */
-    public Map<GridHadoopInputSplit, Integer> pendingSplits() {
-        return pendingSplits;
-    }
-
-    /**
-     * Sets collection of pending reducers.
-     *
-     * @param pendingReducers Collection of pending reducers.
-     */
-    public void pendingReducers(Collection<Integer> pendingReducers) {
-        this.pendingReducers = pendingReducers;
-    }
-
-    /**
-     * Gets collection of pending reducers.
-     *
-     * @return Collection of pending reducers.
-     */
-    public Collection<Integer> pendingReducers() {
-        return pendingReducers;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public GridHadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @param mrPlan Map-reduce plan.
-     */
-    public void mapReducePlan(GridHadoopMapReducePlan mrPlan) {
-        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
-        this.mrPlan = mrPlan;
-    }
-
-    /**
-     * @return Map-reduce plan.
-     */
-    public GridHadoopMapReducePlan mapReducePlan() {
-        return mrPlan;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public GridHadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * Returns job counters.
-     *
-     * @return Collection of counters.
-     */
-    public GridHadoopCounters counters() {
-        return counters;
-    }
-
-    /**
-     * Sets counters.
-     *
-     * @param counters Collection of counters.
-     */
-    public void counters(GridHadoopCounters counters) {
-        this.counters = counters;
-    }
-
-    /**
-     * @param failCause Fail cause.
-     */
-    public void failCause(Throwable failCause) {
-        assert failCause != null;
-
-        if (this.failCause == null) // Keep the first error.
-            this.failCause = failCause;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Version.
-     */
-    public long version() {
-        return ver;
-    }
-
-    /**
-     * @param split Split.
-     * @return Task number.
-     */
-    public int taskNumber(GridHadoopInputSplit split) {
-        return pendingSplits.get(split);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeUuid(out, submitNodeId);
-        out.writeObject(jobId);
-        out.writeObject(jobInfo);
-        out.writeObject(mrPlan);
-        out.writeObject(pendingSplits);
-        out.writeObject(pendingReducers);
-        out.writeObject(phase);
-        out.writeObject(failCause);
-        out.writeLong(ver);
-        out.writeObject(reducersAddrs);
-        out.writeObject(counters);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        submitNodeId = U.readUuid(in);
-        jobId = (GridHadoopJobId)in.readObject();
-        jobInfo = (GridHadoopJobInfo)in.readObject();
-        mrPlan = (GridHadoopMapReducePlan)in.readObject();
-        pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
-        pendingReducers = (Collection<Integer>)in.readObject();
-        phase = (GridHadoopJobPhase)in.readObject();
-        failCause = (Throwable)in.readObject();
-        ver = in.readLong();
-        reducersAddrs = (Map<Integer, GridHadoopProcessDescriptor>)in.readObject();
-        counters = (GridHadoopCounters)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
-            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
-                failCause.getClass().getName());
-    }
-}
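
Before the renamed copy below, note the pattern in the copy constructor: every state transition copies the metadata and bumps ver, giving observers a strictly ordered sequence of snapshots instead of in-place mutation of the shared object. The idea in isolation (class and field names here are illustrative, not part of the commit):

/** Copy-on-write state with a monotonically increasing version. */
final class VersionedSnapshot {
    private final String phase;
    private final long ver;

    VersionedSnapshot(String phase) {
        this.phase = phase;

        ver = 0;
    }

    /** Every update copies the previous snapshot and bumps the version. */
    VersionedSnapshot(VersionedSnapshot src, String newPhase) {
        phase = newPhase;
        ver = src.ver + 1;
    }

    String phase() {
        return phase;
    }

    long version() {
        return ver;
    }
}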

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
new file mode 100644
index 0000000..6042775
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.jobtracker;
+
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
+
+/**
+ * Hadoop job metadata. Internal object used for distributed job state tracking.
+ */
+public class HadoopJobMetadata implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    private GridHadoopJobId jobId;
+
+    /** Job info. */
+    private GridHadoopJobInfo jobInfo;
+
+    /** Node submitted job. */
+    private UUID submitNodeId;
+
+    /** Map-reduce plan. */
+    private GridHadoopMapReducePlan mrPlan;
+
+    /** Pending splits for which mapper should be executed. */
+    private Map<GridHadoopInputSplit, Integer> pendingSplits;
+
+    /** Pending reducers. */
+    private Collection<Integer> pendingReducers;
+
+    /** Reducers addresses. */
+    @GridToStringInclude
+    private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
+
+    /** Job phase. */
+    private GridHadoopJobPhase phase = PHASE_SETUP;
+
+    /** Fail cause. */
+    @GridToStringExclude
+    private Throwable failCause;
+
+    /** Version. */
+    private long ver;
+
+    /** Job counters. */
+    private GridHadoopCounters counters = new HadoopCountersImpl();
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public HadoopJobMetadata() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param submitNodeId Submit node ID.
+     * @param jobId Job ID.
+     * @param jobInfo Job info.
+     */
+    public HadoopJobMetadata(UUID submitNodeId, GridHadoopJobId jobId, GridHadoopJobInfo jobInfo) {
+        this.jobId = jobId;
+        this.jobInfo = jobInfo;
+        this.submitNodeId = submitNodeId;
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param src Metadata to copy.
+     */
+    public HadoopJobMetadata(HadoopJobMetadata src) {
+        // Make sure to preserve alphabetic order.
+        counters = src.counters;
+        failCause = src.failCause;
+        jobId = src.jobId;
+        jobInfo = src.jobInfo;
+        mrPlan = src.mrPlan;
+        pendingSplits = src.pendingSplits;
+        pendingReducers = src.pendingReducers;
+        phase = src.phase;
+        reducersAddrs = src.reducersAddrs;
+        submitNodeId = src.submitNodeId;
+        ver = src.ver + 1;
+    }
+
+    /**
+     * @return Submit node ID.
+     */
+    public UUID submitNodeId() {
+        return submitNodeId;
+    }
+
+    /**
+     * @param phase Job phase.
+     */
+    public void phase(GridHadoopJobPhase phase) {
+        this.phase = phase;
+    }
+
+    /**
+     * @return Job phase.
+     */
+    public GridHadoopJobPhase phase() {
+        return phase;
+    }
+
+    /**
+     * Gets reducers addresses for external execution.
+     *
+     * @return Reducers addresses.
+     */
+    public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
+        return reducersAddrs;
+    }
+
+    /**
+     * Sets reducer addresses for external execution.
+     *
+     * @param reducersAddrs Map of addresses.
+     */
+    public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
+        this.reducersAddrs = reducersAddrs;
+    }
+
+    /**
+     * Sets the map of pending splits.
+     *
+     * @param pendingSplits Map of pending splits to task numbers.
+     */
+    public void pendingSplits(Map<GridHadoopInputSplit, Integer> pendingSplits) {
+        this.pendingSplits = pendingSplits;
+    }
+
+    /**
+     * Gets the map of pending splits.
+     *
+     * @return Map of pending splits to task numbers.
+     */
+    public Map<GridHadoopInputSplit, Integer> pendingSplits() {
+        return pendingSplits;
+    }
+
+    /**
+     * Sets collection of pending reducers.
+     *
+     * @param pendingReducers Collection of pending reducers.
+     */
+    public void pendingReducers(Collection<Integer> pendingReducers) {
+        this.pendingReducers = pendingReducers;
+    }
+
+    /**
+     * Gets collection of pending reducers.
+     *
+     * @return Collection of pending reducers.
+     */
+    public Collection<Integer> pendingReducers() {
+        return pendingReducers;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public GridHadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @param mrPlan Map-reduce plan.
+     */
+    public void mapReducePlan(GridHadoopMapReducePlan mrPlan) {
+        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
+
+        this.mrPlan = mrPlan;
+    }
+
+    /**
+     * @return Map-reduce plan.
+     */
+    public GridHadoopMapReducePlan mapReducePlan() {
+        return mrPlan;
+    }
+
+    /**
+     * @return Job info.
+     */
+    public GridHadoopJobInfo jobInfo() {
+        return jobInfo;
+    }
+
+    /**
+     * Returns job counters.
+     *
+     * @return Job counters.
+     */
+    public GridHadoopCounters counters() {
+        return counters;
+    }
+
+    /**
+     * Sets counters.
+     *
+     * @param counters Job counters.
+     */
+    public void counters(GridHadoopCounters counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * @param failCause Fail cause.
+     */
+    public void failCause(Throwable failCause) {
+        assert failCause != null;
+
+        if (this.failCause == null) // Keep the first error.
+            this.failCause = failCause;
+    }
+
+    /**
+     * @return Fail cause.
+     */
+    public Throwable failCause() {
+        return failCause;
+    }
+
+    /**
+     * @return Version.
+     */
+    public long version() {
+        return ver;
+    }
+
+    /**
+     * @param split Split.
+     * @return Task number.
+     */
+    public int taskNumber(GridHadoopInputSplit split) {
+        return pendingSplits.get(split);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeUuid(out, submitNodeId);
+        out.writeObject(jobId);
+        out.writeObject(jobInfo);
+        out.writeObject(mrPlan);
+        out.writeObject(pendingSplits);
+        out.writeObject(pendingReducers);
+        out.writeObject(phase);
+        out.writeObject(failCause);
+        out.writeLong(ver);
+        out.writeObject(reducersAddrs);
+        out.writeObject(counters);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        submitNodeId = U.readUuid(in);
+        jobId = (GridHadoopJobId)in.readObject();
+        jobInfo = (GridHadoopJobInfo)in.readObject();
+        mrPlan = (GridHadoopMapReducePlan)in.readObject();
+        pendingSplits = (Map<GridHadoopInputSplit,Integer>)in.readObject();
+        pendingReducers = (Collection<Integer>)in.readObject();
+        phase = (GridHadoopJobPhase)in.readObject();
+        failCause = (Throwable)in.readObject();
+        ver = in.readLong();
+        reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
+        counters = (GridHadoopCounters)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
+            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
+                failCause.getClass().getName());
+    }
+}
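
Note on the class above: job state is never mutated in place. Every update path copies the metadata through the copy constructor, which increments ver, so listeners observing the replicated cache can order snapshots. A minimal stand-alone sketch of that copy-on-write versioning (the Meta type below is a simplified stand-in, not the actual Ignite class):

    import java.util.UUID;

    public class VersionedMetaExample {
        /** Simplified stand-in for HadoopJobMetadata. */
        static class Meta {
            final UUID submitNodeId;
            final long ver;

            Meta(UUID submitNodeId) {
                this.submitNodeId = submitNodeId;
                ver = 0;
            }

            /** Copy constructor bumps the version, mirroring HadoopJobMetadata(HadoopJobMetadata src). */
            Meta(Meta src) {
                submitNodeId = src.submitNodeId;
                ver = src.ver + 1;
            }
        }

        public static void main(String[] args) {
            Meta m0 = new Meta(UUID.randomUUID());
            Meta m1 = new Meta(m0); // Updated copy, never an in-place write.

            System.out.println("v" + m0.ver + " -> v" + m1.ver); // Prints: v0 -> v1
        }
    }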

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 91a2d6f..a0ae3f6 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -46,7 +46,7 @@ import java.util.concurrent.atomic.*;
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobPhase.*;
 import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-import static org.apache.ignite.internal.processors.hadoop.taskexecutor.GridHadoopTaskState.*;
+import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*;
 
 /**
  * Hadoop job tracker.
@@ -56,10 +56,10 @@ public class HadoopJobTracker extends HadoopComponent {
     private final GridMutex mux = new GridMutex();
 
     /** */
-    private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaPrj;
+    private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaPrj;
 
     /** Projection with expiry policy for finished job updates. */
-    private volatile GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaPrj;
+    private volatile GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaPrj;
 
     /** Map-reduce execution planner. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -106,8 +106,8 @@ public class HadoopJobTracker extends HadoopComponent {
      * @return Job meta projection.
      */
     @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-    private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> jobMetaCache() {
-        GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = jobMetaPrj;
+    private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> jobMetaCache() {
+        GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = jobMetaPrj;
 
         if (prj == null) {
             synchronized (mux) {
@@ -128,8 +128,8 @@ public class HadoopJobTracker extends HadoopComponent {
                         throw new IllegalStateException(e);
                     }
 
-                    jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata>)
-                        sysCache.projection(GridHadoopJobId.class, GridHadoopJobMetadata.class);
+                    jobMetaPrj = prj = (GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata>)
+                        sysCache.projection(GridHadoopJobId.class, HadoopJobMetadata.class);
 
                     if (ctx.configuration().getFinishedJobInfoTtl() > 0) {
                         ExpiryPolicy finishedJobPlc = new ModifiedExpiryPolicy(
@@ -149,8 +149,8 @@ public class HadoopJobTracker extends HadoopComponent {
     /**
      * @return Projection with expiry policy for finished job updates.
      */
-    private GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> finishedJobMetaCache() {
-        GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> prj = finishedJobMetaPrj;
+    private GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> finishedJobMetaCache() {
+        GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> prj = finishedJobMetaPrj;
 
         if (prj == null) {
             jobMetaCache();
@@ -169,9 +169,9 @@ public class HadoopJobTracker extends HadoopComponent {
         super.onKernalStart();
 
         jobMetaCache().context().continuousQueries().executeInternalQuery(
-            new CacheEntryUpdatedListener<GridHadoopJobId, GridHadoopJobMetadata>() {
+            new CacheEntryUpdatedListener<GridHadoopJobId, HadoopJobMetadata>() {
                 @Override public void onUpdated(final Iterable<CacheEntryEvent<? extends GridHadoopJobId,
-                    ? extends GridHadoopJobMetadata>> evts) {
+                    ? extends HadoopJobMetadata>> evts) {
                     if (!busyLock.tryReadLock())
                         return;
 
@@ -250,7 +250,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
             GridHadoopMapReducePlan mrPlan = mrPlanner.preparePlan(job, ctx.nodes(), null);
 
-            GridHadoopJobMetadata meta = new GridHadoopJobMetadata(ctx.localNodeId(), jobId, info);
+            HadoopJobMetadata meta = new HadoopJobMetadata(ctx.localNodeId(), jobId, info);
 
             meta.mapReducePlan(mrPlan);
 
@@ -268,7 +268,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
             long jobStart = U.currentTimeMillis();
 
-            GridHadoopPerformanceCounter perfCntr = GridHadoopPerformanceCounter.getCounter(meta.counters(),
+            HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(meta.counters(),
                 ctx.localNodeId());
 
             perfCntr.clientSubmissionEvents(info);
@@ -297,7 +297,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @return Status.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public static GridHadoopJobStatus status(GridHadoopJobMetadata meta) {
+    public static GridHadoopJobStatus status(HadoopJobMetadata meta) {
         GridHadoopJobInfo jobInfo = meta.jobInfo();
 
         return new GridHadoopJobStatus(
@@ -325,7 +325,7 @@ public class HadoopJobTracker extends HadoopComponent {
             return null; // Grid is stopping.
 
         try {
-            GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
             return meta != null ? status(meta) : null;
         }
@@ -346,7 +346,7 @@ public class HadoopJobTracker extends HadoopComponent {
             return null; // Grid is stopping.
 
         try {
-            GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
             if (meta == null)
                 return null;
@@ -400,7 +400,7 @@ public class HadoopJobTracker extends HadoopComponent {
             return null;
 
         try {
-            GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
             if (meta != null)
                 return meta.mapReducePlan();
@@ -419,7 +419,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param status Task status.
      */
     @SuppressWarnings({"ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
-    public void onTaskFinished(GridHadoopTaskInfo info, GridHadoopTaskStatus status) {
+    public void onTaskFinished(GridHadoopTaskInfo info, HadoopTaskStatus status) {
         if (!busyLock.tryReadLock())
             return;
 
@@ -470,7 +470,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
                 case COMMIT:
                 case ABORT: {
-                    GridCacheProjectionEx<GridHadoopJobId, GridHadoopJobMetadata> cache = finishedJobMetaCache();
+                    GridCacheProjectionEx<GridHadoopJobId, HadoopJobMetadata> cache = finishedJobMetaCache();
 
                     cache.invokeAsync(info.jobId(), new UpdatePhaseProcessor(incrCntrs, PHASE_COMPLETE)).
                         listenAsync(failsLog);
@@ -488,7 +488,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param jobId Job id.
      * @param c Closure of operation.
      */
-    private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void> c) {
+    private void transform(GridHadoopJobId jobId, EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void> c) {
         jobMetaCache().invokeAsync(jobId, c).listenAsync(failsLog);
     }
 
@@ -500,7 +500,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param desc Process descriptor.
      */
     public void onExternalMappersInitialized(GridHadoopJobId jobId, Collection<Integer> reducers,
-        GridHadoopProcessDescriptor desc) {
+        HadoopProcessDescriptor desc) {
         transform(jobId, new InitializeReducersProcessor(null, reducers, desc));
     }
 
@@ -557,7 +557,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
             // Iteration over all local entries is correct since system cache is REPLICATED.
             for (Object metaObj : jobMetaCache().values()) {
-                GridHadoopJobMetadata meta = (GridHadoopJobMetadata)metaObj;
+                HadoopJobMetadata meta = (HadoopJobMetadata)metaObj;
 
                 GridHadoopJobId jobId = meta.jobId();
 
@@ -626,13 +626,13 @@ public class HadoopJobTracker extends HadoopComponent {
      * @throws IgniteCheckedException If failed.
      */
     private void processJobMetadataUpdates(
-        Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata>> updated)
+        Iterable<CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata>> updated)
         throws IgniteCheckedException {
         UUID locNodeId = ctx.localNodeId();
 
-        for (CacheEntryEvent<? extends GridHadoopJobId, ? extends GridHadoopJobMetadata> entry : updated) {
+        for (CacheEntryEvent<? extends GridHadoopJobId, ? extends HadoopJobMetadata> entry : updated) {
             GridHadoopJobId jobId = entry.getKey();
-            GridHadoopJobMetadata meta = entry.getValue();
+            HadoopJobMetadata meta = entry.getValue();
 
             if (meta == null || !ctx.isParticipating(meta))
                 continue;
@@ -689,7 +689,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param locNodeId Local node ID.
      * @throws IgniteCheckedException If failed.
      */
-    private void processJobMetaUpdate(GridHadoopJobId jobId, GridHadoopJobMetadata meta, UUID locNodeId)
+    private void processJobMetaUpdate(GridHadoopJobId jobId, HadoopJobMetadata meta, UUID locNodeId)
         throws IgniteCheckedException {
         JobLocalState state = activeJobs.get(jobId);
 
@@ -879,7 +879,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * @param meta Job metadata.
      * @return Collection of created task infos or {@code null} if no mapper tasks scheduled for local node.
      */
-    private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, GridHadoopJobMetadata meta) {
+    private Collection<GridHadoopTaskInfo> mapperTasks(Iterable<GridHadoopInputSplit> mappers, HadoopJobMetadata meta) {
         UUID locNodeId = ctx.localNodeId();
         GridHadoopJobId jobId = meta.jobId();
 
@@ -978,7 +978,7 @@ public class HadoopJobTracker extends HadoopComponent {
 
         try {
             if (jobInfo == null) {
-                GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+                HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
                 if (meta == null)
                     throw new IgniteCheckedException("Failed to find job metadata for ID: " + jobId);
@@ -1024,7 +1024,7 @@ public class HadoopJobTracker extends HadoopComponent {
             return false; // Grid is stopping.
 
         try {
-            GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+            HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
             if (meta != null && meta.phase() != PHASE_COMPLETE && meta.phase() != PHASE_CANCELLING) {
                 HadoopTaskCancelledException err = new HadoopTaskCancelledException("Job cancelled.");
@@ -1063,7 +1063,7 @@ public class HadoopJobTracker extends HadoopComponent {
             return null;
 
         try {
-            final GridHadoopJobMetadata meta = jobMetaCache().get(jobId);
+            final HadoopJobMetadata meta = jobMetaCache().get(jobId);
 
             return meta != null ? meta.counters() : null;
         }
@@ -1158,7 +1158,7 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onSetupFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+        private void onSetupFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
             if (status.state() == FAILED || status.state() == CRASHED)
@@ -1172,7 +1172,7 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onMapFinished(final GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+        private void onMapFinished(final GridHadoopTaskInfo taskInfo, HadoopTaskStatus status,
             final StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
@@ -1213,7 +1213,7 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onReduceFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status, StackedProcessor prev) {
+        private void onReduceFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status, StackedProcessor prev) {
             GridHadoopJobId jobId = taskInfo.jobId();
             if (status.state() == FAILED || status.state() == CRASHED)
                 // Fail the whole job.
@@ -1227,7 +1227,7 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param status Task status.
          * @param prev Previous closure.
          */
-        private void onCombineFinished(GridHadoopTaskInfo taskInfo, GridHadoopTaskStatus status,
+        private void onCombineFinished(GridHadoopTaskInfo taskInfo, HadoopTaskStatus status,
             final StackedProcessor prev) {
             final GridHadoopJobId jobId = taskInfo.jobId();
 
@@ -1302,7 +1302,7 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
             cp.phase(phase);
         }
     }
@@ -1343,7 +1343,7 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
             Map<GridHadoopInputSplit, Integer> splitsCp = new HashMap<>(cp.pendingSplits());
 
             for (GridHadoopInputSplit s : splits)
@@ -1400,7 +1400,7 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
             Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
 
             rdcCp.remove(rdc);
@@ -1425,7 +1425,7 @@ public class HadoopJobTracker extends HadoopComponent {
         private final Collection<Integer> rdc;
 
         /** Process descriptor for reducers. */
-        private final GridHadoopProcessDescriptor desc;
+        private final HadoopProcessDescriptor desc;
 
         /**
          * @param prev Previous closure.
@@ -1434,7 +1434,7 @@ public class HadoopJobTracker extends HadoopComponent {
          */
         private InitializeReducersProcessor(@Nullable StackedProcessor prev,
             Collection<Integer> rdc,
-            GridHadoopProcessDescriptor desc) {
+            HadoopProcessDescriptor desc) {
             super(prev);
 
             assert !F.isEmpty(rdc);
@@ -1445,11 +1445,11 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
-            Map<Integer, GridHadoopProcessDescriptor> oldMap = meta.reducersAddresses();
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            Map<Integer, HadoopProcessDescriptor> oldMap = meta.reducersAddresses();
 
-            Map<Integer, GridHadoopProcessDescriptor> rdcMap = oldMap == null ?
-                new HashMap<Integer, GridHadoopProcessDescriptor>() : new HashMap<>(oldMap);
+            Map<Integer, HadoopProcessDescriptor> rdcMap = oldMap == null ?
+                new HashMap<Integer, HadoopProcessDescriptor>() : new HashMap<>(oldMap);
 
             for (Integer r : rdc)
                 rdcMap.put(r, desc);
@@ -1511,7 +1511,7 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
             assert meta.phase() == PHASE_CANCELLING || err != null: "Invalid phase for cancel: " + meta;
 
             Collection<Integer> rdcCp = new HashSet<>(cp.pendingReducers());
@@ -1560,8 +1560,8 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override protected void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp) {
-            GridHadoopCounters cntrs = new GridHadoopCountersImpl(cp.counters());
+        @Override protected void update(HadoopJobMetadata meta, HadoopJobMetadata cp) {
+            GridHadoopCounters cntrs = new HadoopCountersImpl(cp.counters());
 
             cntrs.merge(counters);
 
@@ -1573,7 +1573,7 @@ public class HadoopJobTracker extends HadoopComponent {
      * Abstract stacked closure.
      */
     private abstract static class StackedProcessor implements
-        EntryProcessor<GridHadoopJobId, GridHadoopJobMetadata, Void>, Serializable {
+        EntryProcessor<GridHadoopJobId, HadoopJobMetadata, Void>, Serializable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1588,8 +1588,8 @@ public class HadoopJobTracker extends HadoopComponent {
         }
 
         /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<GridHadoopJobId, GridHadoopJobMetadata> e, Object... args) {
-            GridHadoopJobMetadata val = apply(e.getValue());
+        @Override public Void process(MutableEntry<GridHadoopJobId, HadoopJobMetadata> e, Object... args) {
+            HadoopJobMetadata val = apply(e.getValue());
 
             if (val != null)
                 e.setValue(val);
@@ -1603,11 +1603,11 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param meta Old value.
          * @return New value.
          */
-        private GridHadoopJobMetadata apply(GridHadoopJobMetadata meta) {
+        private HadoopJobMetadata apply(HadoopJobMetadata meta) {
             if (meta == null)
                 return null;
 
-            GridHadoopJobMetadata cp = prev != null ? prev.apply(meta) : new GridHadoopJobMetadata(meta);
+            HadoopJobMetadata cp = prev != null ? prev.apply(meta) : new HadoopJobMetadata(meta);
 
             update(meta, cp);
 
@@ -1620,6 +1620,6 @@ public class HadoopJobTracker extends HadoopComponent {
          * @param meta Initial job metadata.
          * @param cp Copy.
          */
-        protected abstract void update(GridHadoopJobMetadata meta, GridHadoopJobMetadata cp);
+        protected abstract void update(HadoopJobMetadata meta, HadoopJobMetadata cp);
     }
 }
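
The StackedProcessor at the bottom of the tracker composes cache-entry updates as a chain: each processor lets its predecessor produce an updated copy first, then applies its own delta to that copy, so a single cache invoke can carry several stacked mutations. A condensed, self-contained sketch of the same stacking scheme (hypothetical Meta type and plain method calls in place of the Ignite cache API):

    import java.io.Serializable;

    public class StackedProcessorExample {
        /** Hypothetical state standing in for HadoopJobMetadata. */
        static class Meta implements Serializable {
            String phase = "PHASE_REDUCE";
            int pendingReducers = 1;

            Meta copy() {
                Meta cp = new Meta();
                cp.phase = phase;
                cp.pendingReducers = pendingReducers;
                return cp;
            }
        }

        /** Mirrors StackedProcessor: prev runs first on a fresh copy, then update(). */
        abstract static class Stacked implements Serializable {
            private final Stacked prev;

            Stacked(Stacked prev) { this.prev = prev; }

            Meta apply(Meta meta) {
                if (meta == null)
                    return null;

                Meta cp = prev != null ? prev.apply(meta) : meta.copy();

                update(meta, cp);

                return cp;
            }

            abstract void update(Meta meta, Meta cp);
        }

        public static void main(String[] args) {
            Stacked reduceDone = new Stacked(null) {
                @Override void update(Meta meta, Meta cp) { cp.pendingReducers--; }
            };

            Stacked maybeComplete = new Stacked(reduceDone) {
                @Override void update(Meta meta, Meta cp) {
                    if (cp.pendingReducers == 0)
                        cp.phase = "PHASE_COMPLETE";
                }
            };

            Meta out = maybeComplete.apply(new Meta());

            System.out.println(out.phase + ", pending=" + out.pendingReducers); // PHASE_COMPLETE, pending=0
        }
    }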

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
deleted file mode 100644
index 1670a8a..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/GridHadoopMessage.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.message;
-
-import java.io.*;
-
-/**
- * Marker interface for all hadoop messages.
- */
-public interface GridHadoopMessage extends Externalizable {
-    // No-op.
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
new file mode 100644
index 0000000..cab6138
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/message/HadoopMessage.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.message;
+
+import java.io.*;
+
+/**
+ * Marker interface for all hadoop messages.
+ */
+public interface HadoopMessage extends Externalizable {
+    // No-op.
+}
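
Because HadoopMessage is only a marker extending Externalizable, a concrete message supplies a public no-arg constructor plus explicit write/read methods. A hypothetical minimal implementation for illustration (PingMessage is not part of this commit):

    package org.apache.ignite.internal.processors.hadoop.message;

    import java.io.*;

    /** Illustrative message type; not part of this commit. */
    public class PingMessage implements HadoopMessage {
        /** */
        private static final long serialVersionUID = 0L;

        /** Timestamp payload, serialized explicitly. */
        private long ts;

        /** Required by {@link Externalizable}. */
        public PingMessage() {
            // No-op.
        }

        /** @param ts Timestamp. */
        public PingMessage(long ts) {
            this.ts = ts;
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            out.writeLong(ts);
        }

        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            ts = in.readLong();
        }
    }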

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/288709a1/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
deleted file mode 100644
index 7988403..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/GridHadoopDefaultMapReducePlan.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.planner;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Map-reduce plan.
- */
-public class GridHadoopDefaultMapReducePlan implements GridHadoopMapReducePlan {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Mappers map. */
-    private Map<UUID, Collection<GridHadoopInputSplit>> mappers;
-
-    /** Reducers map. */
-    private Map<UUID, int[]> reducers;
-
-    /** Mappers count. */
-    private int mappersCnt;
-
-    /** Reducers count. */
-    private int reducersCnt;
-
-    /**
-     * @param mappers Mappers map.
-     * @param reducers Reducers map.
-     */
-    public GridHadoopDefaultMapReducePlan(Map<UUID, Collection<GridHadoopInputSplit>> mappers,
-        Map<UUID, int[]> reducers) {
-        this.mappers = mappers;
-        this.reducers = reducers;
-
-        if (mappers != null) {
-            for (Collection<GridHadoopInputSplit> splits : mappers.values())
-                mappersCnt += splits.size();
-        }
-
-        if (reducers != null) {
-            for (int[] rdcrs : reducers.values())
-                reducersCnt += rdcrs.length;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public int mappers() {
-        return mappersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int reducers() {
-        return reducersCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public UUID nodeForReducer(int reducer) {
-        assert reducer >= 0 && reducer < reducersCnt : reducer;
-
-        for (Map.Entry<UUID, int[]> entry : reducers.entrySet()) {
-            for (int r : entry.getValue()) {
-                if (r == reducer)
-                    return entry.getKey();
-            }
-        }
-
-        throw new IllegalStateException("Not found reducer index: " + reducer);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public Collection<GridHadoopInputSplit> mappers(UUID nodeId) {
-        return mappers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public int[] reducers(UUID nodeId) {
-        return reducers.get(nodeId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> mapperNodeIds() {
-        return mappers.keySet();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<UUID> reducerNodeIds() {
-        return reducers.keySet();
-    }
-}
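
For context, the deleted plan class above is little more than two lookup maps plus counts derived in the constructor (this commit series renames classes, so an equivalent presumably reappears under a new name). A self-contained sketch of that data shape and count derivation (plain strings stand in for GridHadoopInputSplit):

    import java.util.*;

    public class PlanShapeExample {
        public static void main(String[] args) {
            UUID nodeA = UUID.randomUUID();
            UUID nodeB = UUID.randomUUID();

            // Mapper side: each node ID maps to the input splits it should process.
            Map<UUID, Collection<String>> mappers = new HashMap<>();
            mappers.put(nodeA, Arrays.asList("split-0", "split-1"));
            mappers.put(nodeB, Collections.singletonList("split-2"));

            // Reducer side: each node ID maps to an array of reducer indexes.
            Map<UUID, int[]> reducers = new HashMap<>();
            reducers.put(nodeA, new int[] {0});
            reducers.put(nodeB, new int[] {1, 2});

            // Counts are derived exactly as in the deleted constructor.
            int mappersCnt = 0;
            for (Collection<String> splits : mappers.values())
                mappersCnt += splits.size();

            int reducersCnt = 0;
            for (int[] rdcrs : reducers.values())
                reducersCnt += rdcrs.length;

            System.out.println("mappers=" + mappersCnt + ", reducers=" + reducersCnt); // mappers=3, reducers=3
        }
    }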