You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/09/14 10:53:36 UTC

[31/35] ignite git commit: IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2).

IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2).


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/409f043b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/409f043b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/409f043b

Branch: refs/heads/ignite-3199-1
Commit: 409f043b3e94f51aa23341b7283233a572be6cd2
Parents: 16c5a71
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 14 11:01:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 14 11:01:33 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java |   5 -
 .../internal/processors/igfs/IgfsContext.java   |  27 ++++
 .../processors/igfs/IgfsDataManager.java        |  19 +--
 .../ignite/internal/processors/igfs/IgfsEx.java |   8 --
 .../internal/processors/igfs/IgfsImpl.java      | 143 ++-----------------
 .../processors/igfs/IgfsInputStreamImpl.java    | 103 ++++++-------
 .../processors/igfs/IgfsOutputStreamImpl.java   |   8 +-
 .../internal/processors/igfs/IgfsMock.java      |   7 -
 8 files changed, 100 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 743601e..106ef60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -157,11 +157,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsLocalMetrics localMetrics() {
-        return igfs.localMetrics();
-    }
-
-    /** {@inheritDoc} */
     @Override public long groupBlockSize() {
         return igfs.groupBlockSize();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index a638bf3..3e01246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -54,6 +54,12 @@ public class IgfsContext {
     /** IGFS instance. */
     private final IgfsEx igfs;
 
+    /** Local metrics holder. */
+    private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
+
+    /** Local cluster node. */
+    private volatile ClusterNode locNode;
+
     /**
      * @param ctx Kernal context.
      * @param cfg IGFS configuration.
@@ -179,6 +185,27 @@ public class IgfsContext {
     }
 
     /**
+     * Get local metrics.
+     *
+     * @return Local metrics.
+     */
+    public IgfsLocalMetrics metrics() {
+        return metrics;
+    }
+
+    /**
+     * Get local node.
+     *
+     * @return Local node.
+     */
+    public ClusterNode localNode() {
+        if (locNode == null)
+            locNode = ctx.discovery().localNode();
+
+        return locNode;
+    }
+
+    /**
      * Adds manager to managers list.
      *
      * @param mgr Manager.

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 5e2c6b2..d2183f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -105,9 +105,6 @@ public class IgfsDataManager extends IgfsManager {
     /** */
     private CountDownLatch dataCacheStartLatch;
 
-    /** Local IGFS metrics. */
-    private IgfsLocalMetrics metrics;
-
     /** Group block size. */
     private long grpBlockSize;
 
@@ -201,8 +198,6 @@ public class IgfsDataManager extends IgfsManager {
 
         dataCache = (IgniteInternalCache)dataCachePrj;
 
-        metrics = igfsCtx.igfs().localMetrics();
-
         AffinityKeyMapper mapper = igfsCtx.kernalContext().cache()
             .internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper();
 
@@ -388,7 +383,7 @@ public class IgfsDataManager extends IgfsManager {
 
                                 putBlock(fileInfo.blockSize(), key, res);
 
-                                metrics.addReadBlocks(1, 1);
+                                igfsCtx.metrics().addReadBlocks(1, 1);
                             }
                             catch (IgniteCheckedException e) {
                                 rmtReadFut.onDone(e);
@@ -405,18 +400,18 @@ public class IgfsDataManager extends IgfsManager {
                             // Wait for existing future to finish and get it's result.
                             res = oldRmtReadFut.get();
 
-                            metrics.addReadBlocks(1, 0);
+                            igfsCtx.metrics().addReadBlocks(1, 0);
                         }
                     }
                     else
-                        metrics.addReadBlocks(1, 0);
+                        igfsCtx.metrics().addReadBlocks(1, 0);
 
                     return res;
                 }
             });
         }
         else
-            metrics.addReadBlocks(1, 0);
+            igfsCtx.metrics().addReadBlocks(1, 0);
 
         return fut;
     }
