You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:11 UTC

[08/14] ignite git commit: WIP.

WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6fd3b84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6fd3b84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6fd3b84

Branch: refs/heads/ignite-3264
Commit: f6fd3b84f17cff6dd4d335ad18e2a8a322a1942f
Parents: 04e311b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:59:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:59:43 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 96 ++++++--------------
 1 file changed, 26 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f6fd3b84/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 13808ea..16a20a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -221,8 +221,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 sendData(true);
 
             try {
-                storeDataBlocks(in, len);
-            } catch (IgniteCheckedException e) {
+                storeData(in, len);
+            }
+            catch (IgniteCheckedException e) {
                 throw new IOException(e.getMessage(), e);
             }
 
@@ -323,16 +324,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /**
      * Store data block.
      *
-     * @param block Block.
+     * @param data Block.
+     * @param writeLen Write length.
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+    private void storeData(Object data, int writeLen) throws IgniteCheckedException, IOException {
         assert Thread.holdsLock(mux);
+        assert data instanceof ByteBuffer || data instanceof DataInput;
 
-        int writeLen = block.remaining();
+        if (writeCompletionFut.isDone()) {
+            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+            writeCompletionFut.get();
+        }
 
-        preStoreDataBlocks(null, writeLen);
+        bytes += writeLen;
+        space += writeLen;
 
         int blockSize = fileInfo.blockSize();
 
@@ -350,80 +358,28 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 remainder = allocated;
             }
 
-            block.get(remainder, remainderDataLen, writeLen);
+            if (data instanceof ByteBuffer)
+                ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
+            else
+                ((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
 
             remainderDataLen += writeLen;
         }
         else {
-            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
-                remainderDataLen, block, false, streamRange, batch);
-
-            remainderDataLen = remainder == null ? 0 : remainder.length;
-        }
-    }
-
-    /**
-     * Store data blocks.
-     *
-     * @param in Input.
-     * @param len Length.
-     * @throws IgniteCheckedException If failed.
-     * @throws IOException If failed.
-     */
-    private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
-        assert Thread.holdsLock(mux);
-
-        preStoreDataBlocks(in, len);
-
-        int blockSize = fileInfo.blockSize();
-
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + len < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
-
-                byte[] allocated = new byte[blockSize];
-
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
-                remainder = allocated;
+            if (data instanceof ByteBuffer) {
+                remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                    remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
+            }
+            else {
+                remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                    remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
             }
-
-            in.readFully(remainder, remainderDataLen, len);
-
-            remainderDataLen += len;
-        }
-        else {
-            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
-                remainderDataLen, in, len, false, streamRange, batch);
 
             remainderDataLen = remainder == null ? 0 : remainder.length;
         }
     }
 
     /**
-     * Initializes data loader if it was not initialized yet and updates written space.
-     *
-     * @param len Data length to be written.
-     */
-    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
-        // Check if any exception happened while writing data.
-        if (writeCompletionFut.isDone()) {
-            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
-            if (in != null)
-                in.skipBytes(len);
-
-            writeCompletionFut.get();
-        }
-
-        bytes += len;
-        space += len;
-    }
-
-    /**
      * Close callback. It will be called only once in synchronized section.
      *
      * @param deleted Whether we already know that the file was deleted.
@@ -542,7 +498,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             if (flip)
                 buf.flip();
 
-            storeDataBlock(buf);
+            storeData(buf, buf.remaining());
 
             buf = null;
         }