You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/19 10:27:06 UTC

[16/51] [abbrv] [partial] ignite git commit: IGNITE-3916: Created separate module.

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
deleted file mode 100644
index fa5cbc5..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsUtils.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.AbstractFileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
-import org.apache.ignite.igfs.IgfsParentNotDirectoryException;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Utility constants and methods for IGFS Hadoop file system.
- */
-public class HadoopIgfsUtils {
-    /** Parameter name for endpoint no embed mode flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
-
-    /** Parameter name for endpoint no shared memory flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
-
-    /** Parameter name for endpoint no local TCP flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
-
-    /**
-     * Get string parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return String value.
-     */
-    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
-        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Get integer parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Integer value.
-     * @throws IOException In case of parse exception.
-     */
-    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
-        String name0 = String.format(name, authority != null ? authority : "");
-
-        try {
-            return cfg.getInt(name0, dflt);
-        }
-        catch (NumberFormatException ignore) {
-            throw new IOException("Failed to parse parameter value to integer: " + name0);
-        }
-    }
-
-    /**
-     * Get boolean parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Boolean value.
-     */
-    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
-        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @return Casted exception.
-     */
-    public static IOException cast(IgniteCheckedException e) {
-        return cast(e, null);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @param path Path for exceptions.
-     * @return Casted exception.
-     */
-    @SuppressWarnings("unchecked")
-    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
-        assert e != null;
-
-        // First check for any nested IOException; if exists - re-throw it.
-        if (e.hasCause(IOException.class))
-            return e.getCause(IOException.class);
-        else if (e.hasCause(IgfsPathNotFoundException.class))
-            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
-        else if (e.hasCause(IgfsParentNotDirectoryException.class))
-            return new ParentNotDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
-            return new PathIsNotEmptyDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
-            return new PathExistsException(path);
-        else {
-            String msg = e.getMessage();
-
-            return msg == null ? new IOException(e) : new IOException(msg, e);
-        }
-    }
-
-    /**
-     * Deletes all files from the given file system.
-     *
-     * @param fs The file system to clean up.
-     * @throws IOException On error.
-     */
-    public static void clear(FileSystem fs) throws IOException {
-        // Delete root contents:
-        FileStatus[] statuses = fs.listStatus(new Path("/"));
-
-        if (statuses != null) {
-            for (FileStatus stat: statuses)
-                fs.delete(stat.getPath(), true);
-        }
-    }
-
-    /**
-     * Deletes all files from the given file system.
-     *
-     * @param fs The file system to clean up.
-     * @throws IOException On error.
-     */
-    public static void clear(AbstractFileSystem fs) throws IOException {
-        // Delete root contents:
-        FileStatus[] statuses = fs.listStatus(new Path("/"));
-
-        if (statuses != null) {
-            for (FileStatus stat: statuses)
-                fs.delete(stat.getPath(), true);
-        }
-    }
-
-    /**
-     * Constructor.
-     */
-    private HadoopIgfsUtils() {
-        // No-op.
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
deleted file mode 100644
index f4ee97f..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.igfs;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.IgniteIllegalStateException;
-import org.apache.ignite.Ignition;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsFile;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathSummary;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsStatus;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.IgniteState.STARTED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint.LOCALHOST;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_EMBED;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP;
-import static org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsUtils.parameter;
-
-/**
- * Wrapper for IGFS server.
- */
-public class HadoopIgfsWrapper implements HadoopIgfs {
-    /** Delegate. */
-    private final AtomicReference<Delegate> delegateRef = new AtomicReference<>();
-
-    /** Authority. */
-    private final String authority;
-
-    /** Connection string. */
-    private final HadoopIgfsEndpoint endpoint;
-
-    /** Log directory. */
-    private final String logDir;
-
-    /** Configuration. */
-    private final Configuration conf;
-
-    /** Logger. */
-    private final Log log;
-
-    /** The user name this wrapper works on behalf of. */
-    private final String userName;
-
-    /**
-     * Constructor.
-     *
-     * @param authority Authority (connection string).
-     * @param logDir Log directory for server.
-     * @param conf Configuration.
-     * @param log Current logger.
-     */
-    public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user)
-        throws IOException {
-        try {
-            this.authority = authority;
-            this.endpoint = new HadoopIgfsEndpoint(authority);
-            this.logDir = logDir;
-            this.conf = conf;
-            this.log = log;
-            this.userName = user;
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to parse endpoint: " + authority, e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsHandshakeResponse>() {
-            @Override public IgfsHandshakeResponse apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) {
-                return hndResp;
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        Delegate delegate = delegateRef.get();
-
-        if (delegate != null && delegateRef.compareAndSet(delegate, null))
-            delegate.close(force);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.info(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsFile>() {
-            @Override public IgfsFile apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.update(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(final IgfsPath path, final long accessTime, final long modificationTime)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.setTimes(path, accessTime, modificationTime);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(final IgfsPath src, final IgfsPath dest) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.rename(src, dest);
-            }
-        }, src);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(final IgfsPath path, final boolean recursive) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.delete(path, recursive);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(final IgfsPath path, final long start,
-        final long len) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsBlockLocation>>() {
-            @Override public Collection<IgfsBlockLocation> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.affinity(path, start, len);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsPathSummary>() {
-            @Override public IgfsPathSummary apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.contentSummary(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Boolean>() {
-            @Override public Boolean apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.mkdirs(path, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsFile>>() {
-            @Override public Collection<IgfsFile> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listFiles(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<Collection<IgfsPath>>() {
-            @Override public Collection<IgfsPath> apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.listPaths(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IOException {
-        return withReconnectHandling(new FileSystemClosure<IgfsStatus>() {
-            @Override public IgfsStatus apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp)
-                throws IgniteCheckedException, IOException {
-                return hadoop.fsStatus();
-            }
-        });
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(final IgfsPath path, final int seqReadsBeforePrefetch)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.open(path, seqReadsBeforePrefetch);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(final IgfsPath path, final boolean overwrite,
-        final boolean colocate, final int replication, final long blockSize, @Nullable final Map<String, String> props)
-        throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.create(path, overwrite, colocate, replication, blockSize, props);
-            }
-        }, path);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(final IgfsPath path, final boolean create,
-        @Nullable final Map<String, String> props) throws IOException {
-        return withReconnectHandling(new FileSystemClosure<HadoopIgfsStreamDelegate>() {
-            @Override public HadoopIgfsStreamDelegate apply(HadoopIgfsEx hadoop,
-                IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException {
-                return hadoop.append(path, create, props);
-            }
-        }, path);
-    }
-
-    /**
-     * Execute closure which is not path-specific.
-     *
-     * @param clo Closure.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(FileSystemClosure<T> clo) throws IOException {
-        return withReconnectHandling(clo, null);
-    }
-
-    /**
-     * Execute closure.
-     *
-     * @param clo Closure.
-     * @param path Path for exceptions.
-     * @return Result.
-     * @throws IOException If failed.
-     */
-    private <T> T withReconnectHandling(final FileSystemClosure<T> clo, @Nullable IgfsPath path)
-        throws IOException {
-        Exception err = null;
-
-        for (int i = 0; i < 2; i++) {
-            Delegate curDelegate = null;
-
-            boolean close = false;
-            boolean force = false;
-
-            try {
-                curDelegate = delegate();
-
-                assert curDelegate != null;
-
-                close = curDelegate.doomed;
-
-                return clo.apply(curDelegate.hadoop, curDelegate.hndResp);
-            }
-            catch (HadoopIgfsCommunicationException e) {
-                if (curDelegate != null && !curDelegate.doomed) {
-                    // Try getting rid fo faulty delegate ASAP.
-                    delegateRef.compareAndSet(curDelegate, null);
-
-                    close = true;
-                    force = true;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message to a server: " + e);
-
-                err = e;
-            }
-            catch (IgniteCheckedException e) {
-                throw HadoopIgfsUtils.cast(e, path != null ? path.toString() : null);
-            }
-            finally {
-                if (close) {
-                    assert curDelegate != null;
-
-                    curDelegate.close(force);
-                }
-            }
-        }
-
-        List<Throwable> list = X.getThrowableList(err);
-
-        Throwable cause = list.get(list.size() - 1);
-
-        throw new IOException("Failed to communicate with IGFS: "
-            + (cause.getMessage() == null ? cause.toString() : cause.getMessage()), err);
-    }
-
-    /**
-     * Get delegate creating it if needed.
-     *
-     * @return Delegate.
-     */
-    private Delegate delegate() throws HadoopIgfsCommunicationException {
-        // These fields will contain possible exceptions from shmem and TCP endpoints.
-        Exception errShmem = null;
-        Exception errTcp = null;
-
-        // 1. If delegate is set, return it immediately.
-        Delegate curDelegate = delegateRef.get();
-
-        if (curDelegate != null)
-            return curDelegate;
-
-        // 2. Guess that we are in the same VM.
-        boolean skipInProc = parameter(conf, PARAM_IGFS_ENDPOINT_NO_EMBED, authority, false);
-
-        if (!skipInProc) {
-            IgfsEx igfs = getIgfsEx(endpoint.grid(), endpoint.igfs());
-
-            if (igfs != null) {
-                HadoopIgfsEx hadoop = null;
-
-                try {
-                    hadoop = new HadoopIgfsInProc(igfs, log, userName);
-
-                    curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    if (e instanceof HadoopIgfsCommunicationException)
-                        if (hadoop != null)
-                            hadoop.close(true);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to connect to in-process IGFS, fallback to IPC mode.", e);
-                }
-            }
-        }
-
-        // 3. Try connecting using shmem.
-        boolean skipLocShmem = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM, authority, false);
-
-        if (curDelegate == null && !skipLocShmem && !U.isWindows()) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(endpoint.port(), endpoint.grid(), endpoint.igfs(), log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using shared memory [port=" + endpoint.port() + ']', e);
-
-                errShmem = e;
-            }
-        }
-
-        // 4. Try local TCP connection.
-        boolean skipLocTcp = parameter(conf, PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP, authority, false);
-
-        if (curDelegate == null && !skipLocTcp) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(LOCALHOST, endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                    log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
-                        ", port=" + endpoint.port() + ']', e);
-
-                errTcp = e;
-            }
-        }
-
-        // 5. Try remote TCP connection.
-        if (curDelegate == null && (skipLocTcp || !F.eq(LOCALHOST, endpoint.host()))) {
-            HadoopIgfsEx hadoop = null;
-
-            try {
-                hadoop = new HadoopIgfsOutProc(endpoint.host(), endpoint.port(), endpoint.grid(), endpoint.igfs(),
-                    log, userName);
-
-                curDelegate = new Delegate(hadoop, hadoop.handshake(logDir));
-            }
-            catch (IOException | IgniteCheckedException e) {
-                if (e instanceof HadoopIgfsCommunicationException)
-                    hadoop.close(true);
-
-                if (log.isDebugEnabled())
-                    log.debug("Failed to connect to IGFS using TCP [host=" + endpoint.host() +
-                        ", port=" + endpoint.port() + ']', e);
-
-                errTcp = e;
-            }
-        }
-
-        if (curDelegate != null) {
-            if (!delegateRef.compareAndSet(null, curDelegate))
-                curDelegate.doomed = true;
-
-            return curDelegate;
-        }
-        else {
-            SB errMsg = new SB("Failed to connect to IGFS [endpoint=igfs://" + authority + ", attempts=[");
-
-            if (errShmem != null)
-                errMsg.a("[type=SHMEM, port=" + endpoint.port() + ", err=" + errShmem + "], ");
-
-            errMsg.a("[type=TCP, host=" + endpoint.host() + ", port=" + endpoint.port() + ", err=" + errTcp + "]] ");
-
-            errMsg.a("(ensure that IGFS is running and have IPC endpoint enabled; ensure that " +
-                "ignite-shmem-1.0.0.jar is in Hadoop classpath if you use shared memory endpoint).");
-
-            throw new HadoopIgfsCommunicationException(errMsg.toString());
-        }
-    }
-
-    /**
-     * File system operation closure.
-     */
-    private static interface FileSystemClosure<T> {
-        /**
-         * Call closure body.
-         *
-         * @param hadoop RPC handler.
-         * @param hndResp Handshake response.
-         * @return Result.
-         * @throws IgniteCheckedException If failed.
-         * @throws IOException If failed.
-         */
-        public T apply(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) throws IgniteCheckedException, IOException;
-    }
-
-    /**
-     * Delegate.
-     */
-    private static class Delegate {
-        /** RPC handler. */
-        private final HadoopIgfsEx hadoop;
-
-        /** Handshake request. */
-        private final IgfsHandshakeResponse hndResp;
-
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean();
-
-        /** Whether this delegate must be closed at the end of the next invocation. */
-        private boolean doomed;
-
-        /**
-         * Constructor.
-         *
-         * @param hadoop Hadoop.
-         * @param hndResp Handshake response.
-         */
-        private Delegate(HadoopIgfsEx hadoop, IgfsHandshakeResponse hndResp) {
-            this.hadoop = hadoop;
-            this.hndResp = hndResp;
-        }
-
-        /**
-         * Close underlying RPC handler.
-         *
-         * @param force Force flag.
-         */
-        private void close(boolean force) {
-            if (closeGuard.compareAndSet(false, true))
-                hadoop.close(force);
-        }
-    }
-
-    /**
-     * Helper method to find Igfs of the given name in the given Ignite instance.
-     *
-     * @param gridName The name of the grid to check.
-     * @param igfsName The name of Igfs.
-     * @return The file system instance, or null if not found.
-     */
-    private static IgfsEx getIgfsEx(@Nullable String gridName, @Nullable String igfsName) {
-        if (Ignition.state(gridName) == STARTED) {
-            try {
-                for (IgniteFileSystem fs : Ignition.ignite(gridName).fileSystems()) {
-                    if (F.eq(fs.name(), igfsName))
-                        return (IgfsEx)fs;
-                }
-            }
-            catch (IgniteIllegalStateException ignore) {
-                // May happen if the grid state has changed:
-            }
-        }
-
-        return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/857cdcde/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
deleted file mode 100644
index 090b336..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.jobtracker;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP;
-
-/**
- * Hadoop job metadata. Internal object used for distributed job state tracking.
- */
-public class HadoopJobMetadata implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Job ID. */
-    private HadoopJobId jobId;
-
-    /** Job info. */
-    private HadoopJobInfo jobInfo;
-
-    /** Node submitted job. */
-    private UUID submitNodeId;
-
-    /** Map-reduce plan. */
-    private HadoopMapReducePlan mrPlan;
-
-    /** Pending splits for which mapper should be executed. */
-    private Map<HadoopInputSplit, Integer> pendingSplits;
-
-    /** Pending reducers. */
-    private Collection<Integer> pendingReducers;
-
-    /** Reducers addresses. */
-    @GridToStringInclude
-    private Map<Integer, HadoopProcessDescriptor> reducersAddrs;
-
-    /** Job phase. */
-    private HadoopJobPhase phase = PHASE_SETUP;
-
-    /** Fail cause. */
-    @GridToStringExclude
-    private Throwable failCause;
-
-    /** Version. */
-    private long ver;
-
-    /** Job counters */
-    private HadoopCounters counters = new HadoopCountersImpl();
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public HadoopJobMetadata() {
-        // No-op.
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param submitNodeId Submit node ID.
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     */
-    public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) {
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.submitNodeId = submitNodeId;
-    }
-
-    /**
-     * Copy constructor.
-     *
-     * @param src Metadata to copy.
-     */
-    public HadoopJobMetadata(HadoopJobMetadata src) {
-        // Make sure to preserve alphabetic order.
-        counters = src.counters;
-        failCause = src.failCause;
-        jobId = src.jobId;
-        jobInfo = src.jobInfo;
-        mrPlan = src.mrPlan;
-        pendingSplits = src.pendingSplits;
-        pendingReducers = src.pendingReducers;
-        phase = src.phase;
-        reducersAddrs = src.reducersAddrs;
-        submitNodeId = src.submitNodeId;
-        ver = src.ver + 1;
-    }
-
-    /**
-     * @return Submit node ID.
-     */
-    public UUID submitNodeId() {
-        return submitNodeId;
-    }
-
-    /**
-     * @param phase Job phase.
-     */
-    public void phase(HadoopJobPhase phase) {
-        this.phase = phase;
-    }
-
-    /**
-     * @return Job phase.
-     */
-    public HadoopJobPhase phase() {
-        return phase;
-    }
-
-    /**
-     * Gets reducers addresses for external execution.
-     *
-     * @return Reducers addresses.
-     */
-    public Map<Integer, HadoopProcessDescriptor> reducersAddresses() {
-        return reducersAddrs;
-    }
-
-    /**
-     * Sets reducers addresses for external execution.
-     *
-     * @param reducersAddrs Map of addresses.
-     */
-    public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) {
-        this.reducersAddrs = reducersAddrs;
-    }
-
-    /**
-     * Sets collection of pending splits.
-     *
-     * @param pendingSplits Collection of pending splits.
-     */
-    public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) {
-        this.pendingSplits = pendingSplits;
-    }
-
-    /**
-     * Gets collection of pending splits.
-     *
-     * @return Collection of pending splits.
-     */
-    public Map<HadoopInputSplit, Integer> pendingSplits() {
-        return pendingSplits;
-    }
-
-    /**
-     * Sets collection of pending reducers.
-     *
-     * @param pendingReducers Collection of pending reducers.
-     */
-    public void pendingReducers(Collection<Integer> pendingReducers) {
-        this.pendingReducers = pendingReducers;
-    }
-
-    /**
-     * Gets collection of pending reducers.
-     *
-     * @return Collection of pending reducers.
-     */
-    public Collection<Integer> pendingReducers() {
-        return pendingReducers;
-    }
-
-    /**
-     * @return Job ID.
-     */
-    public HadoopJobId jobId() {
-        return jobId;
-    }
-
-    /**
-     * @param mrPlan Map-reduce plan.
-     */
-    public void mapReducePlan(HadoopMapReducePlan mrPlan) {
-        assert this.mrPlan == null : "Map-reduce plan can only be initialized once.";
-
-        this.mrPlan = mrPlan;
-    }
-
-    /**
-     * @return Map-reduce plan.
-     */
-    public HadoopMapReducePlan mapReducePlan() {
-        return mrPlan;
-    }
-
-    /**
-     * @return Job info.
-     */
-    public HadoopJobInfo jobInfo() {
-        return jobInfo;
-    }
-
-    /**
-     * Returns job counters.
-     *
-     * @return Collection of counters.
-     */
-    public HadoopCounters counters() {
-        return counters;
-    }
-
-    /**
-     * Sets counters.
-     *
-     * @param counters Collection of counters.
-     */
-    public void counters(HadoopCounters counters) {
-        this.counters = counters;
-    }
-
-    /**
-     * @param failCause Fail cause.
-     */
-    public void failCause(Throwable failCause) {
-        assert failCause != null;
-
-        if (this.failCause == null) // Keep the first error.
-            this.failCause = failCause;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Version.
-     */
-    public long version() {
-        return ver;
-    }
-
-    /**
-     * @param split Split.
-     * @return Task number.
-     */
-    public int taskNumber(HadoopInputSplit split) {
-        return pendingSplits.get(split);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        U.writeUuid(out, submitNodeId);
-        out.writeObject(jobId);
-        out.writeObject(jobInfo);
-        out.writeObject(mrPlan);
-        out.writeObject(pendingSplits);
-        out.writeObject(pendingReducers);
-        out.writeObject(phase);
-        out.writeObject(failCause);
-        out.writeLong(ver);
-        out.writeObject(reducersAddrs);
-        out.writeObject(counters);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        submitNodeId = U.readUuid(in);
-        jobId = (HadoopJobId)in.readObject();
-        jobInfo = (HadoopJobInfo)in.readObject();
-        mrPlan = (HadoopMapReducePlan)in.readObject();
-        pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject();
-        pendingReducers = (Collection<Integer>)in.readObject();
-        phase = (HadoopJobPhase)in.readObject();
-        failCause = (Throwable)in.readObject();
-        ver = in.readLong();
-        reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject();
-        counters = (HadoopCounters)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(),
-            "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null :
-                failCause.getClass().getName());
-    }
-}
\ No newline at end of file