@@ -1308,7 +1303,7 @@ public class IgfsDataManager extends IgfsManager {
                         if (!nodeBlocks.isEmpty()) {
                             processBatch(id, node, nodeBlocks);
 
-                            metrics.addWriteBlocks(1, 0);
+                            igfsCtx.metrics().addWriteBlocks(1, 0);
                         }
 
                         return portion;
@@ -1350,7 +1345,7 @@ public class IgfsDataManager extends IgfsManager {
                 else
                     nodeBlocks.put(key, portion);
 
-                metrics.addWriteBlocks(writtenTotal, writtenSecondary);
+                igfsCtx.metrics().addWriteBlocks(writtenTotal, writtenSecondary);
 
                 written += portion.length;
             }
@@ -1359,7 +1354,7 @@ public class IgfsDataManager extends IgfsManager {
             if (!nodeBlocks.isEmpty()) {
                 processBatch(id, node, nodeBlocks);
 
-                metrics.addWriteBlocks(nodeBlocks.size(), 0);
+                igfsCtx.metrics().addWriteBlocks(nodeBlocks.size(), 0);
             }
 
             assert written == len;

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 05e157d..c869695 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
 
 import java.net.URI;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -75,13 +74,6 @@ public interface IgfsEx extends IgniteFileSystem {
     @Nullable public Boolean globalSampling();
 
     /**
-     * Get local metrics.
-     *
-     * @return Local metrics.
-     */
-    public IgfsLocalMetrics localMetrics();
-
-    /**
      * Gets group block size, i.e. block size multiplied by group size in affinity mapper.
      *
      * @return Group block size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 2c1f0f3..45596a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -24,7 +24,6 @@ import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.eviction.EvictionPolicy;
 import org.apache.ignite.cache.eviction.igfs.IgfsPerBlockLruEvictionPolicy;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.ComputeJobAdapter;
 import org.apache.ignite.compute.ComputeJobResult;
@@ -48,7 +47,6 @@ import org.apache.ignite.igfs.IgfsPathSummary;
 import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -72,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
@@ -83,7 +80,6 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.ArrayList;
@@ -97,12 +93,10 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
@@ -142,9 +136,6 @@ public final class IgfsImpl implements IgfsEx {
     /** Event storage manager. */
     private GridEventStorageManager evts;
 
-    /** Local node. */
-    private ClusterNode locNode;
-
     /** Logger. */
     private IgniteLogger log;
 
@@ -285,16 +276,6 @@ public final class IgfsImpl implements IgfsEx {
             new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
     }
 
-    /**
-     * @return Local node.
-     */
-    private ClusterNode localNode() {
-        if (locNode == null)
-            locNode = igfsCtx.kernalContext().discovery().localNode();
-
-        return locNode;
-    }
-
     /** {@inheritDoc} */
     @Override public void stop(boolean cancel) {
         busyLock.block();
@@ -500,12 +481,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         });
     }
-
-    /** {@inheritDoc} */
-    @Override public IgfsLocalMetrics localMetrics() {
-        return metrics;
-    }
-
     /** {@inheritDoc} */
     @Override public long groupBlockSize() {
         return data.groupBlockSize();
@@ -632,7 +607,7 @@ public final class IgfsImpl implements IgfsEx {
 
                             if (info != null) {
                                 if (evts.isRecordable(EVT_IGFS_META_UPDATED))
-                                    evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_META_UPDATED, props));
+                                    evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_META_UPDATED, props));
 
                                 return new IgfsFileImpl(path, info, data.groupBlockSize());
                             }
@@ -979,8 +954,8 @@ public final class IgfsImpl implements IgfsEx {
 
                     IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
 
-                    IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, desc.info(),
-                        cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics);
+                    IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
+                        cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
 
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
 
@@ -996,8 +971,8 @@ public final class IgfsImpl implements IgfsEx {
                     throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
 
                 // Input stream to read data from grid cache with separate blocks.
-                IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, info,
-                    cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics);
+                IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+                    cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
 
                 IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
 
@@ -1266,6 +1241,8 @@ public final class IgfsImpl implements IgfsEx {
                     }
                 }
 
