You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:11 UTC
[08/14] ignite git commit: WIP.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6fd3b84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6fd3b84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6fd3b84
Branch: refs/heads/ignite-3264
Commit: f6fd3b84f17cff6dd4d335ad18e2a8a322a1942f
Parents: 04e311b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:59:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:59:43 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 96 ++++++--------------
1 file changed, 26 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6fd3b84/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 13808ea..16a20a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -221,8 +221,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
sendData(true);
try {
- storeDataBlocks(in, len);
- } catch (IgniteCheckedException e) {
+ storeData(in, len);
+ }
+ catch (IgniteCheckedException e) {
throw new IOException(e.getMessage(), e);
}
@@ -323,16 +324,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/**
* Store data block.
*
- * @param block Block.
+ * @param data Block.
+ * @param writeLen Write length.
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ private void storeData(Object data, int writeLen) throws IgniteCheckedException, IOException {
assert Thread.holdsLock(mux);
+ assert data instanceof ByteBuffer || data instanceof DataInput;
- int writeLen = block.remaining();
+ if (writeCompletionFut.isDone()) {
+ assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+ writeCompletionFut.get();
+ }
- preStoreDataBlocks(null, writeLen);
+ bytes += writeLen;
+ space += writeLen;
int blockSize = fileInfo.blockSize();
@@ -350,80 +358,28 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
remainder = allocated;
}
- block.get(remainder, remainderDataLen, writeLen);
+ if (data instanceof ByteBuffer)
+ ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
+ else
+ ((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
remainderDataLen += writeLen;
}
else {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, block, false, streamRange, batch);
-
- remainderDataLen = remainder == null ? 0 : remainder.length;
- }
- }
-
- /**
- * Store data blocks.
- *
- * @param in Input.
- * @param len Length.
- * @throws IgniteCheckedException If failed.
- * @throws IOException If failed.
- */
- private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
- assert Thread.holdsLock(mux);
-
- preStoreDataBlocks(in, len);
-
- int blockSize = fileInfo.blockSize();
-
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + len < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
-
- byte[] allocated = new byte[blockSize];
-
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
- remainder = allocated;
+ if (data instanceof ByteBuffer) {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
+ }
+ else {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
}
-
- in.readFully(remainder, remainderDataLen, len);
-
- remainderDataLen += len;
- }
- else {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, in, len, false, streamRange, batch);
remainderDataLen = remainder == null ? 0 : remainder.length;
}
}
/**
- * Initializes data loader if it was not initialized yet and updates written space.
- *
- * @param len Data length to be written.
- */
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
- if (in != null)
- in.skipBytes(len);
-
- writeCompletionFut.get();
- }
-
- bytes += len;
- space += len;
- }
-
- /**
* Close callback. It will be called only once in synchronized section.
*
* @param deleted Whether we already know that the file was deleted.
@@ -542,7 +498,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
if (flip)
buf.flip();
- storeDataBlock(buf);
+ storeData(buf, buf.remaining());
buf = null;
}