You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/23 16:05:45 UTC
[11/27] ignite git commit: IGNITE-3859: IGFS: Support direct PROXY
mode invocation in the open method,
add proxy mode to IgfsInputStreamImpl This closes #1065. This closes #1083.
IGNITE-3859: IGFS: Support direct PROXY mode invocation in the open method, add proxy mode to IgfsInputStreamImpl
This closes #1065. This closes #1083.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5a35ee9d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5a35ee9d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5a35ee9d
Branch: refs/heads/ignite-comm-balance
Commit: 5a35ee9dad194b3009151b79f0ebd3976bb8fd22
Parents: 2474e2b
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Tue Sep 20 14:10:55 2016 +0500
Committer: tledkov-gridgain <tl...@gridgain.com>
Committed: Tue Sep 20 14:10:55 2016 +0500
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsContext.java | 35 ++++++
.../processors/igfs/IgfsDataManager.java | 121 ++++++++-----------
.../internal/processors/igfs/IgfsImpl.java | 82 ++++++++++---
.../processors/igfs/IgfsInputStreamImpl.java | 103 +++++++++++-----
4 files changed, 226 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 3e01246..3405b53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.processors.igfs;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.internal.GridKernalContext;
@@ -60,6 +63,12 @@ public class IgfsContext {
/** Local cluster node. */
private volatile ClusterNode locNode;
+ /** IGFS executor service. */
+ private ExecutorService igfsSvc;
+
+ /** Logger. */
+ protected IgniteLogger log;
+
/**
* @param ctx Kernal context.
* @param cfg IGFS configuration.
@@ -85,6 +94,10 @@ public class IgfsContext {
this.srvMgr = add(srvMgr);
this.fragmentizerMgr = add(fragmentizerMgr);
+ log = ctx.log(IgfsContext.class);
+
+ igfsSvc = ctx.getIgfsExecutorService();
+
igfs = new IgfsImpl(this);
}
@@ -206,6 +219,28 @@ public class IgfsContext {
}
/**
+ * Executes runnable in IGFS executor service. If execution rejected, runnable will be executed
+ * in caller thread.
+ *
+ * @param r Runnable to execute.
+ */
+ public void runInIgfsThreadPool(Runnable r) {
+ try {
+ igfsSvc.submit(r);
+ }
+ catch (RejectedExecutionException ignored) {
+ // This exception will happen if network speed is too low and data comes faster
+ // than we can send it to remote nodes.
+ try {
+ r.run();
+ }
+ catch (Exception e) {
+ log.warning("Failed to execute IGFS runnable: " + r, e);
+ }
+ }
+ }
+
+ /**
* Adds manager to managers list.
*
* @param mgr Manager.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d2183f9..2f704ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
@@ -74,12 +73,9 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -123,9 +119,6 @@ public class IgfsDataManager extends IgfsManager {
/** Affinity key generator. */
private AtomicLong affKeyGen = new AtomicLong();
- /** IGFS executor service. */
- private ExecutorService igfsSvc;
-
/** Request ID counter for write messages. */
private AtomicLong reqIdCtr = new AtomicLong();
@@ -183,8 +176,6 @@ public class IgfsDataManager extends IgfsManager {
}
}, EVT_NODE_LEFT, EVT_NODE_FAILED);
- igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
-
delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
"igfs-" + igfsName + "-delete-worker", log);
}
@@ -345,45 +336,11 @@ public class IgfsDataManager extends IgfsManager {
if (oldRmtReadFut == null) {
try {
- if (log.isDebugEnabled())
- log.debug("Reading non-local data block in the secondary file system [path=" +
- path + ", fileInfo=" + fileInfo + ", blockIdx=" + blockIdx + ']');
-
- int blockSize = fileInfo.blockSize();
-
- long pos = blockIdx * blockSize; // Calculate position for Hadoop
-
- res = new byte[blockSize];
-
- int read = 0;
-
- synchronized (secReader) {
- try {
- // Delegate to the secondary file system.
- while (read < blockSize) {
- int r = secReader.read(pos + read, res, read, blockSize - read);
-
- if (r < 0)
- break;
-
- read += r;
- }
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to read data due to secondary file system " +
- "exception: " + e.getMessage(), e);
- }
- }
-
- // If we did not read full block at the end of the file - trim it.
- if (read != blockSize)
- res = Arrays.copyOf(res, read);
+ res = secondaryDataBlock(path, blockIdx, secReader, fileInfo.blockSize());
rmtReadFut.onDone(res);
putBlock(fileInfo.blockSize(), key, res);
-
- igfsCtx.metrics().addReadBlocks(1, 1);
}
catch (IgniteCheckedException e) {
rmtReadFut.onDone(e);
@@ -417,11 +374,59 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Get data block for specified block index from secondary reader.
+ *
+ * @param path Path reading from.
+ * @param blockIdx Block index.
+ * @param secReader Optional secondary file system reader.
+ * @param blockSize Block size.
+ * @return Requested data block or {@code null} if nothing found.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable public byte[] secondaryDataBlock(IgfsPath path, long blockIdx,
+ IgfsSecondaryFileSystemPositionedReadable secReader, int blockSize) throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Reading non-local data block in the secondary file system [path=" +
+ path + ", blockIdx=" + blockIdx + ']');
+
+ long pos = blockIdx * blockSize; // Calculate position for Hadoop
+
+ byte[] res = new byte[blockSize];
+
+ int read = 0;
+
+ try {
+ // Delegate to the secondary file system.
+ while (read < blockSize) {
+ int r = secReader.read(pos + read, res, read, blockSize - read);
+
+ if (r < 0)
+ break;
+
+ read += r;
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to read data due to secondary file system " +
+ "exception: " + e.getMessage(), e);
+ }
+
+ // If we did not read full block at the end of the file - trim it.
+ if (read != blockSize)
+ res = Arrays.copyOf(res, read);
+
+ igfsCtx.metrics().addReadBlocks(1, 1);
+
+ return res;
+ }
+
+ /**
* Stores the given block in data cache.
*
* @param blockSize The size of the block.
* @param key The data cache key of the block.
* @param data The new value of the block.
+ * @throws IgniteCheckedException If failed.
*/
private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
if (data.length < blockSize)
@@ -967,8 +972,8 @@ public class IgfsDataManager extends IgfsManager {
}
}
else {
- callIgfsLocalSafe(new GridPlainCallable<Object>() {
- @Override @Nullable public Object call() throws Exception {
+ igfsCtx.runInIgfsThreadPool(new Runnable() {
+ @Override public void run() {
storeBlocksAsync(blocks).listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
try {
@@ -981,8 +986,6 @@ public class IgfsDataManager extends IgfsManager {
}
}
});
-
- return null;
}
});
}
@@ -1070,28 +1073,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Executes callable in IGFS executor service. If execution rejected, callable will be executed
- * in caller thread.
- *
- * @param c Callable to execute.
- */
- private <T> void callIgfsLocalSafe(Callable<T> c) {
- try {
- igfsSvc.submit(c);
- }
- catch (RejectedExecutionException ignored) {
- // This exception will happen if network speed is too low and data comes faster
- // than we can send it to remote nodes.
- try {
- c.call();
- }
- catch (Exception e) {
- log.warning("Failed to execute IGFS callable: " + c, e);
- }
- }
- }
-
- /**
* @param blocks Blocks to write.
* @return Future that will be completed after put is done.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 45596a3..87a4699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.igfs.IgfsPathSummary;
import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -949,34 +950,79 @@ public final class IgfsImpl implements IgfsEx {
IgfsMode mode = resolveMode(path);
- if (mode != PRIMARY) {
- assert IgfsUtils.isDualMode(mode);
+ switch (mode) {
+ case PRIMARY: {
+ IgfsEntryInfo info = meta.infoForPath(path);
- IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+ if (info == null)
+ throw new IgfsPathNotFoundException("File not found: " + path);
- IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, desc.info(),
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader());
+ if (!info.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+ // Input stream to read data from grid cache with separate blocks.
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null,
+ info.length(), info.blockSize(), info.blocksCount(), false);
- return os;
- }
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
- IgfsEntryInfo info = meta.infoForPath(path);
+ return os;
+ }
- if (info == null)
- throw new IgfsPathNotFoundException("File not found: " + path);
+ case DUAL_ASYNC:
+ case DUAL_SYNC: {
+ assert IgfsUtils.isDualMode(mode);
- if (!info.isFile())
- throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+ IgfsSecondaryInputStreamDescriptor desc = meta.openDual(secondaryFs, path, bufSize0);
+
+ IgfsEntryInfo info = desc.info();
- // Input stream to read data from grid cache with separate blocks.
- IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
- cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null);
+ IgfsInputStreamImpl os = new IgfsInputStreamImpl(igfsCtx, path, info,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(),
+ info.length(), info.blockSize(), info.blocksCount(), false);
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+ return os;
+ }
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+ case PROXY: {
+ assert secondaryFs != null;
- return os;
+ IgfsFile info = info(path);
+
+ if (info == null)
+ throw new IgfsPathNotFoundException("File not found: " + path);
+
+ if (!info.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
+
+ IgfsSecondaryFileSystemPositionedReadable secReader =
+ new IgfsLazySecondaryFileSystemPositionedReadable(secondaryFs, path, bufSize);
+
+ long len = info.length();
+
+ int blockSize = info.blockSize() > 0 ? info.blockSize() : cfg.getBlockSize();
+
+ long blockCnt = len / blockSize;
+
+ if (len % blockSize != 0)
+ blockCnt++;
+
+ IgfsInputStream os = new IgfsInputStreamImpl(igfsCtx, path, null,
+ cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, secReader,
+ info.length(), blockSize, blockCnt, true);
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
+
+ return os;
+ }
+
+ default:
+ assert false : "Unexpected mode " + mode;
+ return null;
+ }
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5a35ee9d/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index 2f9f2fc..0d9f2cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
@@ -109,21 +110,44 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** Time consumed on reading. */
private long time;
+ /** File Length. */
+ private long len;
+
+ /** Block size to read. */
+ private int blockSize;
+
+ /** Block size to read. */
+ private long blocksCnt;
+
+ /** Proxy mode. */
+ private boolean proxy;
+
/**
* Constructs file output stream.
- *
- * @param igfsCtx IGFS context.
+ * @param igfsCtx IGFS context.
* @param path Path to stored file.
* @param fileInfo File info to write binary data to.
* @param prefetchBlocks Number of blocks to prefetch.
* @param seqReadsBeforePrefetch Amount of sequential reads before prefetch is triggered.
* @param secReader Optional secondary file system reader.
+ * @param len File length.
+ * @param blockSize Block size.
+ * @param blocksCnt Blocks count.
+ * @param proxy Proxy mode flag.
*/
- IgfsInputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int prefetchBlocks,
- int seqReadsBeforePrefetch, @Nullable IgfsSecondaryFileSystemPositionedReadable secReader) {
+ IgfsInputStreamImpl(
+ IgfsContext igfsCtx,
+ IgfsPath path,
+ @Nullable IgfsEntryInfo fileInfo,
+ int prefetchBlocks,
+ int seqReadsBeforePrefetch,
+ @Nullable IgfsSecondaryFileSystemPositionedReadable secReader,
+ long len,
+ int blockSize,
+ long blocksCnt,
+ boolean proxy) {
assert igfsCtx != null;
assert path != null;
- assert fileInfo != null;
this.igfsCtx = igfsCtx;
this.path = path;
@@ -131,6 +155,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
this.prefetchBlocks = prefetchBlocks;
this.seqReadsBeforePrefetch = seqReadsBeforePrefetch;
this.secReader = secReader;
+ this.len = len;
+ this.blockSize = blockSize;
+ this.blocksCnt = blocksCnt;
+ this.proxy = proxy;
log = igfsCtx.kernalContext().log(IgfsInputStream.class);
@@ -154,7 +182,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** {@inheritDoc} */
@Override public long length() {
- return fileInfo.length();
+ return len;
}
/** {@inheritDoc} */
@@ -195,7 +223,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/** {@inheritDoc} */
@Override public synchronized int available() throws IOException {
- long l = fileInfo.length() - pos;
+ long l = len - pos;
if (l < 0)
return 0;
@@ -240,7 +268,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
@SuppressWarnings("IfMayBeConditional")
public synchronized byte[][] readChunks(long pos, int len) throws IOException {
// Readable bytes in the file, starting from the specified position.
- long readable = fileInfo.length() - pos;
+ long readable = this.len - pos;
if (readable <= 0)
return EMPTY_CHUNKS;
@@ -254,8 +282,8 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
bytes += len;
- int start = (int)(pos / fileInfo.blockSize());
- int end = (int)((pos + len - 1) / fileInfo.blockSize());
+ int start = (int)(pos / blockSize);
+ int end = (int)((pos + len - 1) / blockSize);
int chunkCnt = end - start + 1;
@@ -264,7 +292,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
for (int i = 0; i < chunkCnt; i++) {
byte[] block = blockFragmentizerSafe(start + i);
- int blockOff = (int)(pos % fileInfo.blockSize());
+ int blockOff = (int)(pos % blockSize);
int blockLen = Math.min(len, block.length - blockOff);
// If whole block can be used as result, do not do array copy.
@@ -366,7 +394,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
return 0; // Fully read done: read zero bytes correctly.
// Readable bytes in the file, starting from the specified position.
- long readable = fileInfo.length() - pos;
+ long readable = this.len - pos;
if (readable <= 0)
return -1; // EOF.
@@ -378,10 +406,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
assert len > 0;
- byte[] block = blockFragmentizerSafe(pos / fileInfo.blockSize());
+ byte[] block = blockFragmentizerSafe(pos / blockSize);
// Skip bytes to expected position.
- int blockOff = (int)(pos % fileInfo.blockSize());
+ int blockOff = (int)(pos % blockSize);
len = Math.min(len, block.length - blockOff);
@@ -412,7 +440,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
", blockIdx=" + blockIdx + ", errMsg=" + e.getMessage() + ']');
// This failure may be caused by file being fragmented.
- if (fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
+ if (fileInfo != null && fileInfo.fileMap() != null && !fileInfo.fileMap().ranges().isEmpty()) {
IgfsEntryInfo newInfo = igfsCtx.meta().info(fileInfo.id());
// File was deleted.
@@ -459,7 +487,7 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
prevBlockIdx = blockIdx;
- bytesFut = dataBlock(fileInfo, blockIdx);
+ bytesFut = dataBlock(blockIdx);
assert bytesFut != null;
@@ -470,10 +498,10 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
if (prefetchBlocks > 0 && seqReads >= seqReadsBeforePrefetch - 1) {
for (int i = 1; i <= prefetchBlocks; i++) {
// Ensure that we do not prefetch over file size.
- if (fileInfo.blockSize() * (i + blockIdx) >= fileInfo.length())
+ if (blockSize * (i + blockIdx) >= len)
break;
else if (locCache.get(blockIdx + i) == null)
- addLocalCacheFuture(blockIdx + i, dataBlock(fileInfo, blockIdx + i));
+ addLocalCacheFuture(blockIdx + i, dataBlock(blockIdx + i));
}
}
@@ -483,17 +511,17 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
throw new IgfsCorruptedFileException("Failed to retrieve file's data block (corrupted file?) " +
"[path=" + path + ", blockIdx=" + blockIdx + ']');
- int blockSize = fileInfo.blockSize();
+ int blockSize0 = blockSize;
- if (blockIdx == fileInfo.blocksCount() - 1)
- blockSize = (int)(fileInfo.length() % blockSize);
+ if (blockIdx == blocksCnt - 1)
+ blockSize0 = (int)(len % blockSize0);
// If part of the file was reserved for writing, but was not actually written.
- if (bytes.length < blockSize)
+ if (bytes.length < blockSize0)
throw new IOException("Inconsistent file's data block (incorrectly written?)" +
" [path=" + path + ", blockIdx=" + blockIdx + ", blockSize=" + bytes.length +
- ", expectedBlockSize=" + blockSize + ", fileBlockSize=" + fileInfo.blockSize() +
- ", fileLen=" + fileInfo.length() + ']');
+ ", expectedBlockSize=" + blockSize0 + ", fileBlockSize=" + blockSize +
+ ", fileLen=" + len + ']');
return bytes;
}
@@ -538,14 +566,35 @@ public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondar
/**
* Get data block for specified block index.
*
- * @param fileInfo File info.
* @param blockIdx Block index.
* @return Requested data block or {@code null} if nothing found.
* @throws IgniteCheckedException If failed.
*/
- @Nullable protected IgniteInternalFuture<byte[]> dataBlock(IgfsEntryInfo fileInfo, long blockIdx)
+ @Nullable protected IgniteInternalFuture<byte[]> dataBlock(final long blockIdx)
throws IgniteCheckedException {
- return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+ if (proxy) {
+ assert secReader != null;
+
+ final GridFutureAdapter<byte[]> fut = new GridFutureAdapter<>();
+
+ igfsCtx.runInIgfsThreadPool(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.onDone(igfsCtx.data().secondaryDataBlock(path, blockIdx, secReader, blockSize));
+ }
+ catch (Throwable e) {
+ fut.onDone(null, e);
+ }
+ }
+ });
+
+ return fut;
+ }
+ else {
+ assert fileInfo != null;
+
+ return igfsCtx.data().dataBlock(fileInfo, path, blockIdx, secReader);
+ }
}
/** {@inheritDoc} */