+                IgfsLocalMetrics metrics = igfsCtx.metrics();
+
                 return new IgfsMetricsAdapter(
                     igfsCtx.data().spaceSize(),
                     igfsCtx.data().maxSpaceSize(),
@@ -1288,7 +1265,7 @@ public final class IgfsImpl implements IgfsEx {
 
     /** {@inheritDoc} */
     @Override public void resetMetrics() {
-        metrics.reset();
+        igfsCtx.metrics().reset();
     }
 
     /** {@inheritDoc} */
@@ -1592,110 +1569,6 @@ public final class IgfsImpl implements IgfsEx {
         throw new IllegalStateException("Asynchronous mode is not enabled.");
     }
 
-    /** Detailed file descriptor. */
-    private static final class FileDescriptor {
-        /** Parent file ID. */
-        @Nullable
-        private final IgniteUuid parentId;
-
-        /** File name. */
-        private final String fileName;
-
-        /** File ID. */
-        private final IgniteUuid fileId;
-
-        /** File is plain data file or directory. */
-        private final boolean isFile;
-
-        /**
-         * Constructs detailed file descriptor.
-         *
-         * @param parentId Parent file ID.
-         * @param fileName File name.
-         * @param fileId File ID.
-         * @param isFile {@code True} if file.
-         */
-        private FileDescriptor(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, boolean isFile) {
-            assert fileName != null;
-
-            this.parentId = parentId;
-            this.fileName = fileName;
-
-            this.fileId = fileId;
-            this.isFile = isFile;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = parentId != null ? parentId.hashCode() : 0;
-
-            res = 31 * res + fileName.hashCode();
-            res = 31 * res + fileId.hashCode();
-            res = 31 * res + (isFile ? 1231 : 1237);
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (o == this)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            FileDescriptor that = (FileDescriptor)o;
-
-            return fileId.equals(that.fileId) && isFile == that.isFile && fileName.equals(that.fileName) &&
-                (parentId == null ? that.parentId == null : parentId.equals(that.parentId));
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(FileDescriptor.class, this);
-        }
-    }
-
-    /**
-     * IGFS input stream extension that fires events.
-     */
-    private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
-        /**
-         * Constructor.
-         *
-         * @param igfsCtx IGFS context.
-         * @param path Path to stored file.
-         * @param fileInfo File info.
-         * @param prefetchBlocks Prefetch blocks.
-         * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
-         * @param secReader Optional secondary file system reader.
-         * @param metrics Metrics.
-         */
-        IgfsEventAwareInputStream(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo,
-            int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
-            IgfsLocalMetrics metrics) {
-            super(igfsCtx, path, fileInfo, prefetchBlocks, seqReadsBeforePrefetch, secReader, metrics);
-
-            metrics.incrementFilesOpenedForRead();
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-        @Override public void close() throws IOException {
-            if (closeGuard.compareAndSet(false, true)) {
-                super.close();
-
-                metrics.decrementFilesOpenedForRead();
-
-                if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ))
-                    evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes()));
-            }
-        }
-    }
-
     /**
      * Space calculation task.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index f20a423..2f9f2fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsCorruptedFileException;
 import org.apache.ignite.igfs.IgfsInputStream;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -43,6 +45,8 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
+
 /**
  * Input stream to read data from grid cache with separate blocks.
  */
@@ -50,11 +54,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /** Empty chunks result. */
     private static final byte[][] EMPTY_CHUNKS = new byte[0][];
 
-    /** Meta manager. */
-    private final IgfsMetaManager meta;
-
-    /** Data manager. */
-    private final IgfsDataManager data;
+    /** IGFS context. */
+    private final IgfsContext igfsCtx;
 
     /** Secondary file system reader. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -108,9 +109,6 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
     /** Time consumed on reading. */
     private long time;
 
-    /** Local IGFS metrics. */
-    private final IgfsLocalMetrics metrics;
-
     /**
      * Constructs file output stream.
      *
@@ -120,24 +118,19 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
      * @param prefetchBlocks Number of blocks to prefetch.
      * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
      * @param secReader Optional secondary file system reader.
-     * @param metrics Local IGFS metrics.
      */
     IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
