You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:05 UTC
[02/14] ignite git commit: Better encapsulated monitor.
Better encapsulated monitor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a432c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a432c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a432c0
Branch: refs/heads/ignite-3264
Commit: d3a432c02e300988e39516641fb17d5b5a9af698
Parents: 3cd3373
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:10 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:10 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 271 ++++++++++---------
1 file changed, 144 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a432c0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7a40ba3..7363ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -113,6 +113,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Close guard. */
private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+ /** Mutex for synchronization. */
+ private final Object mux = new Object();
+
/**
* Constructs file output stream.
*
@@ -126,59 +129,63 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
- this.path = path;
- this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+ synchronized (mux) {
+ this.path = path;
+ this.bufSize = optimizeBufferSize(bufSize, fileInfo);
- assert fileInfo != null;
- assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
- assert mode != null && mode != PROXY;
- assert mode == PRIMARY && batch == null || batch != null;
- assert metrics != null;
+ assert fileInfo != null;
+ assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+ assert mode != null && mode != PROXY;
+ assert mode == PRIMARY && batch == null || batch != null;
+ assert metrics != null;
- // File hasn't been locked.
- if (fileInfo.lockId() == null)
- throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+ // File hasn't been locked.
+ if (fileInfo.lockId() == null)
+ throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
- assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+ assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
- this.igfsCtx = igfsCtx;
- meta = igfsCtx.meta();
- data = igfsCtx.data();
+ this.igfsCtx = igfsCtx;
+ meta = igfsCtx.meta();
+ data = igfsCtx.data();
- this.fileInfo = fileInfo;
- this.mode = mode;
- this.batch = batch;
- this.metrics = metrics;
+ this.fileInfo = fileInfo;
+ this.mode = mode;
+ this.batch = batch;
+ this.metrics = metrics;
- streamRange = initialStreamRange(fileInfo);
+ streamRange = initialStreamRange(fileInfo);
- writeCompletionFut = data.writeStart(fileInfo);
+ writeCompletionFut = data.writeStart(fileInfo);
- metrics.incrementFilesOpenedForWrite();
+ metrics.incrementFilesOpenedForWrite();
+ }
}
/** {@inheritDoc} */
- @Override public synchronized void write(int b) throws IOException {
- checkClosed(null, 0);
+ @Override public void write(int b) throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- b &= 0xFF;
+ b &= 0xFF;
- if (buf == null)
- buf = ByteBuffer.allocate(bufSize);
+ if (buf == null)
+ buf = ByteBuffer.allocate(bufSize);
- buf.put((byte)b);
+ buf.put((byte)b);
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("NullableProblems")
- @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+ @Override public void write(byte[] b, int off, int len) throws IOException {
A.notNull(b, "b");
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
@@ -186,89 +193,94 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
", length=" + len + ']');
}
- checkClosed(null, 0);
+ synchronized (mux) {
+ checkClosed(null, 0);
- if (len == 0)
- return; // Done.
+ if (len == 0)
+ return; // Done.
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- if (buf == null) {
- // Do not allocate and copy byte buffer if will send data immediately.
- if (len >= bufSize) {
- buf = ByteBuffer.wrap(b, off, len);
+ if (buf == null) {
+ // Do not allocate and copy byte buffer if will send data immediately.
+ if (len >= bufSize) {
+ buf = ByteBuffer.wrap(b, off, len);
- sendData(false);
+ sendData(false);
- return;
- }
+ return;
+ }
- buf = ByteBuffer.allocate(Math.max(bufSize, len));
- }
+ buf = ByteBuffer.allocate(Math.max(bufSize, len));
+ }
- if (buf.remaining() < len)
- // Expand buffer capacity, if remaining size is less then data size.
- buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+ if (buf.remaining() < len)
+ // Expand buffer capacity, if remaining size is less then data size.
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
- assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
- "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+ assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+ "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
- buf.put(b, off, len);
+ buf.put(b, off, len);
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
- @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
- checkClosed(in, len);
+ @Override public void transferFrom(DataInput in, int len) throws IOException {
+ synchronized (mux) {
+ checkClosed(in, len);
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- // Send all IPC data from the local buffer before streaming.
- if (buf != null && buf.position() > 0)
- sendData(true);
+ // Send all IPC data from the local buffer before streaming.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
- try {
- storeDataBlocks(in, len);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e.getMessage(), e);
- }
+ try {
+ storeDataBlocks(in, len);
+ } catch (IgniteCheckedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
- @Override public final synchronized void close() throws IOException {
- // Do nothing if stream is already closed.
- if (closed)
- return;
+ @Override public final void close() throws IOException {
+ synchronized (mux) {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
- try {
- // Send all IPC data from the local buffer.
try {
- flush();
- }
- finally {
- if (closeGuard.compareAndSet(false, true)) {
- onClose(false);
+ // Send all IPC data from the local buffer.
+ try {
+ flush();
+ }
+ finally {
+ if (closeGuard.compareAndSet(false, true)) {
+ onClose(false);
- metrics.decrementFilesOpenedForWrite();
+ metrics.decrementFilesOpenedForWrite();
- GridEventStorageManager evts = igfsCtx.kernalContext().event();
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
- evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
- EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+ evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+ EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ }
}
}
- }
- finally {
- // Mark this stream closed AFTER flush.
- closed = true;
+ finally {
+ // Mark this stream closed AFTER flush.
+ closed = true;
+ }
}
}
@@ -277,55 +289,56 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*
* @exception IOException if an I/O error occurs.
*/
- @Override public synchronized void flush() throws IOException {
- boolean exists;
+ @Override public void flush() throws IOException {
+ synchronized (mux) {
- try {
- exists = meta.exists(fileInfo.id());
- }
- catch (IgniteCheckedException e) {
- throw new IOException("File to read file metadata: " + path, e);
- }
+ boolean exists;
- if (!exists) {
- onClose(true);
+ try {
+ exists = meta.exists(fileInfo.id());
+ } catch (IgniteCheckedException e) {
+ throw new IOException("File to read file metadata: " + path, e);
+ }
- throw new IOException("File was concurrently deleted: " + path);
- }
+ if (!exists) {
+ onClose(true);
- checkClosed(null, 0);
+ throw new IOException("File was concurrently deleted: " + path);
+ }
- // Send all IPC data from the local buffer.
- if (buf != null && buf.position() > 0)
- sendData(true);
+ checkClosed(null, 0);
- try {
- if (remainder != null) {
- data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
- ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
+ // Send all IPC data from the local buffer.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
- remainder = null;
- remainderDataLen = 0;
- }
+ try {
+ if (remainder != null) {
+ data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+ ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
- if (space > 0) {
- data.awaitAllAcksReceived(fileInfo.id());
+ remainder = null;
+ remainderDataLen = 0;
+ }
- IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+ if (space > 0) {
+ data.awaitAllAcksReceived(fileInfo.id());
- if (fileInfo0 == null)
- throw new IOException("File was concurrently deleted: " + path);
- else
- fileInfo = fileInfo0;
+ IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
- streamRange = initialStreamRange(fileInfo);
+ if (fileInfo0 == null)
+ throw new IOException("File was concurrently deleted: " + path);
+ else
+ fileInfo = fileInfo0;
- space = 0;
+ streamRange = initialStreamRange(fileInfo);
+
+ space = 0;
+ }
+ } catch (IgniteCheckedException e) {
+ throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
}
}
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
- }
}
/**
@@ -355,7 +368,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ assert Thread.holdsLock(mux);
+
int writeLen = block.remaining();
preStoreDataBlocks(null, writeLen);
@@ -396,7 +411,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ assert Thread.holdsLock(mux);
+
preStoreDataBlocks(in, len);
int blockSize = fileInfo.blockSize();
@@ -434,7 +451,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException If failed.
*/
private void onClose(boolean deleted) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
if (onCloseGuard.compareAndSet(false, true)) {
// Notify backing secondary file system batch to finish.
@@ -516,7 +533,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException If this stream is closed.
*/
protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
if (closed) {
// Must read data from stream before throwing exception.
@@ -535,7 +552,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException In case of IO exception.
*/
protected void sendData(boolean flip) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
try {
if (flip)