You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/16 05:49:56 UTC
[42/50] [abbrv] ignite git commit: IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2).
IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy (2).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/409f043b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/409f043b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/409f043b
Branch: refs/heads/ignite-3443
Commit: 409f043b3e94f51aa23341b7283233a572be6cd2
Parents: 16c5a71
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 14 11:01:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 14 11:01:33 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 5 -
.../internal/processors/igfs/IgfsContext.java | 27 ++++
.../processors/igfs/IgfsDataManager.java | 19 +--
.../ignite/internal/processors/igfs/IgfsEx.java | 8 --
.../internal/processors/igfs/IgfsImpl.java | 143 ++-----------------
.../processors/igfs/IgfsInputStreamImpl.java | 103 ++++++-------
.../processors/igfs/IgfsOutputStreamImpl.java | 8 +-
.../internal/processors/igfs/IgfsMock.java | 7 -
8 files changed, 100 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 743601e..106ef60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -157,11 +157,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public IgfsLocalMetrics localMetrics() {
- return igfs.localMetrics();
- }
-
- /** {@inheritDoc} */
@Override public long groupBlockSize() {
return igfs.groupBlockSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index a638bf3..3e01246 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -54,6 +54,12 @@ public class IgfsContext {
/** IGFS instance. */
private final IgfsEx igfs;
+ /** Local metrics holder. */
+ private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
+
+ /** Local cluster node. */
+ private volatile ClusterNode locNode;
+
/**
* @param ctx Kernal context.
* @param cfg IGFS configuration.
@@ -179,6 +185,27 @@ public class IgfsContext {
}
/**
+ * Get local metrics.
+ *
+ * @return Local metrics.
+ */
+ public IgfsLocalMetrics metrics() {
+ return metrics;
+ }
+
+ /**
+ * Get local node.
+ *
+ * @return Local node.
+ */
+ public ClusterNode localNode() {
+ if (locNode == null)
+ locNode = ctx.discovery().localNode();
+
+ return locNode;
+ }
+
+ /**
* Adds manager to managers list.
*
* @param mgr Manager.
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 5e2c6b2..d2183f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -105,9 +105,6 @@ public class IgfsDataManager extends IgfsManager {
/** */
private CountDownLatch dataCacheStartLatch;
- /** Local IGFS metrics. */
- private IgfsLocalMetrics metrics;
-
/** Group block size. */
private long grpBlockSize;
@@ -201,8 +198,6 @@ public class IgfsDataManager extends IgfsManager {
dataCache = (IgniteInternalCache)dataCachePrj;
- metrics = igfsCtx.igfs().localMetrics();
-
AffinityKeyMapper mapper = igfsCtx.kernalContext().cache()
.internalCache(igfsCtx.configuration().getDataCacheName()).configuration().getAffinityMapper();
@@ -388,7 +383,7 @@ public class IgfsDataManager extends IgfsManager {
putBlock(fileInfo.blockSize(), key, res);
- metrics.addReadBlocks(1, 1);
+ igfsCtx.metrics().addReadBlocks(1, 1);
}
catch (IgniteCheckedException e) {
rmtReadFut.onDone(e);
@@ -405,18 +400,18 @@ public class IgfsDataManager extends IgfsManager {
// Wait for existing future to finish and get it's result.
res = oldRmtReadFut.get();
- metrics.addReadBlocks(1, 0);
+ igfsCtx.metrics().addReadBlocks(1, 0);
}
}
else
- metrics.addReadBlocks(1, 0);
+ igfsCtx.metrics().addReadBlocks(1, 0);
return res;
}
});
}
else
- metrics.addReadBlocks(1, 0);
+ igfsCtx.metrics().addReadBlocks(1, 0);
return fut;
}
@@ -1308,7 +1303,7 @@ public class IgfsDataManager extends IgfsManager {
if (!nodeBlocks.isEmpty()) {
processBatch(id, node, nodeBlocks);
- metrics.addWriteBlocks(1, 0);
+ igfsCtx.metrics().addWriteBlocks(1, 0);
}
return portion;
@@ -1350,7 +1345,7 @@ public class IgfsDataManager extends IgfsManager {
else
nodeBlocks.put(key, portion);
- metrics.addWriteBlocks(writtenTotal, writtenSecondary);
+ igfsCtx.metrics().addWriteBlocks(writtenTotal, writtenSecondary);
written += portion.length;
}
@@ -1359,7 +1354,7 @@ public class IgfsDataManager extends IgfsManager {
if (!nodeBlocks.isEmpty()) {
processBatch(id, node, nodeBlocks);
- metrics.addWriteBlocks(nodeBlocks.size(), 0);
+ igfsCtx.metrics().addWriteBlocks(nodeBlocks.size(), 0);
}
assert written == len;
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 05e157d..c869695 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
import java.net.URI;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -75,13 +74,6 @@ public interface IgfsEx extends IgniteFileSystem {
@Nullable public Boolean globalSampling();
/**
- * Get local metrics.
- *
- * @return Local metrics.
- */
- public IgfsLocalMetrics localMetrics();
-
- /**
* Gets group block size, i.e. block size multiplied by group size in affinity mapper.
*
* @return Group block size.
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 2c1f0f3..45596a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -24,7 +24,6 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.eviction.EvictionPolicy;
import org.apache.ignite.cache.eviction.igfs.IgfsPerBlockLruEvictionPolicy;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
@@ -48,7 +47,6 @@ import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -72,7 +70,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
@@ -83,7 +80,6 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
@@ -97,12 +93,10 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
@@ -142,9 +136,6 @@ public final class IgfsImpl implements IgfsEx {
/** Event storage manager. */
private GridEventStorageManager evts;
- /** Local node. */
- private ClusterNode locNode;
-
/** Logger. */
private IgniteLogger log;
@@ -285,16 +276,6 @@ public final class IgfsImpl implements IgfsEx {
new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
- /**
- * @return Local node.
- */
- private ClusterNode localNode() {
- if (locNode == null)
- locNode = igfsCtx.kernalContext().discovery().localNode();
-
- return locNode;
- }
-
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
busyLock.block();
@@ -500,12 +481,6 @@ public final class IgfsImpl implements IgfsEx {
}
});
}
-
- /** {@inheritDoc} */
- @Override public IgfsLocalMetrics localMetrics() {
- return metrics;
- }
-
/** {@inheritDoc} */
@Override public long groupBlockSize() {
return data.groupBlockSize();
@@ -632,7 +607,7 @@ public final class IgfsImpl implements IgfsEx {
if (info != null) {
if (evts.isRecordable(EVT_IGFS_META_UPDATED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_META_UPDATED, props));
+ evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_META_UPDATED, props));
return new IgfsFileImpl(path, info, data.groupBlockSize());
}
@@ -979,8 +954,8 @@ public final class IgfsImpl implements IgfsEx {
IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
- IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, desc.info(),
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics);
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
@@ -996,8 +971,8 @@ public final class IgfsImpl implements IgfsEx {
throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
// Input stream to read data from grid cache with separate blocks.
- IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, info,
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics);
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
@@ -1266,6 +1241,8 @@ public final class IgfsImpl implements IgfsEx {
}
}
+ IgfsLocalMetrics metrics = igfsCtx.metrics();
+
return new IgfsMetricsAdapter(
igfsCtx.data().spaceSize(),
igfsCtx.data().maxSpaceSize(),
@@ -1288,7 +1265,7 @@ public final class IgfsImpl implements IgfsEx {
/** {@inheritDoc} */
@Override public void resetMetrics() {
- metrics.reset();
+ igfsCtx.metrics().reset();
}
/** {@inheritDoc} */
@@ -1592,110 +1569,6 @@ public final class IgfsImpl implements IgfsEx {
throw new IllegalStateException("Asynchronous mode is not enabled.");
}
- /** Detailed file descriptor. */
- private static final class FileDescriptor {
- /** Parent file ID. */
- @Nullable
- private final IgniteUuid parentId;
-
- /** File name. */
- private final String fileName;
-
- /** File ID. */
- private final IgniteUuid fileId;
-
- /** File is plain data file or directory. */
- private final boolean isFile;
-
- /**
- * Constructs detailed file descriptor.
- *
- * @param parentId Parent file ID.
- * @param fileName File name.
- * @param fileId File ID.
- * @param isFile {@code True} if file.
- */
- private FileDescriptor(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId, boolean isFile) {
- assert fileName != null;
-
- this.parentId = parentId;
- this.fileName = fileName;
-
- this.fileId = fileId;
- this.isFile = isFile;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = parentId != null ? parentId.hashCode() : 0;
-
- res = 31 * res + fileName.hashCode();
- res = 31 * res + fileId.hashCode();
- res = 31 * res + (isFile ? 1231 : 1237);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (o == this)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- FileDescriptor that = (FileDescriptor)o;
-
- return fileId.equals(that.fileId) && isFile == that.isFile && fileName.equals(that.fileName) &&
- (parentId == null ? that.parentId == null : parentId.equals(that.parentId));
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(FileDescriptor.class, this);
- }
- }
-
- /**
- * IGFS input stream extension that fires events.
- */
- private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {
- /** Close guard. */
- private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
- /**
- * Constructor.
- *
- * @param igfsCtx IGFS context.
- * @param path Path to stored file.
- * @param fileInfo File info.
- * @param prefetchBlocks Prefetch blocks.
- * @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
- * @param secReader Optional secondary file system reader.
- * @param metrics Metrics.
- */
- IgfsEventAwareInputStream(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo,
- int prefetchBlocks, int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
- IgfsLocalMetrics metrics) {
- super(igfsCtx, path, fileInfo, prefetchBlocks, seqReadsBeforePrefetch, secReader, metrics);
-
- metrics.incrementFilesOpenedForRead();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override public void close() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- super.close();
-
- metrics.decrementFilesOpenedForRead();
-
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes()));
- }
- }
- }
-
/**
* Space calculation task.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index f20a423..2f9f2fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsCorruptedFileException;
import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -43,6 +45,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
+
/**
* Input stream to read data from grid cache with separate blocks.
*/
@@ -50,11 +54,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** Empty chunks result. */
private static final byte[][] EMPTY_CHUNKS = new byte[0][];
- /** Meta manager. */
- private final IgfsMetaManager meta;
-
- /** Data manager. */
- private final IgfsDataManager data;
+ /** IGFS context. */
+ private final IgfsContext igfsCtx;
/** Secondary file system reader. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
@@ -108,9 +109,6 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** Time consumed on reading. */
private long time;
- /** Local IGFS metrics. */
- private final IgfsLocalMetrics metrics;
-
/**
* Constructs file output stream.
*
@@ -120,24 +118,19 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
* @param prefetchBlocks Number of blocks to prefetch.
* @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
* @param secReader Optional secondary file system reader.
- * @param metrics Local IGFS metrics.
*/
IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
- int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader, IgfsLocalMetrics metrics) {
+ int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
assert igfsCtx != null;
assert path != null;
assert fileInfo != null;
- assert metrics != null;
+ this.igfsCtx = igfsCtx;
this.path = path;
this.fileInfo = fileInfo;
this.prefetchBlocks = prefetchBlocks;
this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
this.secReader = secReader;
- this.metrics = metrics;
-
- meta = igfsCtx.meta();
- data = igfsCtx.data();
log = igfsCtx.kernalContext().log(IgfsInputStream.class);
@@ -146,6 +139,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
locCache = new LinkedHashMap<>(maxLocCacheSize, 1.0f);
pendingFuts = new GridConcurrentHashSet<>(prefetchBlocks > 0 ? prefetchBlocks : 1);
+
+ igfsCtx.metrics().incrementFilesOpenedForRead();
}
/**
@@ -295,46 +290,56 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** {@inheritDoc} */
@Override public synchronized void close() throws IOException {
- try {
- if (secReader != null) {
- // Close secondary input stream.
- secReader.close();
-
- // Ensuring local cache futures completion.
- for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
- try {
- fut.get();
- }
- catch (IgniteCheckedException ignore) {
- // No-op.
+ if (!closed) {
+ try {
+ if (secReader != null) {
+ // Close secondary input stream.
+ secReader.close();
+
+ // Ensuring local cache futures completion.
+ for (IgniteInternalFuture<byte[]> fut : locCache.values()) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
}
- }
- // Ensuring pending evicted futures completion.
- while (!pendingFuts.isEmpty()) {
- pendingFutsLock.lock();
+ // Ensuring pending evicted futures completion.
+ while (!pendingFuts.isEmpty()) {
+ pendingFutsLock.lock();
- try {
- pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ignore) {
- // No-op.
- }
- finally {
- pendingFutsLock.unlock();
+ try {
+ pendingFutsCond.await(100, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignore) {
+ // No-op.
+ }
+ finally {
+ pendingFutsLock.unlock();
+ }
}
}
}
- }
- catch (Exception e) {
- throw new IOException("File to close the file: " + path, e);
- }
- finally {
- closed = true;
+ catch (Exception e) {
+ throw new IOException("File to close the file: " + path, e);
+ }
+ finally {
+ closed = true;
+
+ IgfsLocalMetrics metrics = igfsCtx.metrics();
+
+ metrics.addReadBytesTime(bytes, time);
+ metrics.decrementFilesOpenedForRead();
- metrics.addReadBytesTime(bytes, time);
+ locCache.clear();
- locCache.clear();
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_READ))
+ evts.record(new IgfsEvent(path, igfsCtx.localNode(), EVT_IGFS_FILE_CLOSED_READ, bytes()));
+ }
}
}
@@ -408,7 +413,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
// This failure may be caused by file being fragmented.
if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
- IgfsEntryInfo newInfo = meta.info(fileInfo.id());
+ IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
// File was deleted.
if (newInfo == null)
@@ -540,7 +545,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
*/
@Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
throws IgniteCheckedException {
- return data.dataBlock(fileInfo, path, blockIdx, secReader);
+ return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index bbff93b..6dec0c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -127,7 +127,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
writeFut = igfsCtx.data().writeStart(fileInfo.id());
}
- igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
+ igfsCtx.metrics().incrementFilesOpenedForWrite();
}
/** {@inheritDoc} */
@@ -355,8 +355,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
if (err != null)
throw err;
- igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
- igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
+ igfsCtx.metrics().addWrittenBytesTime(bytes, time);
+ igfsCtx.metrics().decrementFilesOpenedForWrite();
GridEventStorageManager evts = igfsCtx.kernalContext().event();
@@ -396,7 +396,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/**
* Send local buffer if at least something is stored there.
*
- * @throws IOException
+ * @throws IOException If failed.
*/
private void sendBufferIfNotEmpty() throws IOException {
if (buf != null && buf.position() > 0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/409f043b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 2b989c5..04c67dc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -117,13 +117,6 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsLocalMetrics localMetrics() {
- throwUnsupported();
-
- return null;
- }
-
- /** {@inheritDoc} */
@Override public long groupBlockSize() {
throwUnsupported();