-        int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, IgfsLocalMetrics metrics) {
+        int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
         assert igfsCtx != null;
         assert path != null;
         assert fileInfo != null;
-        assert metrics != null;
 
+        this.igfsCtx = igfsCtx;
         this.path = path;
         this.fileInfo = fileInfo;
         this.prefetchBlocks = prefetchBlocks;
         this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
         this.secReader = secReader;
-        this.metrics = metrics;
-
-        meta = igfsCtx.meta();
-        data = igfsCtx.data();
 
         log = igfsCtx.kernalContext().log(IgfsInputStream.class);
 
@@ -146,6 +139,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
         locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f);
 
         pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1);
+
+        igfsCtx.metrics().incrementFilesOpenedForRead();
     }
 
     /**
@@ -295,46 +290,56 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
     /** {@inheritDoc} */
     @Override public synchronized void close() throws IOException {
-        try {
-            if (secReader != null) {
-                // Close secondary input stream.
-                secReader.close();
-
-                // Ensuring local cache futures completion.
-                for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException ignore) {
-                        // No-op.
+        if (!closed) {
+            try {
+                if (secReader != null) {
+                    // Close secondary input stream.
+                    secReader.close();
+
+                    // Ensuring local cache futures completion.
+                    for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
+                        try {
+                            fut.get();
+                        }
+                        catch (IgniteCheckedException ignore) {
+                            // No-op.
+                        }
                     }
-                }
 
-                // Ensuring pending evicted futures completion.
-                while (!pendingFuts.isEmpty()) {
-                    pendingFutsLock.lock();
+                    // Ensuring pending evicted futures completion.
+                    while (!pendingFuts.isEmpty()) {
+                        pendingFutsLock.lock();
 
-                    try {
-                        pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException ignore) {
-                        // No-op.
-                    }
-                    finally {
-                        pendingFutsLock.unlock();
+                        try {
+                            pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException ignore) {
+                            // No-op.
+                        }
+                        finally {
+                            pendingFutsLock.unlock();
+                        }
                     }
                 }
             }
-        }
-        catch (Exception e) {
-            throw new IOException("File to close the file: " + path, e);
-        }
-        finally {
-            closed = true;
+            catch (Exception e) {
+                throw new IOException("File to close the file: " + path, e);
+            }
+            finally {
+                closed = true;
+
+                IgfsLocalMetrics metrics = igfsCtx.metrics();
+
+                metrics.addReadBytesTime(bytes, time);
+                metrics.decrementFilesOpenedForRead();
 
-            metrics.addReadBytesTime(bytes, time);
+                locCache.clear();
 
-            locCache.clear();
+                GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+                if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ))
+                    evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes()));
+            }
         }
     }
 
@@ -408,7 +413,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
 
                 // This failure may be caused by file being fragmented.
                 if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
-                    IgfsEntryInfo newInfo = meta.info(fileInfo.id());
+                    IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
 
                     // File was deleted.
                     if (newInfo == null)
@@ -540,7 +545,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
      */
     @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
         throws IgniteCheckedException {
-        return data.dataBlock(fileInfo, path, blockIdx, secReader);
+        return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index bbff93b..6dec0c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -127,7 +127,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             writeFut = igfsCtx.data().writeStart(fileInfo.id());
         }
 
-        igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
+        igfsCtx.metrics().incrementFilesOpenedForWrite();
     }
 
     /** {@inheritDoc} */
@@ -355,8 +355,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             if (err != null)
                 throw err;
 
-            igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
-            igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
+            igfsCtx.metrics().addWrittenBytesTime(bytes, time);
+            igfsCtx.metrics().decrementFilesOpenedForWrite();
 
             GridEventStorageManager evts = igfsCtx.kernalContext().event();
 
@@ -396,7 +396,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /**
      * Send local buffer if at least something is stored there.
      *
-     * @throws IOException
+     * @throws IOException If failed.
      */
     private void sendBufferIfNotEmpty() throws IOException {
         if (buf != null && buf.position() > 0)

http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 2b989c5..04c67dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -117,13 +117,6 @@ public class IgfsMock implements IgfsEx {
     }
 
     /** {@inheritDoc} */
-    @Override public IgfsLocalMetrics localMetrics() {
-        throwUnsupported();
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public long groupBlockSize() {
         throwUnsupported();