You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:05 UTC

[02/14] ignite git commit: Better encapsulated monitor.

Better encapsulated monitor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a432c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a432c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a432c0

Branch: refs/heads/ignite-3264
Commit: d3a432c02e300988e39516641fb17d5b5a9af698
Parents: 3cd3373
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:10 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:10 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 271 ++++++++++---------
 1 file changed, 144 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a432c0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7a40ba3..7363ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -113,6 +113,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Close guard. */
     private final AtomicBoolean closeGuard = new AtomicBoolean(false);
 
+    /** Mutex for synchronization. */
+    private final Object mux = new Object();
+
     /**
      * Constructs file output stream.
      *
@@ -126,59 +129,63 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
         @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
-        this.path = path;
-        this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+        synchronized (mux) {
+            this.path = path;
+            this.bufSize = optimizeBufferSize(bufSize, fileInfo);
 
-        assert fileInfo != null;
-        assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
-        assert mode != null && mode != PROXY;
-        assert mode == PRIMARY && batch == null || batch != null;
-        assert metrics != null;
+            assert fileInfo != null;
+            assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+            assert mode != null && mode != PROXY;
+            assert mode == PRIMARY && batch == null || batch != null;
+            assert metrics != null;
 
-        // File hasn't been locked.
-        if (fileInfo.lockId() == null)
-            throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+            // File hasn't been locked.
+            if (fileInfo.lockId() == null)
+                throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
 
-        assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+            assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
 
-        this.igfsCtx = igfsCtx;
-        meta = igfsCtx.meta();
-        data = igfsCtx.data();
+            this.igfsCtx = igfsCtx;
+            meta = igfsCtx.meta();
+            data = igfsCtx.data();
 
-        this.fileInfo = fileInfo;
-        this.mode = mode;
-        this.batch = batch;
-        this.metrics = metrics;
+            this.fileInfo = fileInfo;
+            this.mode = mode;
+            this.batch = batch;
+            this.metrics = metrics;
 
-        streamRange = initialStreamRange(fileInfo);
+            streamRange = initialStreamRange(fileInfo);
 
-        writeCompletionFut = data.writeStart(fileInfo);
+            writeCompletionFut = data.writeStart(fileInfo);
 
-        metrics.incrementFilesOpenedForWrite();
+            metrics.incrementFilesOpenedForWrite();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        checkClosed(null, 0);
+    @Override public void write(int b) throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        b &= 0xFF;
+            b &= 0xFF;
 
-        if (buf == null)
-            buf = ByteBuffer.allocate(bufSize);
+            if (buf == null)
+                buf = ByteBuffer.allocate(bufSize);
 
-        buf.put((byte)b);
+            buf.put((byte)b);
 
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
+            if (buf.position() >= bufSize)
+                sendData(true); // Send data to server.
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("NullableProblems")
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+    @Override public void write(byte[] b, int off, int len) throws IOException {
         A.notNull(b, "b");
 
         if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
@@ -186,89 +193,94 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 ", length=" + len + ']');
         }
 
-        checkClosed(null, 0);
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-        if (len == 0)
-            return; // Done.
+            if (len == 0)
+                return; // Done.
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        if (buf == null) {
-            // Do not allocate and copy byte buffer if will send data immediately.
-            if (len >= bufSize) {
-                buf = ByteBuffer.wrap(b, off, len);
+            if (buf == null) {
+                // Do not allocate and copy byte buffer if will send data immediately.
+                if (len >= bufSize) {
+                    buf = ByteBuffer.wrap(b, off, len);
 
-                sendData(false);
+                    sendData(false);
 
-                return;
-            }
+                    return;
+                }
 
-            buf = ByteBuffer.allocate(Math.max(bufSize, len));
-        }
+                buf = ByteBuffer.allocate(Math.max(bufSize, len));
+            }
 
-        if (buf.remaining() < len)
-            // Expand buffer capacity, if remaining size is less then data size.
-            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+            if (buf.remaining() < len)
+                // Expand buffer capacity, if remaining size is less then data size.
+                buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
 
-        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
-            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+            assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+                "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
 
-        buf.put(b, off, len);
+            buf.put(b, off, len);
 
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
+            if (buf.position() >= bufSize)
+                sendData(true); // Send data to server.
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
-        checkClosed(in, len);
+    @Override public void transferFrom(DataInput in, int len) throws IOException {
+        synchronized (mux) {
+            checkClosed(in, len);
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        // Send all IPC data from the local buffer before streaming.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
+            // Send all IPC data from the local buffer before streaming.
+            if (buf != null && buf.position() > 0)
+                sendData(true);
 
-        try {
-            storeDataBlocks(in, len);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException(e.getMessage(), e);
-        }
+            try {
+                storeDataBlocks(in, len);
+            } catch (IgniteCheckedException e) {
+                throw new IOException(e.getMessage(), e);
+            }
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public final synchronized void close() throws IOException {
-        // Do nothing if stream is already closed.
-        if (closed)
-            return;
+    @Override public final void close() throws IOException {
+        synchronized (mux) {
+            // Do nothing if stream is already closed.
+            if (closed)
+                return;
 
-        try {
-            // Send all IPC data from the local buffer.
             try {
-                flush();
-            }
-            finally {
-                if (closeGuard.compareAndSet(false, true)) {
-                    onClose(false);
+                // Send all IPC data from the local buffer.
+                try {
+                    flush();
+                }
+                finally {
+                    if (closeGuard.compareAndSet(false, true)) {
+                        onClose(false);
 
-                    metrics.decrementFilesOpenedForWrite();
+                        metrics.decrementFilesOpenedForWrite();
 
-                    GridEventStorageManager evts = igfsCtx.kernalContext().event();
+                        GridEventStorageManager evts = igfsCtx.kernalContext().event();
 
-                    if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
-                        evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
-                            EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+                        if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+                            evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+                                EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+                    }
                 }
             }
-        }
-        finally {
-            // Mark this stream closed AFTER flush.
-            closed = true;
+            finally {
+                // Mark this stream closed AFTER flush.
+                closed = true;
+            }
         }
     }
 
@@ -277,55 +289,56 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      *
      * @exception IOException  if an I/O error occurs.
      */
