You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/03/03 14:37:37 UTC

[4/5] incubator-ignite git commit: # IGNITE-386: Moving classes (5).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java
deleted file mode 100644
index 47e5763..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIo.java
+++ /dev/null
@@ -1,599 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.ipc.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * IO layer implementation based on blocking IPC streams.
- */
-@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-public class HadoopIgfsIpcIo implements HadoopIgfsIo {
-    /** Logger. */
-    private Log log;
-
-    /** Request futures map. */
-    private ConcurrentMap<Long, HadoopIgfsFuture> reqMap =
-        new ConcurrentHashMap8<>();
-
-    /** Request ID counter. */
-    private AtomicLong reqIdCnt = new AtomicLong();
-
-    /** Endpoint. */
-    private IpcEndpoint endpoint;
-
-    /** Endpoint output stream. */
-    private IgfsDataOutputStream out;
-
-    /** Protocol. */
-    private final IgfsMarshaller marsh;
-
-    /** Client reader thread. */
-    private Thread reader;
-
-    /** Lock for graceful shutdown. */
-    private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
-
-    /** Stopping flag. */
-    private volatile boolean stopping;
-
-    /** Server endpoint address. */
-    private final String endpointAddr;
-
-    /** Number of open file system sessions. */
-    private final AtomicInteger activeCnt = new AtomicInteger(1);
-
-    /** Event listeners. */
-    private final Collection<HadoopIgfsIpcIoListener> lsnrs =
-        new GridConcurrentHashSet<>();
-
-    /** Cached connections. */
-    private static final ConcurrentMap<String, HadoopIgfsIpcIo> ipcCache =
-        new ConcurrentHashMap8<>();
-
-    /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */
-    private static final GridStripedLock initLock = new GridStripedLock(32);
-
-    /**
-     * @param endpointAddr Endpoint.
-     * @param marsh Protocol.
-     * @param log Logger to use.
-     */
-    public HadoopIgfsIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) {
-        assert endpointAddr != null;
-        assert marsh != null;
-
-        this.endpointAddr = endpointAddr;
-        this.marsh = marsh;
-        this.log = log;
-    }
-
-    /**
-     * Returns a started and valid instance of this class
-     * for a given endpoint.
-     *
-     * @param log Logger to use for new instance.
-     * @param endpoint Endpoint string.
-     * @return New or existing cached instance, which is started and operational.
-     * @throws IOException If new instance was created but failed to start.
-     */
-    public static HadoopIgfsIpcIo get(Log log, String endpoint) throws IOException {
-        while (true) {
-            HadoopIgfsIpcIo clientIo = ipcCache.get(endpoint);
-
-            if (clientIo != null) {
-                if (clientIo.acquire())
-                    return clientIo;
-                else
-                    // If concurrent close.
-                    ipcCache.remove(endpoint, clientIo);
-            }
-            else {
-                Lock lock = initLock.getLock(endpoint);
-
-                lock.lock();
-
-                try {
-                    clientIo = ipcCache.get(endpoint);
-
-                    if (clientIo != null) { // Perform double check.
-                        if (clientIo.acquire())
-                            return clientIo;
-                        else
-                            // If concurrent close.
-                            ipcCache.remove(endpoint, clientIo);
-                    }
-
-                    // Otherwise try creating a new one.
-                    clientIo = new HadoopIgfsIpcIo(endpoint, new IgfsMarshaller(), log);
-
-                    try {
-                        clientIo.start();
-                    }
-                    catch (IgniteCheckedException e) {
-                        throw new IOException(e.getMessage(), e);
-                    }
-
-                    HadoopIgfsIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo);
-
-                    // Put in exclusive lock.
-                    assert old == null;
-
-                    return clientIo;
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
-        }
-    }
-
-    /**
-     * Increases usage count for this instance.
-     *
-     * @return {@code true} if usage count is greater than zero.
-     */
-    private boolean acquire() {
-        while (true) {
-            int cnt = activeCnt.get();
-
-            if (cnt == 0) {
-                if (log.isDebugEnabled())
-                    log.debug("IPC IO not acquired (count was 0): " + this);
-
-                return false;
-            }
-
-            // Need to make sure that no-one decremented count in between.
-            if (activeCnt.compareAndSet(cnt, cnt + 1)) {
-                if (log.isDebugEnabled())
-                    log.debug("IPC IO acquired: " + this);
-
-                return true;
-            }
-        }
-    }
-
-    /**
-     * Releases this instance, decrementing usage count.
-     * <p>
-     * If usage count becomes zero, the instance is stopped
-     * and removed from cache.
-     */
-    public void release() {
-        while (true) {
-            int cnt = activeCnt.get();
-
-            if (cnt == 0) {
-                if (log.isDebugEnabled())
-                    log.debug("IPC IO not released (count was 0): " + this);
-
-                return;
-            }
-
-            if (activeCnt.compareAndSet(cnt, cnt - 1)) {
-                if (cnt == 1) {
-                    ipcCache.remove(endpointAddr, this);
-
-                    if (log.isDebugEnabled())
-                        log.debug("IPC IO stopping as unused: " + this);
-
-                    stop();
-                }
-                else if (log.isDebugEnabled())
-                    log.debug("IPC IO released: " + this);
-
-                return;
-            }
-        }
-    }
-
-    /**
-     * Closes this IO instance, removing it from cache.
-     */
-    public void forceClose() {
-        if (ipcCache.remove(endpointAddr, this))
-            stop();
-    }
-
-    /**
-     * Starts the IO.
-     *
-     * @throws IgniteCheckedException If failed to connect the endpoint.
-     */
-    private void start() throws IgniteCheckedException {
-        boolean success = false;
-
-        try {
-            endpoint = IpcEndpointFactory.connectEndpoint(
-                    endpointAddr, new GridLoggerProxy(new HadoopIgfsJclLogger(log), null, null, ""));
-
-            out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream()));
-
-            reader = new ReaderThread();
-
-            // Required for Hadoop 2.x
-            reader.setDaemon(true);
-
-            reader.start();
-
-            success = true;
-        }
-        catch (IgniteCheckedException e) {
-            IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class);
-
-            if (resEx != null)
-                throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx);
-
-            throw e;
-        }
-        finally {
-            if (!success)
-                stop();
-        }
-    }
-
-    /**
-     * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed.
-     * Close listeners will be invoked as if connection is closed by server.
-     */
-    private void stop() {
-        close0(null);
-
-        if (reader != null) {
-            try {
-                U.interrupt(reader);
-                U.join(reader);
-
-                reader = null;
-            }
-            catch (IgniteInterruptedCheckedException ignored) {
-                Thread.currentThread().interrupt();
-
-                log.warn("Got interrupted while waiting for reader thread to shut down (will return).");
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addEventListener(HadoopIgfsIpcIoListener lsnr) {
-        if (!busyLock.readLock().tryLock()) {
-            lsnr.onClose();
-
-            return;
-        }
-
-        boolean invokeNow = false;
-
-        try {
-            invokeNow = stopping;
-
-            if (!invokeNow)
-                lsnrs.add(lsnr);
-        }
-        finally {
-            busyLock.readLock().unlock();
-
-            if (invokeNow)
-                lsnr.onClose();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeEventListener(HadoopIgfsIpcIoListener lsnr) {
-        lsnrs.remove(lsnr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException {
-        return send(msg, null, 0, 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff,
-        int outLen) throws IgniteCheckedException {
-        assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK;
-
-        if (!busyLock.readLock().tryLock())
-            throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
-                "closed).");
-
-        try {
-            if (stopping)
-                throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently " +
-                    "closed).");
-
-            long reqId = reqIdCnt.getAndIncrement();
-
-            HadoopIgfsFuture<T> fut = new HadoopIgfsFuture<>();
-
-            fut.outputBuffer(outBuf);
-            fut.outputOffset(outOff);
-            fut.outputLength(outLen);
-            fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK);
-
-            HadoopIgfsFuture oldFut = reqMap.putIfAbsent(reqId, fut);
-
-            assert oldFut == null;
-
-            if (log.isDebugEnabled())
-                log.debug("Sending IGFS message [reqId=" + reqId + ", msg=" + msg + ']');
-
-            byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command());
-
-            IgniteCheckedException err = null;
-
-            try {
-                synchronized (this) {
-                    marsh.marshall(msg, hdr, out);
-
-                    out.flush(); // Blocking operation + sometimes system call.
-                }
-            }
-            catch (IgniteCheckedException e) {
-                err = e;
-            }
-            catch (IOException e) {
-                err = new HadoopIgfsCommunicationException(e);
-            }
-
-            if (err != null) {
-                reqMap.remove(reqId, fut);
-
-                fut.onDone(err);
-            }
-
-            return fut;
-        }
-        finally {
-            busyLock.readLock().unlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException {
-        if (!busyLock.readLock().tryLock())
-            throw new HadoopIgfsCommunicationException("Failed to send message (client is being " +
-                "concurrently closed).");
-
-        try {
-            if (stopping)
-                throw new HadoopIgfsCommunicationException("Failed to send message (client is being concurrently closed).");
-
-            assert msg.command() == IgfsIpcCommand.WRITE_BLOCK;
-
-            IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg;
-
-            byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK);
-
-            U.longToBytes(req.streamId(), hdr, 12);
-            U.intToBytes(req.length(), hdr, 20);
-
-            synchronized (this) {
-                out.write(hdr);
-                out.write(req.data(), (int)req.position(), req.length());
-
-                out.flush();
-            }
-        }
-        catch (IOException e) {
-            throw new HadoopIgfsCommunicationException(e);
-        }
-        finally {
-            busyLock.readLock().unlock();
-        }
-    }
-
-    /**
-     * Closes client but does not wait.
-     *
-     * @param err Error.
-     */
-    private void close0(@Nullable Throwable err) {
-        busyLock.writeLock().lock();
-
-        try {
-            if (stopping)
-                return;
-
-            stopping = true;
-        }
-        finally {
-            busyLock.writeLock().unlock();
-        }
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " +
-                "is received).");
-
-        // Clean up resources.
-        U.closeQuiet(out);
-
-        if (endpoint != null)
-            endpoint.close();
-
-        // Unwind futures. We can safely iterate here because no more futures will be added.
-        Iterator<HadoopIgfsFuture> it = reqMap.values().iterator();
-
-        while (it.hasNext()) {
-            HadoopIgfsFuture fut = it.next();
-
-            fut.onDone(err);
-
-            it.remove();
-        }
-
-        for (HadoopIgfsIpcIoListener lsnr : lsnrs)
-            lsnr.onClose();
-    }
-
-    /**
-     * Do not extend {@code GridThread} to minimize class dependencies.
-     */
-    private class ReaderThread extends Thread {
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void run() {
-            // Error to fail pending futures.
-            Throwable err = null;
-
-            try {
-                InputStream in = endpoint.inputStream();
-
-                IgfsDataInputStream dis = new IgfsDataInputStream(in);
-
-                byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE];
-                byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE];
-
-                while (!Thread.currentThread().isInterrupted()) {
-                    dis.readFully(hdr);
-
-                    long reqId = U.bytesToLong(hdr, 0);
-
-                    // We don't wait for write responses, therefore reqId is -1.
-                    if (reqId == -1) {
-                        // We received a response which normally should not be sent. It must contain an error.
-                        dis.readFully(msgHdr);
-
-                        assert msgHdr[4] != 0;
-
-                        String errMsg = dis.readUTF();
-
-                        // Error code.
-                        dis.readInt();
-
-                        long streamId = dis.readLong();
-
-                        for (HadoopIgfsIpcIoListener lsnr : lsnrs)
-                            lsnr.onError(streamId, errMsg);
-                    }
-                    else {
-                        HadoopIgfsFuture<Object> fut = reqMap.remove(reqId);
-
-                        if (fut == null) {
-                            String msg = "Failed to read response from server: response closure is unavailable for " +
-                                "requestId (will close connection):" + reqId;
-
-                            log.warn(msg);
-
-                            err = new IgniteCheckedException(msg);
-
-                            break;
-                        }
-                        else {
-                            try {
-                                IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8));
-
-                                if (log.isDebugEnabled())
-                                    log.debug("Received IGFS response [reqId=" + reqId + ", cmd=" + cmd + ']');
-
-                                Object res = null;
-
-                                if (fut.read()) {
-                                    dis.readFully(msgHdr);
-
-                                    boolean hasErr = msgHdr[4] != 0;
-
-                                    if (hasErr) {
-                                        String errMsg = dis.readUTF();
-
-                                        // Error code.
-                                        Integer errCode = dis.readInt();
-
-                                        IgfsControlResponse.throwError(errCode, errMsg);
-                                    }
-
-                                    int blockLen = U.bytesToInt(msgHdr, 5);
-
-                                    int readLen = Math.min(blockLen, fut.outputLength());
-
-                                    if (readLen > 0) {
-                                        assert fut.outputBuffer() != null;
-
-                                        dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen);
-                                    }
-
-                                    if (readLen != blockLen) {
-                                        byte[] buf = new byte[blockLen - readLen];
-
-                                        dis.readFully(buf);
-
-                                        res = buf;
-                                    }
-                                }
-                                else
-                                    res = marsh.unmarshall(cmd, hdr, dis);
-
-                                fut.onDone(res);
-                            }
-                            catch (IgniteCheckedException e) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Failed to apply response closure (will fail request future): " +
-                                        e.getMessage());
-
-                                fut.onDone(e);
-
-                                err = e;
-                            }
-                        }
-                    }
-                }
-            }
-            catch (EOFException ignored) {
-                err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer).");
-            }
-            catch (IOException e) {
-                if (!stopping)
-                    log.error("Failed to read data (connection will be closed)", e);
-
-                err = new HadoopIgfsCommunicationException(e);
-            }
-            catch (IgniteCheckedException e) {
-                if (!stopping)
-                    log.error("Failed to obtain endpoint input stream (connection will be closed)", e);
-
-                err = e;
-            }
-            finally {
-                close0(err);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt +
-            ", stopping=" + stopping + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
deleted file mode 100644
index 049e2b7..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-/**
- * Listens to the events of {@link HadoopIgfsIpcIo}.
- */
-public interface HadoopIgfsIpcIoListener {
-    /**
-     * Callback invoked when the IO is being closed.
-     */
-    public void onClose();
-
-    /**
-     * Callback invoked when remote error occurs.
-     *
-     * @param streamId Stream ID.
-     * @param errMsg Error message.
-     */
-    public void onError(long streamId, String errMsg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
deleted file mode 100644
index 1ba6e64..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.jetbrains.annotations.*;
-
-/**
- * JCL logger wrapper for Hadoop.
- */
-public class HadoopIgfsJclLogger implements IgniteLogger {
-    /** JCL implementation proxy. */
-    private Log impl;
-
-    /**
-     * Constructor.
-     *
-     * @param impl JCL implementation to use.
-     */
-    HadoopIgfsJclLogger(Log impl) {
-        assert impl != null;
-
-        this.impl = impl;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteLogger getLogger(Object ctgr) {
-        return new HadoopIgfsJclLogger(LogFactory.getLog(
-            ctgr instanceof Class ? ((Class)ctgr).getName() : String.valueOf(ctgr)));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void trace(String msg) {
-        impl.trace(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void debug(String msg) {
-        impl.debug(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void info(String msg) {
-        impl.info(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void warning(String msg) {
-        impl.warn(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void warning(String msg, @Nullable Throwable e) {
-        impl.warn(msg, e);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void error(String msg) {
-        impl.error(msg);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isQuiet() {
-        return !isInfoEnabled() && !isDebugEnabled();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void error(String msg, @Nullable Throwable e) {
-        impl.error(msg, e);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isTraceEnabled() {
-        return impl.isTraceEnabled();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isDebugEnabled() {
-        return impl.isDebugEnabled();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isInfoEnabled() {
-        return impl.isInfoEnabled();
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public String fileName() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return "IgfsHadoopJclLogger [impl=" + impl + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java
deleted file mode 100644
index 4cfacb9..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutProc.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*;
-
-/**
- * Communication with external process (TCP or shmem).
- */
-public class HadoopIgfsOutProc implements HadoopIgfsEx, HadoopIgfsIpcIoListener {
-    /** Expected result is boolean. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure();
-
-    /** Expected result is boolean. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure();
-
-    /** Expected result is {@code IgfsHandshakeResponse} */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
-        IgfsHandshakeResponse> HANDSHAKE_RES = createClosure();
-
-    /** Expected result is {@code IgfsStatus} */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES =
-        createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
-        IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
-        Collection<IgfsFile>> FILE_COL_RES = createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
-        Collection<IgfsPath>> PATH_COL_RES = createClosure();
-
-    /** Expected result is {@code IgfsPathSummary}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES =
-        createClosure();
-
-    /** Expected result is {@code IgfsFile}. */
-    private static final GridPlainClosure<GridPlainFuture<IgfsMessage>,
-        Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure();
-
-    /** Grid name. */
-    private final String grid;
-
-    /** IGFS name. */
-    private final String igfs;
-
-    /** Client log. */
-    private final Log log;
-
-    /** Client IO. */
-    private final HadoopIgfsIpcIo io;
-
-    /** Event listeners. */
-    private final Map<Long, HadoopIgfsStreamEventListener> lsnrs = new ConcurrentHashMap8<>();
-
-    /**
-     * Constructor for TCP endpoint.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    public HadoopIgfsOutProc(String host, int port, String grid, String igfs, Log log) throws IOException {
-        this(host, port, grid, igfs, false, log);
-    }
-
-    /**
-     * Constructor for shmem endpoint.
-     *
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    public HadoopIgfsOutProc(int port, String grid, String igfs, Log log) throws IOException {
-        this(null, port, grid, igfs, true, log);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param grid Grid name.
-     * @param igfs IGFS name.
-     * @param shmem Shared memory flag.
-     * @param log Client logger.
-     * @throws IOException If failed.
-     */
-    private HadoopIgfsOutProc(String host, int port, String grid, String igfs, boolean shmem, Log log)
-        throws IOException {
-        assert host != null && !shmem || host == null && shmem :
-            "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']';
-
-        String endpoint = host != null ? host + ":" + port : "shmem:" + port;
-
-        this.grid = grid;
-        this.igfs = igfs;
-        this.log = log;
-
-        io = HadoopIgfsIpcIo.get(log, endpoint);
-
-        io.addEventListener(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException {
-        final IgfsHandshakeRequest req = new IgfsHandshakeRequest();
-
-        req.gridName(grid);
-        req.igfsName(igfs);
-        req.logDirectory(logDir);
-
-        return io.send(req).chain(HANDSHAKE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close(boolean force) {
-        assert io != null;
-
-        io.removeEventListener(this);
-
-        if (force)
-            io.forceClose();
-        else
-            io.release();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(INFO);
-        msg.path(path);
-
-        return io.send(msg).chain(FILE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(UPDATE);
-        msg.path(path);
-        msg.properties(props);
-
-        return io.send(msg).chain(FILE_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(SET_TIMES);
-        msg.path(path);
-        msg.accessTime(accessTime);
-        msg.modificationTime(modificationTime);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(RENAME);
-        msg.path(src);
-        msg.destinationPath(dest);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(DELETE);
-        msg.path(path);
-        msg.flag(recursive);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
-        throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(AFFINITY);
-        msg.path(path);
-        msg.start(start);
-        msg.length(len);
-
-        return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(PATH_SUMMARY);
-        msg.path(path);
-
-        return io.send(msg).chain(SUMMARY_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(MAKE_DIRECTORIES);
-        msg.path(path);
-        msg.properties(props);
-
-        return io.send(msg).chain(BOOL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(LIST_FILES);
-        msg.path(path);
-
-        return io.send(msg).chain(FILE_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(LIST_PATHS);
-        msg.path(path);
-
-        return io.send(msg).chain(PATH_COL_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
-        return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_READ);
-        msg.path(path);
-        msg.flag(false);
-
-        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate open(IgfsPath path,
-        int seqReadsBeforePrefetch) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_READ);
-        msg.path(path);
-        msg.flag(true);
-        msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch);
-
-        IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length());
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
-        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_CREATE);
-        msg.path(path);
-        msg.flag(overwrite);
-        msg.colocate(colocate);
-        msg.properties(props);
-        msg.replication(replication);
-        msg.blockSize(blockSize);
-
-        Long streamId = io.send(msg).chain(LONG_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, streamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
-        @Nullable Map<String, String> props) throws IgniteCheckedException {
-        final IgfsPathControlRequest msg = new IgfsPathControlRequest();
-
-        msg.command(OPEN_APPEND);
-        msg.path(path);
-        msg.flag(create);
-        msg.properties(props);
-
-        Long streamId = io.send(msg).chain(LONG_RES).get();
-
-        return new HadoopIgfsStreamDelegate(this, streamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate desc, long pos, int len,
-        final @Nullable byte[] outBuf, final int outOff, final int outLen) {
-        assert len > 0;
-
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(READ_BLOCK);
-        msg.streamId((long) desc.target());
-        msg.position(pos);
-        msg.length(len);
-
-        try {
-            return io.send(msg, outBuf, outOff, outLen);
-        }
-        catch (IgniteCheckedException e) {
-            return new GridPlainFutureAdapter<>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeData(HadoopIgfsStreamDelegate desc, byte[] data, int off, int len)
-        throws IOException {
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(WRITE_BLOCK);
-        msg.streamId((long) desc.target());
-        msg.data(data);
-        msg.position(off);
-        msg.length(len);
-
-        try {
-            io.sendPlain(msg);
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws IOException {
-        final IgfsStreamControlRequest msg = new IgfsStreamControlRequest();
-
-        msg.command(CLOSE);
-        msg.streamId((long)desc.target());
-
-        try {
-            io.send(msg).chain(BOOL_RES).get();
-        }
-        catch (IgniteCheckedException e) {
-            throw HadoopIgfsUtils.cast(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addEventListener(HadoopIgfsStreamDelegate desc,
-        HadoopIgfsStreamEventListener lsnr) {
-        long streamId = desc.target();
-
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr);
-
-        assert lsnr0 == null || lsnr0 == lsnr;
-
-        if (log.isDebugEnabled())
-            log.debug("Added stream event listener [streamId=" + streamId + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeEventListener(HadoopIgfsStreamDelegate desc) {
-        long streamId = desc.target();
-
-        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(streamId);
-
-        if (lsnr0 != null && log.isDebugEnabled())
-            log.debug("Removed stream event listener [streamId=" + streamId + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() {
-        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
-            try {
-                lsnr.onClose();
-            }
-            catch (IgniteCheckedException e) {
-                log.warn("Got exception from stream event listener (will ignore): " + lsnr, e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(long streamId, String errMsg) {
-        HadoopIgfsStreamEventListener lsnr = lsnrs.get(streamId);
-
-        if (lsnr != null)
-            lsnr.onError(errMsg);
-        else
-            log.warn("Received write error response for not registered output stream (will ignore) " +
-                "[streamId= " + streamId + ']');
-    }
-
-    /**
-     * Creates conversion closure for given type.
-     *
-     * @param <T> Type of expected result.
-     * @return Conversion closure.
-     */
-    @SuppressWarnings("unchecked")
-    private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() {
-        return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() {
-            @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException {
-                IgfsControlResponse res = (IgfsControlResponse)fut.get();
-
-                if (res.hasError())
-                    res.throwError();
-
-                return (T)res.response();
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java
deleted file mode 100644
index 9e08bdb..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsOutputStream.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.commons.logging.*;
-import org.apache.ignite.*;
-import org.apache.ignite.internal.igfs.common.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * IGFS Hadoop output stream implementation.
- */
-public class HadoopIgfsOutputStream extends OutputStream implements HadoopIgfsStreamEventListener {
-    /** Log instance. */
-    private Log log;
-
-    /** Client logger. */
-    private IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private long logStreamId;
-
-    /** Server stream delegate. */
-    private HadoopIgfsStreamDelegate delegate;
-
-    /** Closed flag. */
-    private volatile boolean closed;
-
-    /** Flag set if stream was closed due to connection breakage. */
-    private boolean connBroken;
-
-    /** Error message. */
-    private volatile String errMsg;
-
-    /** Read time. */
-    private long writeTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of written bytes. */
-    private long total;
-
-    /**
-     * Creates light output stream.
-     *
-     * @param delegate Server stream delegate.
-     * @param log Logger to use.
-     * @param clientLog Client logger.
-     */
-    public HadoopIgfsOutputStream(HadoopIgfsStreamDelegate delegate, Log log,
-        IgfsLogger clientLog, long logStreamId) {
-        this.delegate = delegate;
-        this.log = log;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-
-        delegate.hadoop().addEventListener(delegate, this);
-    }
-
-    /**
-     * Read start.
-     */
-    private void writeStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void writeEnd() {
-        long now = System.nanoTime();
-
-        writeTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
-        check();
-
-        writeStart();
-
-        try {
-            delegate.hadoop().writeData(delegate, b, off, len);
-
-            total += len;
-        }
-        finally {
-            writeEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(int b) throws IOException {
-        write(new byte[] {(byte)b});
-
-        total++;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void flush() throws IOException {
-        delegate.hadoop().flush(delegate);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IOException {
-        if (!closed) {
-            if (log.isDebugEnabled())
-                log.debug("Closing output stream: " + delegate);
-
-            writeStart();
-
-            delegate.hadoop().closeStream(delegate);
-
-            markClosed(false);
-
-            writeEnd();
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
-
-            if (log.isDebugEnabled())
-                log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 +
-                    ", userTime=" + userTime / 1000 + ']');
-        }
-        else if(connBroken)
-            throw new IOException(
-                "Failed to close stream, because connection was broken (data could have been lost).");
-    }
-
-    /**
-     * Marks stream as closed.
-     *
-     * @param connBroken {@code True} if connection with server was lost.
-     */
-    private void markClosed(boolean connBroken) {
-        // It is ok to have race here.
-        if (!closed) {
-            closed = true;
-
-            delegate.hadoop().removeEventListener(delegate);
-
-            this.connBroken = connBroken;
-        }
-    }
-
-    /**
-     * @throws IOException If check failed.
-     */
-    private void check() throws IOException {
-        String errMsg0 = errMsg;
-
-        if (errMsg0 != null)
-            throw new IOException(errMsg0);
-
-        if (closed) {
-            if (connBroken)
-                throw new IOException("Server connection was lost.");
-            else
-                throw new IOException("Stream is closed.");
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onClose() throws IgniteCheckedException {
-        markClosed(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onError(String errMsg) {
-        this.errMsg = errMsg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
deleted file mode 100644
index 20c1d5d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.permission.*;
-import org.apache.ignite.*;
-
-import java.util.*;
-
-import static org.apache.ignite.IgniteFs.*;
-
-/**
- * Hadoop file system properties.
- */
-public class HadoopIgfsProperties {
-    /** Username. */
-    private String usrName;
-
-    /** Group name. */
-    private String grpName;
-
-    /** Permissions. */
-    private FsPermission perm;
-
-    /**
-     * Constructor.
-     *
-     * @param props Properties.
-     * @throws IgniteException In case of error.
-     */
-    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
-        usrName = props.get(PROP_USER_NAME);
-        grpName = props.get(PROP_GROUP_NAME);
-
-        String permStr = props.get(PROP_PERMISSION);
-
-        if (permStr != null) {
-            try {
-                perm = new FsPermission((short)Integer.parseInt(permStr, 8));
-            }
-            catch (NumberFormatException ignore) {
-                throw new IgniteException("Permissions cannot be parsed: " + permStr);
-            }
-        }
-    }
-
-    /**
-     * Get user name.
-     *
-     * @return User name.
-     */
-    public String userName() {
-        return usrName;
-    }
-
-    /**
-     * Get group name.
-     *
-     * @return Group name.
-     */
-    public String groupName() {
-        return grpName;
-    }
-
-    /**
-     * Get permission.
-     *
-     * @return Permission.
-     */
-    public FsPermission permission() {
-        return perm;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
deleted file mode 100644
index cfc6949..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.internal.igfs.common.*;
-
-import java.io.*;
-
-/**
- * Secondary Hadoop file system input stream wrapper.
- */
-public class HadoopIgfsProxyInputStream extends InputStream implements Seekable, PositionedReadable {
-    /** Actual input stream to the secondary file system. */
-    private final FSDataInputStream is;
-
-    /** Client logger. */
-    private final IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private final long logStreamId;
-
-    /** Read time. */
-    private long readTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of read bytes. */
-    private long total;
-
-    /** Closed flag. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     *
-     * @param is Actual input stream to the secondary file system.
-     * @param clientLog Client log.
-     */
-    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) {
-        assert is != null;
-        assert clientLog != null;
-
-        this.is = is;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(byte[] b) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read(b);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(byte[] b, int off, int len) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = super.read(b, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long skip(long n) throws IOException {
-        readStart();
-
-        long res;
-
-        try {
-            res =  is.skip(n);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSkip(logStreamId, res);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int available() throws IOException {
-        readStart();
-
-        try {
-            return is.available();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        if (!closed) {
-            closed = true;
-
-            readStart();
-
-            try {
-                is.close();
-            }
-            finally {
-                readEnd();
-            }
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void mark(int readLimit) {
-        readStart();
-
-        try {
-            is.mark(readLimit);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logMark(logStreamId, readLimit);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void reset() throws IOException {
-        readStart();
-
-        try {
-            is.reset();
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logReset(logStreamId);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean markSupported() {
-        readStart();
-
-        try {
-            return is.markSupported();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read() throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read();
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total++;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException {
-        readStart();
-
-        int res;
-
-        try {
-            res = is.read(pos, buf, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (res != -1)
-            total += res;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, res);
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException {
-        readStart();
-
-        try {
-            is.readFully(pos, buf, off, len);
-        }
-        finally {
-            readEnd();
-        }
-
-        total += len;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void readFully(long pos, byte[] buf) throws IOException {
-        readStart();
-
-        try {
-            is.readFully(pos, buf);
-        }
-        finally {
-            readEnd();
-        }
-
-        total += buf.length;
-
-        if (clientLog.isLogEnabled())
-            clientLog.logRandomRead(logStreamId, pos, buf.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void seek(long pos) throws IOException {
-        readStart();
-
-        try {
-            is.seek(pos);
-        }
-        finally {
-            readEnd();
-        }
-
-        if (clientLog.isLogEnabled())
-            clientLog.logSeek(logStreamId, pos);
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized long getPos() throws IOException {
-        readStart();
-
-        try {
-            return is.getPos();
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException {
-        readStart();
-
-        try {
-            return is.seekToNewSource(targetPos);
-        }
-        finally {
-            readEnd();
-        }
-    }
-
-    /**
-     * Read start.
-     */
-    private void readStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void readEnd() {
-        long now = System.nanoTime();
-
-        readTime += now - lastTs;
-
-        lastTs = now;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
deleted file mode 100644
index a7266c4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.internal.igfs.common.*;
-
-import java.io.*;
-
-/**
- * Secondary Hadoop file system output stream wrapper.
- */
-public class HadoopIgfsProxyOutputStream extends OutputStream {
-    /** Actual output stream. */
-    private FSDataOutputStream os;
-
-    /** Client logger. */
-    private final IgfsLogger clientLog;
-
-    /** Log stream ID. */
-    private final long logStreamId;
-
-    /** Read time. */
-    private long writeTime;
-
-    /** User time. */
-    private long userTime;
-
-    /** Last timestamp. */
-    private long lastTs;
-
-    /** Amount of written bytes. */
-    private long total;
-
-    /** Closed flag. */
-    private boolean closed;
-
-    /**
-     * Constructor.
-     *
-     * @param os Actual output stream.
-     * @param clientLog Client logger.
-     * @param logStreamId Log stream ID.
-     */
-    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) {
-        assert os != null;
-        assert clientLog != null;
-
-        this.os = os;
-        this.clientLog = clientLog;
-        this.logStreamId = logStreamId;
-
-        lastTs = System.nanoTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total++;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total += b.length;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
-        writeStart();
-
-        try {
-            os.write(b, off, len);
-        }
-        finally {
-            writeEnd();
-        }
-
-        total += len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void flush() throws IOException {
-        writeStart();
-
-        try {
-            os.flush();
-        }
-        finally {
-            writeEnd();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void close() throws IOException {
-        if (!closed) {
-            closed = true;
-
-            writeStart();
-
-            try {
-                os.close();
-            }
-            finally {
-                writeEnd();
-            }
-
-            if (clientLog.isLogEnabled())
-                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
-        }
-    }
-
-    /**
-     * Read start.
-     */
-    private void writeStart() {
-        long now = System.nanoTime();
-
-        userTime += now - lastTs;
-
-        lastTs = now;
-    }
-
-    /**
-     * Read end.
-     */
-    private void writeEnd() {
-        long now = System.nanoTime();
-
-        writeTime += now - lastTs;
-
-        lastTs = now;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
deleted file mode 100644
index 17755dc..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Secondary file system input stream wrapper which actually opens input stream only in case it is explicitly
- * requested.
- * <p>
- * The class is expected to be used only from synchronized context and therefore is not tread-safe.
- */
-public class HadoopIgfsReader implements IgfsReader {
-    /** Secondary file system. */
-    private final FileSystem fs;
-
-    /** Path to the file to open. */
-    private final Path path;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Actual input stream. */
-    private FSDataInputStream in;
-
-    /** Cached error occurred during output stream open. */
-    private IOException err;
-
-    /** Flag indicating that the stream was already opened. */
-    private boolean opened;
-
-    /**
-     * Constructor.
-     *
-     * @param fs Secondary file system.
-     * @param path Path to the file to open.
-     * @param bufSize Buffer size.
-     */
-    public HadoopIgfsReader(FileSystem fs, Path path, int bufSize) {
-        assert fs != null;
-        assert path != null;
-
-        this.fs = fs;
-        this.path = path;
-        this.bufSize = bufSize;
-    }
-
-    /** Get input stream. */
-    private PositionedReadable in() throws IOException {
-        if (opened) {
-            if (err != null)
-                throw err;
-        }
-        else {
-            opened = true;
-
-            try {
-                in = fs.open(path, bufSize);
-
-                if (in == null)
-                    throw new IOException("Failed to open input stream (file system returned null): " + path);
-            }
-            catch (IOException e) {
-                err = e;
-
-                throw err;
-            }
-        }
-
-        return in;
-    }
-
-    /**
-     * Close wrapped input stream in case it was previously opened.
-     */
-    @Override public void close() {
-        U.closeQuiet(in);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
-        return in().read(pos, buf, off, len);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
deleted file mode 100644
index e64b761..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamDelegate.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * IGFS Hadoop stream descriptor.
- */
-public class HadoopIgfsStreamDelegate {
-    /** RPC handler. */
-    private final HadoopIgfsEx hadoop;
-
-    /** Target. */
-    private final Object target;
-
-    /** Optional stream length. */
-    private final long len;
-
-    /**
-     * Constructor.
-     *
-     * @param target Target.
-     */
-    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target) {
-        this(hadoop, target, -1);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param target Target.
-     * @param len Optional length.
-     */
-    public HadoopIgfsStreamDelegate(HadoopIgfsEx hadoop, Object target, long len) {
-        assert hadoop != null;
-        assert target != null;
-
-        this.hadoop = hadoop;
-        this.target = target;
-        this.len = len;
-    }
-
-    /**
-     * @return RPC handler.
-     */
-    public HadoopIgfsEx hadoop() {
-        return hadoop;
-    }
-
-    /**
-     * @return Stream target.
-     */
-    @SuppressWarnings("unchecked")
-    public <T> T target() {
-        return (T) target;
-    }
-
-    /**
-     * @return Length.
-     */
-    public long length() {
-        return len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return System.identityHashCode(target);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        return obj != null && obj instanceof HadoopIgfsStreamDelegate &&
-            target == ((HadoopIgfsStreamDelegate)obj).target;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(HadoopIgfsStreamDelegate.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
deleted file mode 100644
index b3de523..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsStreamEventListener.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.ignite.*;
-
-/**
- * IGFS input stream event listener.
- */
-public interface HadoopIgfsStreamEventListener {
-    /**
-     * Callback invoked when the stream is being closed.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    public void onClose() throws IgniteCheckedException;
-
-    /**
-     * Callback invoked when remote error occurs.
-     *
-     * @param errMsg Error message.
-     */
-    public void onError(String errMsg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c85f120/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
deleted file mode 100644
index 009443d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsUtils.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.igfs.hadoop;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.ignite.*;
-import org.apache.ignite.igfs.*;
-import org.apache.ignite.internal.processors.igfs.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Utility constants and methods for IGFS Hadoop file system.
- */
-public class HadoopIgfsUtils {
-    /** Parameter name for endpoint no embed mode flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_EMBED = "fs.igfs.%s.endpoint.no_embed";
-
-    /** Parameter name for endpoint no shared memory flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_SHMEM = "fs.igfs.%s.endpoint.no_local_shmem";
-
-    /** Parameter name for endpoint no local TCP flag. */
-    public static final String PARAM_IGFS_ENDPOINT_NO_LOCAL_TCP = "fs.igfs.%s.endpoint.no_local_tcp";
-
-    /**
-     * Get string parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return String value.
-     */
-    public static String parameter(Configuration cfg, String name, String authority, String dflt) {
-        return cfg.get(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Get integer parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Integer value.
-     * @throws IOException In case of parse exception.
-     */
-    public static int parameter(Configuration cfg, String name, String authority, int dflt) throws IOException {
-        String name0 = String.format(name, authority != null ? authority : "");
-
-        try {
-            return cfg.getInt(name0, dflt);
-        }
-        catch (NumberFormatException ignore) {
-            throw new IOException("Failed to parse parameter value to integer: " + name0);
-        }
-    }
-
-    /**
-     * Get boolean parameter.
-     *
-     * @param cfg Configuration.
-     * @param name Parameter name.
-     * @param authority Authority.
-     * @param dflt Default value.
-     * @return Boolean value.
-     */
-    public static boolean parameter(Configuration cfg, String name, String authority, boolean dflt) {
-        return cfg.getBoolean(String.format(name, authority != null ? authority : ""), dflt);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @return Casted exception.
-     */
-    public static IOException cast(IgniteCheckedException e) {
-        return cast(e, null);
-    }
-
-    /**
-     * Cast Ignite exception to appropriate IO exception.
-     *
-     * @param e Exception to cast.
-     * @param path Path for exceptions.
-     * @return Casted exception.
-     */
-    @SuppressWarnings("unchecked")
-    public static IOException cast(IgniteCheckedException e, @Nullable String path) {
-        assert e != null;
-
-        // First check for any nested IOException; if exists - re-throw it.
-        if (e.hasCause(IOException.class))
-            return e.getCause(IOException.class);
-        else if (e.hasCause(IgfsFileNotFoundException.class))
-            return new FileNotFoundException(path); // TODO: Or PathNotFoundException?
-        else if (e.hasCause(IgfsParentNotDirectoryException.class))
-            return new ParentNotDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsDirectoryNotEmptyException.class))
-            return new PathIsNotEmptyDirectoryException(path);
-        else if (path != null && e.hasCause(IgfsPathAlreadyExistsException.class))
-            return new PathExistsException(path);
-        else
-            return new IOException(e);
-    }
-
-    /**
-     * Constructor.
-     */
-    private HadoopIgfsUtils() {
-        // No-op.
-    }
-}