-    @Override public synchronized void flush() throws IOException {
-        boolean exists;
+    @Override public void flush() throws IOException {
+        synchronized (mux) {
 
-        try {
-            exists = meta.exists(fileInfo.id());
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("File to read file metadata: " + path, e);
-        }
+            boolean exists;
 
-        if (!exists) {
-            onClose(true);
+            try {
+                exists = meta.exists(fileInfo.id());
+            } catch (IgniteCheckedException e) {
+                throw new IOException("File to read file metadata: " + path, e);
+            }
 
-            throw new IOException("File was concurrently deleted: " + path);
-        }
+            if (!exists) {
+                onClose(true);
 
-        checkClosed(null, 0);
+                throw new IOException("File was concurrently deleted: " + path);
+            }
 
-        // Send all IPC data from the local buffer.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
+            checkClosed(null, 0);
 
-        try {
-            if (remainder != null) {
-                data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
-                    ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
+            // Send all IPC data from the local buffer.
+            if (buf != null && buf.position() > 0)
+                sendData(true);
 
-                remainder = null;
-                remainderDataLen = 0;
-            }
+            try {
+                if (remainder != null) {
+                    data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+                        ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
 
-            if (space > 0) {
-                data.awaitAllAcksReceived(fileInfo.id());
+                    remainder = null;
+                    remainderDataLen = 0;
+                }
 
-                IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+                if (space > 0) {
+                    data.awaitAllAcksReceived(fileInfo.id());
 
-                if (fileInfo0 == null)
-                    throw new IOException("File was concurrently deleted: " + path);
-                else
-                    fileInfo = fileInfo0;
+                    IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
 
-                streamRange = initialStreamRange(fileInfo);
+                    if (fileInfo0 == null)
+                        throw new IOException("File was concurrently deleted: " + path);
+                    else
+                        fileInfo = fileInfo0;
 
-                space = 0;
+                    streamRange = initialStreamRange(fileInfo);
+
+                    space = 0;
+                }
+            } catch (IgniteCheckedException e) {
+                throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
             }
         }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
-        }
     }
 
     /**
@@ -355,7 +368,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+    protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+        assert Thread.holdsLock(mux);
+
         int writeLen = block.remaining();
 
         preStoreDataBlocks(null, writeLen);
@@ -396,7 +411,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+    protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+        assert Thread.holdsLock(mux);
+
         preStoreDataBlocks(in, len);
 
         int blockSize = fileInfo.blockSize();
@@ -434,7 +451,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException If failed.
      */
     private void onClose(boolean deleted) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         if (onCloseGuard.compareAndSet(false, true)) {
             // Notify backing secondary file system batch to finish.
@@ -516,7 +533,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException If this stream is closed.
      */
     protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         if (closed) {
             // Must read data from stream before throwing exception.
@@ -535,7 +552,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException In case of IO exception.
      */
     protected void sendData(boolean flip) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         try {
             if (flip)