You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:04 UTC
[01/14] ignite git commit: WIP on output stream optos.
Repository: ignite
Updated Branches:
refs/heads/ignite-3264 [created] 99d244a30
WIP on output stream optos.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cd33732
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cd33732
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cd33732
Branch: refs/heads/ignite-3264
Commit: 3cd337329a3e3df1c7deb97742833f55ea1c6821
Parents: e409b67
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:52:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:52:07 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../internal/processors/igfs/IgfsImpl.java | 48 +--
.../igfs/IgfsOutputStreamAdapter.java | 265 -------------
.../processors/igfs/IgfsOutputStreamImpl.java | 385 ++++++++++++++-----
4 files changed, 298 insertions(+), 406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d1f3ef5..d257807 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -862,9 +862,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DataStreamProcessor(ctx));
startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
startProcessor(new GridContinuousProcessor(ctx));
- startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
- IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
- IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
+// startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
+// IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
+// IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 9087ff0..bc2e087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1077,8 +1077,8 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
- IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(),
- bufferSize(bufSize), mode, batch);
+ IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
+ bufferSize(bufSize), mode, batch, metrics);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
@@ -1107,7 +1107,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
}
});
}
@@ -1142,7 +1142,8 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
- return new IgfsEventAwareOutputStream(path, desc.info(), bufferSize(bufSize), mode, batch);
+ return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch,
+ metrics);
}
final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1183,7 +1184,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
}
});
}
@@ -1759,43 +1760,6 @@ public final class IgfsImpl implements IgfsEx {
}
/**
- * IGFS output stream extension that fires events.
- */
- private class IgfsEventAwareOutputStream extends IgfsOutputStreamImpl {
- /** Close guard. */
- private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
- /**
- * Constructs file output stream.
- *
- * @param path Path to stored file.
- * @param fileInfo File info.
- * @param bufSize The size of the buffer to be used.
- * @param mode IGFS mode.
- * @param batch Optional secondary file system batch.
- */
- IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
- @Nullable IgfsFileWorkerBatch batch) {
- super(igfsCtx, path, fileInfo, bufSize, mode, batch, metrics);
-
- metrics.incrementFilesOpenedForWrite();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override protected void onClose() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- super.onClose();
-
- metrics.decrementFilesOpenedForWrite();
-
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_WRITE, bytes()));
- }
- }
- }
-
- /**
* IGFS input stream extension that fires events.
*/
private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
deleted file mode 100644
index 43de61e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Output stream to store data into grid cache with separate blocks.
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-abstract class IgfsOutputStreamAdapter extends IgfsOutputStream {
- /** Path to file. */
- protected final IgfsPath path;
-
- /** Buffer size. */
- private final int bufSize;
-
- /** Flag for this stream open/closed state. */
- private boolean closed;
-
- /** Local buffer to store stream data as consistent block. */
- private ByteBuffer buf;
-
- /** Bytes written. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- protected long bytes;
-
- /** Time consumed by write operations. */
- protected long time;
-
- /**
- * Constructs file output stream.
- *
- * @param path Path to stored file.
- * @param bufSize The size of the buffer to be used.
- */
- IgfsOutputStreamAdapter(IgfsPath path, int bufSize) {
- assert path != null;
- assert bufSize > 0;
-
- this.path = path;
- this.bufSize = bufSize;
- }
-
- /**
- * Gets number of written bytes.
- *
- * @return Written bytes.
- */
- public long bytes() {
- return bytes;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(int b) throws IOException {
- checkClosed(null, 0);
-
- long startTime = System.nanoTime();
-
- b &= 0xFF;
-
- if (buf == null)
- buf = ByteBuffer.allocate(bufSize);
-
- buf.put((byte)b);
-
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
-
- time += System.nanoTime() - startTime;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
- A.notNull(b, "b");
-
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
- ", length=" + len + ']');
- }
-
- checkClosed(null, 0);
-
- if (len == 0)
- return; // Done.
-
- long startTime = System.nanoTime();
-
- if (buf == null) {
- // Do not allocate and copy byte buffer if will send data immediately.
- if (len >= bufSize) {
- buf = ByteBuffer.wrap(b, off, len);
-
- sendData(false);
-
- return;
- }
-
- buf = ByteBuffer.allocate(Math.max(bufSize, len));
- }
-
- if (buf.remaining() < len)
- // Expand buffer capacity, if remaining size is less then data size.
- buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
-
- assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
- "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
-
- buf.put(b, off, len);
-
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
-
- time += System.nanoTime() - startTime;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
- checkClosed(in, len);
-
- long startTime = System.nanoTime();
-
- // Send all IPC data from the local buffer before streaming.
- if (buf != null && buf.position() > 0)
- sendData(true);
-
- try {
- storeDataBlocks(in, len);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e.getMessage(), e);
- }
-
- time += System.nanoTime() - startTime;
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be written out.
- *
- * @exception IOException if an I/O error occurs.
- */
- @Override public synchronized void flush() throws IOException {
- checkClosed(null, 0);
-
- // Send all IPC data from the local buffer.
- if (buf != null && buf.position() > 0)
- sendData(true);
- }
-
- /** {@inheritDoc} */
- @Override public final synchronized void close() throws IOException {
- // Do nothing if stream is already closed.
- if (closed)
- return;
-
- try {
- // Send all IPC data from the local buffer.
- try {
- flush();
- }
- finally {
- onClose(); // "onClose()" routine must be invoked anyway!
- }
- }
- finally {
- // Mark this stream closed AFTER flush.
- closed = true;
- }
- }
-
- /**
- * Store data blocks in file.<br/>
- * Note! If file concurrently deleted we'll get lost blocks.
- *
- * @param data Data to store.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;
-
- /**
- * Store data blocks in file reading appropriate number of bytes from given data input.
- *
- * @param in Data input to read from.
- * @param len Data length to store.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;
-
- /**
- * Close callback. It will be called only once in synchronized section.
- *
- * @throws IOException If failed.
- */
- protected void onClose() throws IOException {
- // No-op.
- }
-
- /**
- * Validate this stream is open.
- *
- * @throws IOException If this stream is closed.
- */
- private void checkClosed(@Nullable DataInput in, int len) throws IOException {
- assert Thread.holdsLock(this);
-
- if (closed) {
- // Must read data from stream before throwing exception.
- if (in != null)
- in.skipBytes(len);
-
- throw new IOException("Stream has been closed: " + this);
- }
- }
-
- /**
- * Send all local-buffered data to server.
- *
- * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
- * byte array.
- * @throws IOException In case of IO exception.
- */
- private void sendData(boolean flip) throws IOException {
- assert Thread.holdsLock(this);
-
- try {
- if (flip)
- buf.flip();
-
- storeDataBlock(buf);
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to store data into file: " + path, e);
- }
-
- buf = null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgfsOutputStreamAdapter.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 21e5fb6..7a40ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,12 +18,16 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -34,6 +38,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -41,10 +46,29 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* Output stream to store data into grid cache with separate blocks.
*/
-class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
+class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Maximum number of blocks in buffer. */
private static final int MAX_BLOCKS_CNT = 16;
+ /** Path to file. */
+ protected final IgfsPath path;
+
+ /** Buffer size. */
+ protected final int bufSize;
+
+ /** Flag for this stream open/closed state. */
+ protected boolean closed;
+
+ /** Local buffer to store stream data as consistent block. */
+ protected ByteBuffer buf;
+
+ /** Bytes written. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ protected long bytes;
+
+ /** Time consumed by write operations. */
+ protected long time;
+
/** IGFS context. */
private IgfsContext igfsCtx;
@@ -86,6 +110,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
/** Affinity written by this output stream. */
private IgfsFileAffinityRange streamRange;
+ /** Close guard. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
/**
* Constructs file output stream.
*
@@ -99,7 +126,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
- super(path, optimizeBufferSize(bufSize, fileInfo));
+ this.path = path;
+ this.bufSize = optimizeBufferSize(bufSize, fileInfo);
assert fileInfo != null;
assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
@@ -125,128 +153,123 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
streamRange = initialStreamRange(fileInfo);
writeCompletionFut = data.writeStart(fileInfo);
- }
- /**
- * Optimize buffer size.
- *
- * @param bufSize Requested buffer size.
- * @param fileInfo File info.
- * @return Optimized buffer size.
- */
- @SuppressWarnings("IfMayBeConditional")
- private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
- assert bufSize > 0;
+ metrics.incrementFilesOpenedForWrite();
+ }
- if (fileInfo == null)
- return bufSize;
+ /** {@inheritDoc} */
+ @Override public synchronized void write(int b) throws IOException {
+ checkClosed(null, 0);
- int blockSize = fileInfo.blockSize();
+ long startTime = System.nanoTime();
- if (blockSize <= 0)
- return bufSize;
+ b &= 0xFF;
- if (bufSize <= blockSize)
- // Optimize minimum buffer size to be equal file's block size.
- return blockSize;
+ if (buf == null)
+ buf = ByteBuffer.allocate(bufSize);
- int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+ buf.put((byte)b);
- if (bufSize > maxBufSize)
- // There is no profit or optimization from larger buffers.
- return maxBufSize;
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- if (fileInfo.length() == 0)
- // Make buffer size multiple of block size (optimized for new files).
- return bufSize / blockSize * blockSize;
-
- return bufSize;
+ time += System.nanoTime() - startTime;
}
/** {@inheritDoc} */
- @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
- int writeLen = block.remaining();
+ @SuppressWarnings("NullableProblems")
+ @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+ A.notNull(b, "b");
- preStoreDataBlocks(null, writeLen);
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+ ", length=" + len + ']');
+ }
- int blockSize = fileInfo.blockSize();
+ checkClosed(null, 0);
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + writeLen < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
+ if (len == 0)
+ return; // Done.
- byte[] allocated = new byte[blockSize];
+ long startTime = System.nanoTime();
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+ if (buf == null) {
+ // Do not allocate and copy byte buffer if will send data immediately.
+ if (len >= bufSize) {
+ buf = ByteBuffer.wrap(b, off, len);
- remainder = allocated;
- }
+ sendData(false);
- block.get(remainder, remainderDataLen, writeLen);
+ return;
+ }
- remainderDataLen += writeLen;
+ buf = ByteBuffer.allocate(Math.max(bufSize, len));
}
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
- false, streamRange, batch);
- remainderDataLen = remainder == null ? 0 : remainder.length;
- }
- }
+ if (buf.remaining() < len)
+ // Expand buffer capacity, if remaining size is less then data size.
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
- /** {@inheritDoc} */
- @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
- preStoreDataBlocks(in, len);
+ assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+ "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
- int blockSize = fileInfo.blockSize();
+ buf.put(b, off, len);
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + len < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- byte[] allocated = new byte[blockSize];
+ time += System.nanoTime() - startTime;
+ }
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+ /** {@inheritDoc} */
+ @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
+ checkClosed(in, len);
- remainder = allocated;
- }
+ long startTime = System.nanoTime();
- in.readFully(remainder, remainderDataLen, len);
+ // Send all IPC data from the local buffer before streaming.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
- remainderDataLen += len;
+ try {
+ storeDataBlocks(in, len);
}
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
- false, streamRange, batch);
-
- remainderDataLen = remainder == null ? 0 : remainder.length;
+ catch (IgniteCheckedException e) {
+ throw new IOException(e.getMessage(), e);
}
+
+ time += System.nanoTime() - startTime;
}
- /**
- * Initializes data loader if it was not initialized yet and updates written space.
- *
- * @param len Data length to be written.
- */
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+ /** {@inheritDoc} */
+ @Override public final synchronized void close() throws IOException {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
- if (in != null)
- in.skipBytes(len);
+ try {
+ // Send all IPC data from the local buffer.
+ try {
+ flush();
+ }
+ finally {
+ if (closeGuard.compareAndSet(false, true)) {
+ onClose(false);
- writeCompletionFut.get();
- }
+ metrics.decrementFilesOpenedForWrite();
- bytes += len;
- space += len;
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+ evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+ EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ }
+ }
+ }
+ finally {
+ // Mark this stream closed AFTER flush.
+ closed = true;
+ }
}
/**
@@ -270,7 +293,11 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
throw new IOException("File was concurrently deleted: " + path);
}
- super.flush();
+ checkClosed(null, 0);
+
+ // Send all IPC data from the local buffer.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
try {
if (remainder != null) {
@@ -301,9 +328,103 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
}
}
- /** {@inheritDoc} */
- @Override protected void onClose() throws IOException {
- onClose(false);
+ /**
+ * Initializes data loader if it was not initialized yet and updates written space.
+ *
+ * @param len Data length to be written.
+ */
+ private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
+ // Check if any exception happened while writing data.
+ if (writeCompletionFut.isDone()) {
+ assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+ if (in != null)
+ in.skipBytes(len);
+
+ writeCompletionFut.get();
+ }
+
+ bytes += len;
+ space += len;
+ }
+
+ /**
+ * Store data block.
+ *
+ * @param block Block.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ int writeLen = block.remaining();
+
+ preStoreDataBlocks(null, writeLen);
+
+ int blockSize = fileInfo.blockSize();
+
+ // If data length is not enough to fill full block, fill the remainder and return.
+ if (remainderDataLen + writeLen < blockSize) {
+ if (remainder == null)
+ remainder = new byte[blockSize];
+ else if (remainder.length != blockSize) {
+ assert remainderDataLen == remainder.length;
+
+ byte[] allocated = new byte[blockSize];
+
+ U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+ remainder = allocated;
+ }
+
+ block.get(remainder, remainderDataLen, writeLen);
+
+ remainderDataLen += writeLen;
+ }
+ else {
+ remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
+ false, streamRange, batch);
+
+ remainderDataLen = remainder == null ? 0 : remainder.length;
+ }
+ }
+
+ /**
+ * Store data blocks.
+ *
+ * @param in Input.
+ * @param len Length.
+ * @throws IgniteCheckedException If failed.
+ * @throws IOException If failed.
+ */
+ protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ preStoreDataBlocks(in, len);
+
+ int blockSize = fileInfo.blockSize();
+
+ // If data length is not enough to fill full block, fill the remainder and return.
+ if (remainderDataLen + len < blockSize) {
+ if (remainder == null)
+ remainder = new byte[blockSize];
+ else if (remainder.length != blockSize) {
+ assert remainderDataLen == remainder.length;
+
+ byte[] allocated = new byte[blockSize];
+
+ U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+ remainder = allocated;
+ }
+
+ in.readFully(remainder, remainderDataLen, len);
+
+ remainderDataLen += len;
+ }
+ else {
+ remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
+ false, streamRange, batch);
+
+ remainderDataLen = remainder == null ? 0 : remainder.length;
+ }
}
/**
@@ -317,11 +438,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
if (onCloseGuard.compareAndSet(false, true)) {
// Notify backing secondary file system batch to finish.
- if (mode != PRIMARY) {
- assert batch != null;
-
+ if (batch != null)
batch.finish();
- }
// Ensure file existence.
boolean exists;
@@ -393,6 +511,46 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
}
/**
+ * Validate this stream is open.
+ *
+ * @throws IOException If this stream is closed.
+ */
+ protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
+ assert Thread.holdsLock(this);
+
+ if (closed) {
+ // Must read data from stream before throwing exception.
+ if (in != null)
+ in.skipBytes(len);
+
+ throw new IOException("Stream has been closed: " + this);
+ }
+ }
+
+ /**
+ * Send all local-buffered data to server.
+ *
+ * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
+ * byte array.
+ * @throws IOException In case of IO exception.
+ */
+ protected void sendData(boolean flip) throws IOException {
+ assert Thread.holdsLock(this);
+
+ try {
+ if (flip)
+ buf.flip();
+
+ storeDataBlock(buf);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to store data into file: " + path, e);
+ }
+
+ buf = null;
+ }
+
+ /**
* Gets initial affinity range. This range will have 0 length and will start from first
* non-occupied file block.
*
@@ -426,6 +584,41 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
}
+ /**
+ * Optimize buffer size.
+ *
+ * @param bufSize Requested buffer size.
+ * @param fileInfo File info.
+ * @return Optimized buffer size.
+ */
+ private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
+ assert bufSize > 0;
+
+ if (fileInfo == null)
+ return bufSize;
+
+ int blockSize = fileInfo.blockSize();
+
+ if (blockSize <= 0)
+ return bufSize;
+
+ if (bufSize <= blockSize)
+ // Optimize minimum buffer size to be equal file's block size.
+ return blockSize;
+
+ int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+
+ if (bufSize > maxBufSize)
+ // There is no profit or optimization from larger buffers.
+ return maxBufSize;
+
+ if (fileInfo.length() == 0)
+ // Make buffer size multiple of block size (optimized for new files).
+ return bufSize / blockSize * blockSize;
+
+ return bufSize;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsOutputStreamImpl.class, this);
[04/14] ignite git commit: Protected -> private.
Posted by vo...@apache.org.
Protected -> private.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b6c6b488
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b6c6b488
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b6c6b488
Branch: refs/heads/ignite-3264
Commit: b6c6b488608acbe0dc4009134b49cf0187f8a250
Parents: 75b6080
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:01:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:01:26 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 64 ++++++++++----------
1 file changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b6c6b488/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index c50c431..98ccb81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -51,23 +51,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
private static final int MAX_BLOCKS_CNT = 16;
/** Path to file. */
- protected final IgfsPath path;
+ private final IgfsPath path;
/** Buffer size. */
- protected final int bufSize;
+ private final int bufSize;
/** Flag for this stream open/closed state. */
- protected boolean closed;
+ private boolean closed;
/** Local buffer to store stream data as consistent block. */
- protected ByteBuffer buf;
+ private ByteBuffer buf;
/** Bytes written. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- protected long bytes;
+ private long bytes;
/** Time consumed by write operations. */
- protected long time;
+ private long time;
/** IGFS context. */
private IgfsContext igfsCtx;
@@ -342,33 +342,13 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
/**
- * Initializes data loader if it was not initialized yet and updates written space.
- *
- * @param len Data length to be written.
- */
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
- if (in != null)
- in.skipBytes(len);
-
- writeCompletionFut.get();
- }
-
- bytes += len;
- space += len;
- }
-
- /**
* Store data block.
*
* @param block Block.
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
assert Thread.holdsLock(mux);
int writeLen = block.remaining();
@@ -411,7 +391,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
assert Thread.holdsLock(mux);
preStoreDataBlocks(in, len);
@@ -445,6 +425,26 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
/**
+ * Initializes data loader if it was not initialized yet and updates written space.
+ *
+ * @param len Data length to be written.
+ */
+ private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
+ // Check if any exception happened while writing data.
+ if (writeCompletionFut.isDone()) {
+ assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+ if (in != null)
+ in.skipBytes(len);
+
+ writeCompletionFut.get();
+ }
+
+ bytes += len;
+ space += len;
+ }
+
+ /**
* Close callback. It will be called only once in synchronized section.
*
* @param deleted Whether we already know that the file was deleted.
@@ -537,7 +537,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*
* @throws IOException If this stream is closed.
*/
- protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
+ private void checkClosed(@Nullable DataInput in, int len) throws IOException {
assert Thread.holdsLock(mux);
if (closed) {
@@ -556,7 +556,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* byte array.
* @throws IOException In case of IO exception.
*/
- protected void sendData(boolean flip) throws IOException {
+ private void sendData(boolean flip) throws IOException {
assert Thread.holdsLock(mux);
try {
@@ -564,12 +564,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
buf.flip();
storeDataBlock(buf);
+
+ buf = null;
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to store data into file: " + path, e);
}
-
- buf = null;
}
/**
[12/14] ignite git commit: Minors.
Posted by vo...@apache.org.
Minors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93f8eca5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93f8eca5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93f8eca5
Branch: refs/heads/ignite-3264
Commit: 93f8eca53008849ccfe609b5c7e5c20425f530e7
Parents: da1ff65
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 09:57:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 09:57:07 2016 +0300
----------------------------------------------------------------------
.../main/java/org/apache/ignite/internal/IgniteKernal.java | 6 +++---
.../ignite/internal/processors/igfs/IgfsOutputStreamImpl.java | 7 ++++---
2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/93f8eca5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d257807..d1f3ef5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -862,9 +862,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DataStreamProcessor(ctx));
startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
startProcessor(new GridContinuousProcessor(ctx));
-// startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
-// IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
-// IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
+ startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
+ IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
+ IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/93f8eca5/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 16a20a2..f51e9b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -271,12 +271,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*/
@Override public void flush() throws IOException {
synchronized (mux) {
-
boolean exists;
try {
exists = igfsCtx.meta().exists(fileInfo.id());
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw new IOException("File to read file metadata: " + path, e);
}
@@ -315,7 +315,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
space = 0;
}
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
}
}
[05/14] ignite git commit: Removed "meta" and "data" fields.
Posted by vo...@apache.org.
Removed "meta" and "data" fields.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5949abe4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5949abe4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5949abe4
Branch: refs/heads/ignite-3264
Commit: 5949abe4943bc07de26c4ecce62148e7f399cb41
Parents: b6c6b48
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:02:59 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:02:59 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 38 ++++++++------------
1 file changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5949abe4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 98ccb81..bc32e81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -72,12 +72,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** IGFS context. */
private IgfsContext igfsCtx;
- /** Meta info manager. */
- private final IgfsMetaManager meta;
-
- /** Data manager. */
- private final IgfsDataManager data;
-
/** File descriptor. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IgfsEntryInfo fileInfo;
@@ -146,8 +140,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
this.igfsCtx = igfsCtx;
- meta = igfsCtx.meta();
- data = igfsCtx.data();
this.fileInfo = fileInfo;
this.mode = mode;
@@ -156,7 +148,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
streamRange = initialStreamRange(fileInfo);
- writeCompletionFut = data.writeStart(fileInfo);
+ writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
metrics.incrementFilesOpenedForWrite();
}
@@ -295,7 +287,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
boolean exists;
try {
- exists = meta.exists(fileInfo.id());
+ exists = igfsCtx.meta().exists(fileInfo.id());
} catch (IgniteCheckedException e) {
throw new IOException("File to read file metadata: " + path, e);
}
@@ -314,7 +306,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
try {
if (remainder != null) {
- data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+ igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
remainder = null;
@@ -322,9 +314,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
if (space > 0) {
- data.awaitAllAcksReceived(fileInfo.id());
+ igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
- IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+ IgfsEntryInfo fileInfo0 = igfsCtx.meta().reserveSpace(path, fileInfo.id(), space, streamRange);
if (fileInfo0 == null)
throw new IOException("File was concurrently deleted: " + path);
@@ -376,8 +368,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
remainderDataLen += writeLen;
}
else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
- false, streamRange, batch);
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, block, false, streamRange, batch);
remainderDataLen = remainder == null ? 0 : remainder.length;
}
@@ -417,8 +409,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
remainderDataLen += len;
}
else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
- false, streamRange, batch);
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, in, len, false, streamRange, batch);
remainderDataLen = remainder == null ? 0 : remainder.length;
}
@@ -462,7 +454,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
boolean exists;
try {
- exists = !deleted && meta.exists(fileInfo.id());
+ exists = !deleted && igfsCtx.meta().exists(fileInfo.id());
}
catch (IgniteCheckedException e) {
throw new IOException("File to read file metadata: " + path, e);
@@ -472,7 +464,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
IOException err = null;
try {
- data.writeClose(fileInfo);
+ igfsCtx.data().writeClose(fileInfo);
writeCompletionFut.get();
}
@@ -499,10 +491,10 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
long modificationTime = System.currentTimeMillis();
try {
- meta.unlock(fileInfo, modificationTime);
+ igfsCtx.meta().unlock(fileInfo, modificationTime);
}
catch (IgfsPathNotFoundException ignore) {
- data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+ igfsCtx.data().delete(fileInfo); // Safety to ensure that all data blocks are deleted.
throw new IOException("File was concurrently deleted: " + path);
}
@@ -526,7 +518,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
", fileInfo=" + fileInfo + ']', e);
}
finally {
- data.delete(fileInfo);
+ igfsCtx.data().delete(fileInfo);
}
}
}
@@ -601,7 +593,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
- IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
+ IgniteUuid affKey = igfsCtx.data().nextAffinityKey(prevAffKey);
return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
}
[07/14] ignite git commit: Simplified ctor.
Posted by vo...@apache.org.
Simplified ctor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04e311b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04e311b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04e311b8
Branch: refs/heads/ignite-3264
Commit: 04e311b8cef3dd8e3dd1c27f8f4d5816bcc916db
Parents: a76b349
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:07:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:07:05 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 25 +++++++-------------
1 file changed, 9 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/04e311b8/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 8c93aad..13808ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -119,33 +119,26 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch) {
+ assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+ assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
+
+ // File hasn't been locked.
+ if (fileInfo.lockId() == null)
+ throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+
synchronized (mux) {
this.path = path;
this.bufSize = optimizeBufferSize(bufSize, fileInfo);
-
- assert fileInfo != null;
- assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
- assert mode != null && mode != PROXY;
- assert mode == PRIMARY && batch == null || batch != null;
-
- // File hasn't been locked.
- if (fileInfo.lockId() == null)
- throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
-
- assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
-
this.igfsCtx = igfsCtx;
-
this.fileInfo = fileInfo;
this.mode = mode;
this.batch = batch;
streamRange = initialStreamRange(fileInfo);
-
writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
-
- igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
}
+
+ igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
}
/** {@inheritDoc} */
[10/14] ignite git commit: IGNITE-3260: IGFS: Delete messages are no
longer passed.
Posted by vo...@apache.org.
IGNITE-3260: IGFS: Delete messages are no longer passed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065d2e70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065d2e70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065d2e70
Branch: refs/heads/ignite-3264
Commit: 065d2e70c21418437eba5e725eaa8b1ebc3af6da
Parents: 0176af1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 18:12:42 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
.../processors/igfs/IgfsDataManager.java | 61 ++---
.../processors/igfs/IgfsDeleteWorker.java | 42 ----
.../ignite/internal/processors/igfs/IgfsEx.java | 9 -
.../internal/processors/igfs/IgfsImpl.java | 249 +++++--------------
.../internal/processors/igfs/IgfsUtils.java | 2 +-
.../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
.../processors/igfs/IgfsSizeSelfTest.java | 133 ----------
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
9 files changed, 83 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return igfs.awaitDeletesAsync();
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return igfs.clientLogDirectory();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 16fbeb8..57a8c6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager {
private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
byte[] data) throws IgniteCheckedException {
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
+ final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
-
- if (completionFut == null) {
- if (log.isDebugEnabled())
- log.debug("Missing completion future for file write request (most likely exception occurred " +
- "which will be thrown upon stream close) [fileId=" + fileId + ']');
+ if (completionFut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Missing completion future for file write request (most likely exception occurred " +
+ "which will be thrown upon stream close) [fileId=" + fileId + ']');
- return;
- }
+ return;
+ }
- IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
- "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
+ IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
+ "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
- completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
- igfsCtx.kernalContext().localNodeId(), e));
+ completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
+ igfsCtx.kernalContext().localNodeId(), e));
- return;
- }
+ return;
}
// No affinity key present, just concat and return.
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager {
assert !blocks.isEmpty();
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
-
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
- return new GridFinishedFuture<Object>(
- new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
- "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
-
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " +
- "block due to unexpected exception.", e));
- }
+ return new GridFinishedFuture<Object>(
+ new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
+ "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
}
return dataCachePrj.putAllAsync(blocks);
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bae9354..310090d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-
/**
* IGFS worker for removal from the trash directory.
*/
@@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** How many files/folders to delete at once (i.e in a single transaction). */
private static final int MAX_DELETE_BATCH = 100;
- /** IGFS context. */
- private final IgfsContext igfsCtx;
-
/** Metadata manager. */
private final IgfsMetaManager meta;
@@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** Cancellation flag. */
private volatile boolean cancelled;
- /** Message topic. */
- private Object topic;
-
/**
* Constructor.
*
@@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread {
IgfsDeleteWorker(IgfsContext igfsCtx) {
super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
- this.igfsCtx = igfsCtx;
-
meta = igfsCtx.meta();
data = igfsCtx.data();
- String igfsName = igfsCtx.igfs().name();
-
- topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
assert meta != null;
assert data != null;
@@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread {
if (log.isDebugEnabled())
log.debug("Sending delete confirmation message [name=" + entry.getKey() +
", fileId=" + fileId + ']');
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId));
}
}
else
@@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread {
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
}
}
}
@@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread {
return true; // Directory entry was deleted concurrently.
}
}
-
- /**
- * Send delete message to all meta cache nodes in the grid.
- *
- * @param msg Message to send.
- */
- private void sendDeleteMessage(IgfsDeleteMessage msg) {
- assert msg != null;
-
- Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
- for (ClusterNode node : nodes) {
- try {
- igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
- ", msg=" + msg + ", err=" + e.getMessage() + ']');
- }
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index fb67e20..4c64bc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem {
public long groupBlockSize();
/**
- * Asynchronously await for all entries existing in trash to be removed.
- *
- * @return Future which will be completed when all entries existed in trash by the time of invocation are removed.
- * @throws IgniteCheckedException If failed.
- */
- public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
-
- /**
* Gets client file system log directory.
*
* @return Client file system log directory or {@code null} in case no client connections have been created yet.
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 9087ff0..262dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -100,11 +95,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -114,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
* Cache-based IGFS implementation.
@@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
/** Default permissions for file system entry. */
private static final String PERMISSION_DFLT_VAL = "0777";
+ /** Index generator for async format threads. */
+ private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
/** Default directory metadata. */
static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
@@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
/** Writers map. */
private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
- /** Delete futures. */
- private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
- /** Delete message listener. */
- private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
- /** Format discovery listener. */
- private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
/** Local metrics holder. */
private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
/** Client log directory. */
private volatile String logDir;
- /** Message topic. */
- private Object topic;
-
/** Eviction policy (if set). */
private IgfsPerBlockLruEvictionPolicy evictPlc;
@@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
- igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
@@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
// Restore interrupted flag.
if (interrupted)
Thread.currentThread().interrupt();
@@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
/** {@inheritDoc} */
@Override public void format() {
try {
- formatAsync().get();
+ IgniteUuid id = meta.format();
+
+ // If ID is null, then file system is already empty.
+ if (id == null)
+ return;
+
+ while (true) {
+ if (enterBusy()) {
+ try {
+ if (!meta.exists(id))
+ return;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ U.sleep(10);
+ }
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
* @return Future.
*/
IgniteInternalFuture<?> formatAsync() {
- try {
- IgniteUuid id = meta.format();
-
- if (id == null)
- return new GridFinishedFuture<Object>();
- else {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- return oldFut;
- else {
- if (!meta.exists(id)) {
- // Safety in case response message was received before we put future into collection.
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
- return fut;
- }
- }
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<Object>(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- Collection<IgniteUuid> ids = meta.pendingDeletes();
+ Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+ FORMAT_THREAD_IDX_GEN.incrementAndGet());
- if (!ids.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Constructing delete future for trash entries: " + ids);
+ t.setDaemon(true);
- GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>();
+ t.start();
- for (IgniteUuid id : ids) {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- resFut.add(oldFut);
- else {
- if (meta.exists(id))
- resFut.add(fut);
- else {
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
- }
- }
-
- resFut.markInitialized();
-
- return resFut;
- }
- else
- return new GridFinishedFuture<>();
+ return fut;
}
/**
@@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
}
- /**
- * Check whether IGFS with the same name exists among provided attributes.
- *
- * @param attrs Attributes.
- * @return {@code True} in case IGFS with the same name exists among provided attributes
- */
- private boolean sameIgfs(IgfsAttributes[] attrs) {
- if (attrs != null) {
- String igfsName = name();
-
- for (IgfsAttributes attr : attrs) {
- if (F.eq(igfsName, attr.igfsName()))
- return true;
- }
- }
- return false;
- }
-
/** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- /**
- * Format message listener required for format action completion.
- */
- private class FormatMessageListener implements GridMessageListener {
- /** {@inheritDoc} */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void onMessage(UUID nodeId, Object msg) {
- if (msg instanceof IgfsDeleteMessage) {
- ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
- if (node != null) {
- if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
- IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
- try {
- msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
- return;
- }
-
- assert msg0.id() != null;
-
- GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
- if (fut != null) {
- if (msg0.error() == null)
- fut.onDone();
- else
- fut.onDone(msg0.error());
- }
- }
- }
- }
- }
- }
-
- /**
- * Discovery listener required for format actions completion.
- */
- private class FormatDiscoveryListener implements GridLocalEventListener {
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
- DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
- if (evt0.eventNode() != null) {
- if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
- Collection<IgniteUuid> rmv = new HashSet<>();
-
- for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
- IgniteUuid id = fut.getKey();
-
- try {
- if (!meta.exists(id)) {
- fut.getValue().onDone();
-
- rmv.add(id);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to check file existence: " + id, e);
- }
- }
-
- for (IgniteUuid id : rmv)
- delFuts.remove(id);
- }
- }
- }
- }
-
/** {@inheritDoc} */
@Override public IgniteUuid nextAffinityKey() {
return safeOp(new Callable<IgniteUuid>() {
@@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
return t;
}
}
+
+ /**
+ * Format runnable.
+ */
+ private class FormatRunnable implements Runnable {
+ /** Target future. */
+ private final GridFutureAdapter<?> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public FormatRunnable(GridFutureAdapter<?> fut) {
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgfsException err = null;
+
+ try {
+ format();
+ }
+ catch (Throwable err0) {
+ err = IgfsUtils.toIgfsException(err0);
+ }
+ finally {
+ if (err == null)
+ fut.onDone();
+ else
+ fut.onDone(err);
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6fa9877..cfe549f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -180,7 +180,7 @@ public class IgfsUtils {
* @return Converted IGFS exception.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static IgfsException toIgfsException(Exception err) {
+ public static IgfsException toIgfsException(Throwable err) {
IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
IgfsException igfsErr = X.cause(err, IgfsException.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index fd4ec17..4e0f12b 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest {
igfs.format();
- igfs.awaitDeletesAsync().get();
-
GridTestUtils.retryAssert(log, 50, 100, new CA() {
@Override public void apply() {
for (int i = 0; i < NODE_CNT; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index 3933e86..266945f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.Transaction;
import org.jsr166.ThreadLocalRandom8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* {@link IgfsAttributes} test case.
@@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testPartitionedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = true;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testColocatedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = false;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testReplicatedOversizeDelay() throws Exception {
- cacheMode = REPLICATED;
-
- checkOversizeDelay();
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache.
*
* @throws Exception If failed.
@@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory.
- *
- * @throws Exception If failed.
- */
- private void checkOversizeDelay() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
-
- igfsMaxData = 256;
- trashPurgeTimeout = 2000;
-
- startUp();
-
- IgfsImpl igfs = igfs(0);
-
- final IgfsPath path = new IgfsPath("/file");
- final IgfsPath otherPath = new IgfsPath("/fileOther");
-
- // Fill cache with data up to it's limit.
- IgfsOutputStream os = igfs.create(path, false);
- os.write(chunk((int)igfsMaxData));
- os.close();
-
- final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache(
- igfs.configuration().getMetaCacheName());
-
- // Start a transaction in a separate thread which will lock file ID.
- final IgniteUuid id = igfs.context().meta().fileId(path);
- final IgfsEntryInfo info = igfs.context().meta().info(id);
-
- final AtomicReference<Throwable> err = new AtomicReference<>();
-
- try {
- new Thread(new Runnable() {
- @Override public void run() {
- try {
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- metaCache.get(id);
-
- latch.await();
-
- U.sleep(1000); // Sleep here so that data manager could "see" oversize.
-
- tx.commit();
- }
- }
- catch (Throwable e) {
- err.set(e);
- }
- }
- }).start();
-
- // Now add file ID to trash listing so that delete worker could "see" it.
- IgniteUuid trashId = IgfsUtils.randomTrashId();
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
- new IgfsListingEntry(info));
-
- // Clear root listing.
- metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID));
-
- // Add file to trash listing.
- IgfsEntryInfo trashInfo = metaCache.get(trashId);
-
- if (trashInfo == null)
- metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing));
- else
- metaCache.put(trashId, trashInfo.listing(listing));
-
- tx.commit();
- }
-
- assert metaCache.get(trashId) != null;
-
- // Now the file is locked and is located in trash, try adding some more data.
- os = igfs.create(otherPath, false);
- os.write(new byte[1]);
-
- latch.countDown();
-
- os.close();
-
- assert err.get() == null;
- }
- finally {
- latch.countDown(); // Safety.
- }
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading.
*
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index b38f3a2..ffa6f7d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return null;
}
[11/14] ignite git commit: Merge branch 'gridgain-7.5.25' into
gridgain-7.5.25-out-refactor
Posted by vo...@apache.org.
Merge branch 'gridgain-7.5.25' into gridgain-7.5.25-out-refactor
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da1ff65a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da1ff65a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da1ff65a
Branch: refs/heads/ignite-3264
Commit: da1ff65afc39c7b2dab4246551d2db25c21d7baa
Parents: f6fd3b8 065d2e7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 09:55:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 09:55:11 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 2 +-
.../ignite/internal/binary/BinaryUtils.java | 16 ++
.../processors/cache/CacheObjectContext.java | 3 +
.../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
.../processors/igfs/IgfsDataManager.java | 61 ++---
.../processors/igfs/IgfsDeleteWorker.java | 42 ----
.../ignite/internal/processors/igfs/IgfsEx.java | 9 -
.../internal/processors/igfs/IgfsImpl.java | 249 +++++--------------
.../processors/igfs/IgfsInputStreamImpl.java | 6 +-
...zySecondaryFileSystemPositionedReadable.java | 77 ++++++
.../processors/igfs/IgfsMetaManager.java | 43 +++-
.../internal/processors/igfs/IgfsUtils.java | 2 +-
.../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
.../GridCacheBinaryObjectsAbstractSelfTest.java | 78 +++++-
.../processors/igfs/IgfsAbstractSelfTest.java | 3 +
.../processors/igfs/IgfsModesSelfTest.java | 1 +
.../processors/igfs/IgfsSizeSelfTest.java | 133 ----------
.../unsafe/GridOffheapSnapTreeSelfTest.java | 2 +-
.../igfs/HadoopFIleSystemFactorySelfTest.java | 1 +
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 23 +-
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 17 +-
.../cache/IgniteCacheOffheapIndexScanTest.java | 195 +++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
24 files changed, 522 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/da1ff65a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
[09/14] ignite git commit: IGNITE-3259: Delete worker is not started
on client nodes any more.
Posted by vo...@apache.org.
IGNITE-3259: Delete worker is not started on client nodes any more.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0176af13
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0176af13
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0176af13
Branch: refs/heads/ignite-3264
Commit: 0176af13646a09541d65a10cf7ec0641c71e2ca7
Parents: 5254957
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:10:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 18:10:36 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 25 ++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0176af13/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 1dd4c53..465116b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -207,19 +207,20 @@ public class IgfsMetaManager extends IgfsManager {
locNode = igfsCtx.kernalContext().discovery().localNode();
// Start background delete worker.
- delWorker = new IgfsDeleteWorker(igfsCtx);
+ if (!client) {
+ delWorker = new IgfsDeleteWorker(igfsCtx);
- delWorker.start();
+ delWorker.start();
+ }
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
IgfsDeleteWorker delWorker0 = delWorker;
- if (delWorker0 != null)
+ if (delWorker0 != null) {
delWorker0.cancel();
- if (delWorker0 != null) {
try {
U.join(delWorker0);
}
@@ -1136,7 +1137,7 @@ public class IgfsMetaManager extends IgfsManager {
tx.commit();
- delWorker.signal();
+ signalDeleteWorker();
return newInfo.id();
}
@@ -1212,7 +1213,7 @@ public class IgfsMetaManager extends IgfsManager {
tx.commit();
- delWorker.signal();
+ signalDeleteWorker();
return victimId;
}
@@ -2476,7 +2477,7 @@ public class IgfsMetaManager extends IgfsManager {
Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path);
- delWorker.signal();
+ signalDeleteWorker();
return res;
}
@@ -3341,4 +3342,14 @@ public class IgfsMetaManager extends IgfsManager {
else
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
+
+ /**
+ * Signal delete worker thread.
+ */
+ private void signalDeleteWorker() {
+ IgfsDeleteWorker delWorker0 = delWorker;
+
+ if (delWorker0 != null)
+ delWorker0.signal();
+ }
}
\ No newline at end of file
[14/14] ignite git commit: WIP.
Posted by vo...@apache.org.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99d244a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99d244a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99d244a3
Branch: refs/heads/ignite-3264
Commit: 99d244a3009a5bcf347ce09da145a8f6cc3dc19f
Parents: cd92c9e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:55:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:55:21 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 341 +++++++++++--------
1 file changed, 203 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/99d244a3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 465116b..404d837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1882,121 +1882,8 @@ public class IgfsMetaManager extends IgfsManager {
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
- /** Output stream to the secondary file system. */
- private OutputStream out;
-
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
- IgfsEntryInfo> infos) throws Exception {
- validTxState(true);
-
- assert !infos.isEmpty();
-
- // Determine the first existing parent.
- IgfsPath parentPath = null;
-
- for (IgfsPath curPath : infos.keySet()) {
- if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
- parentPath = curPath;
- }
-
- assert parentPath != null;
-
- IgfsEntryInfo parentInfo = infos.get(parentPath);
-
- // Delegate to the secondary file system.
- out = simpleCreate ? fs.create(path, overwrite) :
- fs.create(path, bufSize, overwrite, replication, blockSize, props);
-
- IgfsPath parent0 = path.parent();
-
- assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
-
- // If some of the parent directories were missing, synchronize again.
- if (!parentPath.equals(parent0)) {
- parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
-
- // Fire notification about missing directories creation.
- if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
- IgfsPath evtPath = parent0;
-
- while (!parentPath.equals(evtPath)) {
- pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
- EventType.EVT_IGFS_DIR_CREATED));
-
- evtPath = evtPath.parent();
-
- assert evtPath != null; // If this fails, then ROOT does not exist.
- }
- }
- }
-
- // Get created file info.
- IgfsFile status = fs.info(path);
-
- if (status == null)
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because it no longer exists: " + path);
- else if (status.isDirectory())
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because the path points to a directory: " + path);
-
- IgfsEntryInfo newInfo = IgfsUtils.createFile(
- IgniteUuid.randomUuid(),
- status.blockSize(),
- status.length(),
- affKey,
- createFileLockId(false),
- igfsCtx.igfs().evictExclude(path, false),
- status.properties(),
- status.accessTime(),
- status.modificationTime()
- );
-
- // Add new file info to the listing optionally removing the previous one.
- assert parentInfo != null;
-
- IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
-
- if (oldId != null) {
- IgfsEntryInfo oldInfo = info(oldId);
-
- assert oldInfo != null; // Otherwise cache is in inconsistent state.
-
- // The contact is that we cannot overwrite a file locked for writing:
- if (oldInfo.lockId() != null)
- throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
- path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
-
- id2InfoPrj.remove(oldId); // Remove the old one.
- id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
- path.name(), parentInfo.listing().get(path.name()).fileId()));
-
- createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
-
- igfsCtx.data().delete(oldInfo);
- }
-
- // Record CREATE event if needed.
- if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
- pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
-
- return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
- }
-
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
- throws IgniteCheckedException {
- U.closeQuiet(out);
-
- U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
- simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
- bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
- throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
- "exception: " + path, err);
- }
- };
+ CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+ overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
try {
return synchronizeAndExecute(task, fs, false, path.parent());
@@ -2956,29 +2843,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Synchronization task interface.
- */
- private static interface SynchronizationTask<T> {
- /**
- * Callback handler in case synchronization was successful.
- *
- * @param infos Map from paths to corresponding infos.
- * @return Task result.
- * @throws Exception If failed.
- */
- public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
- /**
- * Callback handler in case synchronization failed.
- *
- * @param err Optional exception.
- * @return Task result.
- * @throws IgniteCheckedException In case exception is to be thrown in that case.
- */
- public T onFailure(Exception err) throws IgniteCheckedException;
- }
-
- /**
* Append routine.
*
* @param path Path.
@@ -3352,4 +3216,205 @@ public class IgfsMetaManager extends IgfsManager {
if (delWorker0 != null)
delWorker0.signal();
}
+
+ /**
+ * Synchronization task interface.
+ */
+ private static interface SynchronizationTask<T> {
+ /**
+ * Callback handler in case synchronization was successful.
+ *
+ * @param infos Map from paths to corresponding infos.
+ * @return Task result.
+ * @throws Exception If failed.
+ */
+ public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+ /**
+ * Callback handler in case synchronization failed.
+ *
+ * @param err Optional exception.
+ * @return Task result.
+ * @throws IgniteCheckedException In case exception is to be thrown in that case.
+ */
+ public T onFailure(Exception err) throws IgniteCheckedException;
+ }
+
+ /**
+ * Synchronization task to create a file.
+ */
+ private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+ /** Secondary file system. */
+ private IgfsSecondaryFileSystem fs;
+
+ /** Path. */
+ private IgfsPath path;
+
+ /** Simple create flag. */
+ private boolean simpleCreate;
+
+ /** Properties. */
+ private Map<String, String> props;
+
+ /** Overwrite flag. */
+ private boolean overwrite;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Replication factor. */
+ private short replication;
+
+ /** Block size. */
+ private long blockSize;
+
+ /** Affinity key. */
+ private IgniteUuid affKey;
+
+ /** Pending events. */
+ private Deque<IgfsEvent> pendingEvts;
+
+ /** Output stream to the secondary file system. */
+ private OutputStream out;
+
+ /**
+ * Constructor.
+ *
+ * @param fs Secondary file system.
+ * @param path Path.
+ * @param simpleCreate Simple create flag.
+ * @param props Properties.
+ * @param overwrite Overwrite flag.
+ * @param bufSize Buffer size.
+ * @param replication Replication factor.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param pendingEvts Pending events.
+ */
+ public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+ @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+ IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+ this.fs = fs;
+ this.path = path;
+ this.simpleCreate = simpleCreate;
+ this.props = props;
+ this.overwrite = overwrite;
+ this.bufSize = bufSize;
+ this.replication = replication;
+ this.blockSize = blockSize;
+ this.affKey = affKey;
+ this.pendingEvts = pendingEvts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+ throws Exception {
+ validTxState(true);
+
+ assert !infos.isEmpty();
+
+ // Determine the first existing parent.
+ IgfsPath parentPath = null;
+
+ for (IgfsPath curPath : infos.keySet()) {
+ if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+ parentPath = curPath;
+ }
+
+ assert parentPath != null;
+
+ IgfsEntryInfo parentInfo = infos.get(parentPath);
+
+ // Delegate to the secondary file system.
+ out = simpleCreate ? fs.create(path, overwrite) :
+ fs.create(path, bufSize, overwrite, replication, blockSize, props);
+
+ IgfsPath parent0 = path.parent();
+
+ assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+
+ // If some of the parent directories were missing, synchronize again.
+ if (!parentPath.equals(parent0)) {
+ parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+
+ // Fire notification about missing directories creation.
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+ IgfsPath evtPath = parent0;
+
+ while (!parentPath.equals(evtPath)) {
+ pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+ EventType.EVT_IGFS_DIR_CREATED));
+
+ evtPath = evtPath.parent();
+
+ assert evtPath != null; // If this fails, then ROOT does not exist.
+ }
+ }
+ }
+
+ // Get created file info.
+ IgfsFile status = fs.info(path);
+
+ if (status == null)
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because it no longer exists: " + path);
+ else if (status.isDirectory())
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because the path points to a directory: " + path);
+
+ IgfsEntryInfo newInfo = IgfsUtils.createFile(
+ IgniteUuid.randomUuid(),
+ status.blockSize(),
+ status.length(),
+ affKey,
+ createFileLockId(false),
+ igfsCtx.igfs().evictExclude(path, false),
+ status.properties(),
+ status.accessTime(),
+ status.modificationTime()
+ );
+
+ // Add new file info to the listing optionally removing the previous one.
+ assert parentInfo != null;
+
+ IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+
+ if (oldId != null) {
+ IgfsEntryInfo oldInfo = info(oldId);
+
+ assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+ // The contact is that we cannot overwrite a file locked for writing:
+ if (oldInfo.lockId() != null)
+ throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+ path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
+ id2InfoPrj.remove(oldId); // Remove the old one.
+ id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+ path.name(), parentInfo.listing().get(path.name()).fileId()));
+
+ createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+
+ igfsCtx.data().delete(oldInfo);
+ }
+
+ // Record CREATE event if needed.
+ if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+ pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+
+ return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+ U.closeQuiet(out);
+
+ U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+ simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+ bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+ throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+ "exception: " + path, err);
+ }
+ }
}
\ No newline at end of file
[06/14] ignite git commit: Removed more unnecessary fields.
Posted by vo...@apache.org.
Removed more unnecessary fields.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a76b3492
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a76b3492
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a76b3492
Branch: refs/heads/ignite-3264
Commit: a76b3492b6c1312c7c3b7bac0b302dba788e4fba
Parents: 5949abe
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:05:28 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:05:28 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/igfs/IgfsImpl.java | 9 ++++-----
.../processors/igfs/IgfsOutputStreamImpl.java | 14 ++++----------
2 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a76b3492/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index bc2e087..5e2bca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1078,7 +1078,7 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
- bufferSize(bufSize), mode, batch, metrics);
+ bufferSize(bufSize), mode, batch);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
@@ -1107,7 +1107,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
}
});
}
@@ -1142,8 +1142,7 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
- return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch,
- metrics);
+ return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
}
final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1184,7 +1183,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a76b3492/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index bc32e81..8c93aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -98,9 +98,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Ensures that onClose)_ routine is called no more than once. */
private final AtomicBoolean onCloseGuard = new AtomicBoolean();
- /** Local IGFS metrics. */
- private final IgfsLocalMetrics metrics;
-
/** Affinity written by this output stream. */
private IgfsFileAffinityRange streamRange;
@@ -119,10 +116,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @param bufSize The size of the buffer to be used.
* @param mode Grid IGFS mode.
* @param batch Optional secondary file system batch.
- * @param metrics Local IGFS metrics.
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
- @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
+ @Nullable IgfsFileWorkerBatch batch) {
synchronized (mux) {
this.path = path;
this.bufSize = optimizeBufferSize(bufSize, fileInfo);
@@ -131,7 +127,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
assert mode != null && mode != PROXY;
assert mode == PRIMARY && batch == null || batch != null;
- assert metrics != null;
// File hasn't been locked.
if (fileInfo.lockId() == null)
@@ -144,13 +139,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
this.fileInfo = fileInfo;
this.mode = mode;
this.batch = batch;
- this.metrics = metrics;
streamRange = initialStreamRange(fileInfo);
writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
- metrics.incrementFilesOpenedForWrite();
+ igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
}
}
@@ -259,7 +253,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
if (closeGuard.compareAndSet(false, true)) {
onClose(false);
- metrics.decrementFilesOpenedForWrite();
+ igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
GridEventStorageManager evts = igfsCtx.kernalContext().event();
@@ -472,7 +466,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
}
- metrics.addWrittenBytesTime(bytes, time);
+ igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
// Await secondary file system processing to finish.
if (mode == DUAL_SYNC) {
[08/14] ignite git commit: WIP.
Posted by vo...@apache.org.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6fd3b84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6fd3b84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6fd3b84
Branch: refs/heads/ignite-3264
Commit: f6fd3b84f17cff6dd4d335ad18e2a8a322a1942f
Parents: 04e311b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:59:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:59:43 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 96 ++++++--------------
1 file changed, 26 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f6fd3b84/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 13808ea..16a20a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -221,8 +221,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
sendData(true);
try {
- storeDataBlocks(in, len);
- } catch (IgniteCheckedException e) {
+ storeData(in, len);
+ }
+ catch (IgniteCheckedException e) {
throw new IOException(e.getMessage(), e);
}
@@ -323,16 +324,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/**
* Store data block.
*
- * @param block Block.
+ * @param data Block.
+ * @param writeLen Write length.
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ private void storeData(Object data, int writeLen) throws IgniteCheckedException, IOException {
assert Thread.holdsLock(mux);
+ assert data instanceof ByteBuffer || data instanceof DataInput;
- int writeLen = block.remaining();
+ if (writeCompletionFut.isDone()) {
+ assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+ writeCompletionFut.get();
+ }
- preStoreDataBlocks(null, writeLen);
+ bytes += writeLen;
+ space += writeLen;
int blockSize = fileInfo.blockSize();
@@ -350,80 +358,28 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
remainder = allocated;
}
- block.get(remainder, remainderDataLen, writeLen);
+ if (data instanceof ByteBuffer)
+ ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
+ else
+ ((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
remainderDataLen += writeLen;
}
else {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, block, false, streamRange, batch);
-
- remainderDataLen = remainder == null ? 0 : remainder.length;
- }
- }
-
- /**
- * Store data blocks.
- *
- * @param in Input.
- * @param len Length.
- * @throws IgniteCheckedException If failed.
- * @throws IOException If failed.
- */
- private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
- assert Thread.holdsLock(mux);
-
- preStoreDataBlocks(in, len);
-
- int blockSize = fileInfo.blockSize();
-
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + len < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
-
- byte[] allocated = new byte[blockSize];
-
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
- remainder = allocated;
+ if (data instanceof ByteBuffer) {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
+ }
+ else {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
}
-
- in.readFully(remainder, remainderDataLen, len);
-
- remainderDataLen += len;
- }
- else {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, in, len, false, streamRange, batch);
remainderDataLen = remainder == null ? 0 : remainder.length;
}
}
/**
- * Initializes data loader if it was not initialized yet and updates written space.
- *
- * @param len Data length to be written.
- */
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
- if (in != null)
- in.skipBytes(len);
-
- writeCompletionFut.get();
- }
-
- bytes += len;
- space += len;
- }
-
- /**
* Close callback. It will be called only once in synchronized section.
*
* @param deleted Whether we already know that the file was deleted.
@@ -542,7 +498,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
if (flip)
buf.flip();
- storeDataBlock(buf);
+ storeData(buf, buf.remaining());
buf = null;
}
[02/14] ignite git commit: Better encapsulated monitor.
Posted by vo...@apache.org.
Better encapsulated monitor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a432c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a432c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a432c0
Branch: refs/heads/ignite-3264
Commit: d3a432c02e300988e39516641fb17d5b5a9af698
Parents: 3cd3373
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:10 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:10 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 271 ++++++++++---------
1 file changed, 144 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a432c0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7a40ba3..7363ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -113,6 +113,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Close guard. */
private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+ /** Mutex for synchronization. */
+ private final Object mux = new Object();
+
/**
* Constructs file output stream.
*
@@ -126,59 +129,63 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
- this.path = path;
- this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+ synchronized (mux) {
+ this.path = path;
+ this.bufSize = optimizeBufferSize(bufSize, fileInfo);
- assert fileInfo != null;
- assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
- assert mode != null && mode != PROXY;
- assert mode == PRIMARY && batch == null || batch != null;
- assert metrics != null;
+ assert fileInfo != null;
+ assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+ assert mode != null && mode != PROXY;
+ assert mode == PRIMARY && batch == null || batch != null;
+ assert metrics != null;
- // File hasn't been locked.
- if (fileInfo.lockId() == null)
- throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+ // File hasn't been locked.
+ if (fileInfo.lockId() == null)
+ throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
- assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+ assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
- this.igfsCtx = igfsCtx;
- meta = igfsCtx.meta();
- data = igfsCtx.data();
+ this.igfsCtx = igfsCtx;
+ meta = igfsCtx.meta();
+ data = igfsCtx.data();
- this.fileInfo = fileInfo;
- this.mode = mode;
- this.batch = batch;
- this.metrics = metrics;
+ this.fileInfo = fileInfo;
+ this.mode = mode;
+ this.batch = batch;
+ this.metrics = metrics;
- streamRange = initialStreamRange(fileInfo);
+ streamRange = initialStreamRange(fileInfo);
- writeCompletionFut = data.writeStart(fileInfo);
+ writeCompletionFut = data.writeStart(fileInfo);
- metrics.incrementFilesOpenedForWrite();
+ metrics.incrementFilesOpenedForWrite();
+ }
}
/** {@inheritDoc} */
- @Override public synchronized void write(int b) throws IOException {
- checkClosed(null, 0);
+ @Override public void write(int b) throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- b &= 0xFF;
+ b &= 0xFF;
- if (buf == null)
- buf = ByteBuffer.allocate(bufSize);
+ if (buf == null)
+ buf = ByteBuffer.allocate(bufSize);
- buf.put((byte)b);
+ buf.put((byte)b);
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
@SuppressWarnings("NullableProblems")
- @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+ @Override public void write(byte[] b, int off, int len) throws IOException {
A.notNull(b, "b");
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
@@ -186,89 +193,94 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
", length=" + len + ']');
}
- checkClosed(null, 0);
+ synchronized (mux) {
+ checkClosed(null, 0);
- if (len == 0)
- return; // Done.
+ if (len == 0)
+ return; // Done.
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- if (buf == null) {
- // Do not allocate and copy byte buffer if will send data immediately.
- if (len >= bufSize) {
- buf = ByteBuffer.wrap(b, off, len);
+ if (buf == null) {
+ // Do not allocate and copy byte buffer if will send data immediately.
+ if (len >= bufSize) {
+ buf = ByteBuffer.wrap(b, off, len);
- sendData(false);
+ sendData(false);
- return;
- }
+ return;
+ }
- buf = ByteBuffer.allocate(Math.max(bufSize, len));
- }
+ buf = ByteBuffer.allocate(Math.max(bufSize, len));
+ }
- if (buf.remaining() < len)
- // Expand buffer capacity, if remaining size is less then data size.
- buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+ if (buf.remaining() < len)
+ // Expand buffer capacity, if remaining size is less then data size.
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
- assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
- "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+ assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+ "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
- buf.put(b, off, len);
+ buf.put(b, off, len);
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
- @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
- checkClosed(in, len);
+ @Override public void transferFrom(DataInput in, int len) throws IOException {
+ synchronized (mux) {
+ checkClosed(in, len);
- long startTime = System.nanoTime();
+ long startTime = System.nanoTime();
- // Send all IPC data from the local buffer before streaming.
- if (buf != null && buf.position() > 0)
- sendData(true);
+ // Send all IPC data from the local buffer before streaming.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
- try {
- storeDataBlocks(in, len);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e.getMessage(), e);
- }
+ try {
+ storeDataBlocks(in, len);
+ } catch (IgniteCheckedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
- time += System.nanoTime() - startTime;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
- @Override public final synchronized void close() throws IOException {
- // Do nothing if stream is already closed.
- if (closed)
- return;
+ @Override public final void close() throws IOException {
+ synchronized (mux) {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
- try {
- // Send all IPC data from the local buffer.
try {
- flush();
- }
- finally {
- if (closeGuard.compareAndSet(false, true)) {
- onClose(false);
+ // Send all IPC data from the local buffer.
+ try {
+ flush();
+ }
+ finally {
+ if (closeGuard.compareAndSet(false, true)) {
+ onClose(false);
- metrics.decrementFilesOpenedForWrite();
+ metrics.decrementFilesOpenedForWrite();
- GridEventStorageManager evts = igfsCtx.kernalContext().event();
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
- evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
- EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+ evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+ EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ }
}
}
- }
- finally {
- // Mark this stream closed AFTER flush.
- closed = true;
+ finally {
+ // Mark this stream closed AFTER flush.
+ closed = true;
+ }
}
}
@@ -277,55 +289,56 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*
* @exception IOException if an I/O error occurs.
*/
- @Override public synchronized void flush() throws IOException {
- boolean exists;
+ @Override public void flush() throws IOException {
+ synchronized (mux) {
- try {
- exists = meta.exists(fileInfo.id());
- }
- catch (IgniteCheckedException e) {
- throw new IOException("File to read file metadata: " + path, e);
- }
+ boolean exists;
- if (!exists) {
- onClose(true);
+ try {
+ exists = meta.exists(fileInfo.id());
+ } catch (IgniteCheckedException e) {
+ throw new IOException("File to read file metadata: " + path, e);
+ }
- throw new IOException("File was concurrently deleted: " + path);
- }
+ if (!exists) {
+ onClose(true);
- checkClosed(null, 0);
+ throw new IOException("File was concurrently deleted: " + path);
+ }
- // Send all IPC data from the local buffer.
- if (buf != null && buf.position() > 0)
- sendData(true);
+ checkClosed(null, 0);
- try {
- if (remainder != null) {
- data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
- ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
+ // Send all IPC data from the local buffer.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
- remainder = null;
- remainderDataLen = 0;
- }
+ try {
+ if (remainder != null) {
+ data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+ ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
- if (space > 0) {
- data.awaitAllAcksReceived(fileInfo.id());
+ remainder = null;
+ remainderDataLen = 0;
+ }
- IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+ if (space > 0) {
+ data.awaitAllAcksReceived(fileInfo.id());
- if (fileInfo0 == null)
- throw new IOException("File was concurrently deleted: " + path);
- else
- fileInfo = fileInfo0;
+ IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
- streamRange = initialStreamRange(fileInfo);
+ if (fileInfo0 == null)
+ throw new IOException("File was concurrently deleted: " + path);
+ else
+ fileInfo = fileInfo0;
- space = 0;
+ streamRange = initialStreamRange(fileInfo);
+
+ space = 0;
+ }
+ } catch (IgniteCheckedException e) {
+ throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
}
}
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
- }
}
/**
@@ -355,7 +368,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+ assert Thread.holdsLock(mux);
+
int writeLen = block.remaining();
preStoreDataBlocks(null, writeLen);
@@ -396,7 +411,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IgniteCheckedException If failed.
* @throws IOException If failed.
*/
- protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+ assert Thread.holdsLock(mux);
+
preStoreDataBlocks(in, len);
int blockSize = fileInfo.blockSize();
@@ -434,7 +451,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException If failed.
*/
private void onClose(boolean deleted) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
if (onCloseGuard.compareAndSet(false, true)) {
// Notify backing secondary file system batch to finish.
@@ -516,7 +533,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException If this stream is closed.
*/
protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
if (closed) {
// Must read data from stream before throwing exception.
@@ -535,7 +552,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
* @throws IOException In case of IO exception.
*/
protected void sendData(boolean flip) throws IOException {
- assert Thread.holdsLock(this);
+ assert Thread.holdsLock(mux);
try {
if (flip)
[13/14] ignite git commit: Re-arranged fields.
Posted by vo...@apache.org.
Re-arranged fields.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd92c9ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd92c9ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd92c9ec
Branch: refs/heads/ignite-3264
Commit: cd92c9ecaf2b2c9e1e7b9e2e7ed7c5aadf9be3d5
Parents: 93f8eca
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:01:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:01:21 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsOutputStreamImpl.java | 42 ++++++++++----------
1 file changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd92c9ec/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index f51e9b5..b90e34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -50,12 +50,33 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Maximum number of blocks in buffer. */
private static final int MAX_BLOCKS_CNT = 16;
+ /** IGFS context. */
+ private final IgfsContext igfsCtx;
+
/** Path to file. */
private final IgfsPath path;
/** Buffer size. */
private final int bufSize;
+ /** IGFS mode. */
+ private final IgfsMode mode;
+
+ /** File worker batch. */
+ private final IgfsFileWorkerBatch batch;
+
+ /** Write completion future. */
+ private final IgniteInternalFuture<Boolean> writeCompletionFut;
+
+ /** Ensures that onClose)_ routine is called no more than once. */
+ private final AtomicBoolean onCloseGuard = new AtomicBoolean();
+
+ /** Close guard. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
+ /** Mutex for synchronization. */
+ private final Object mux = new Object();
+
/** Flag for this stream open/closed state. */
private boolean closed;
@@ -69,9 +90,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Time consumed by write operations. */
private long time;
- /** IGFS context. */
- private IgfsContext igfsCtx;
-
/** File descriptor. */
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IgfsEntryInfo fileInfo;
@@ -86,27 +104,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Data length in remainder. */
private int remainderDataLen;
- /** Write completion future. */
- private final IgniteInternalFuture<Boolean> writeCompletionFut;
-
- /** IGFS mode. */
- private final IgfsMode mode;
-
- /** File worker batch. */
- private final IgfsFileWorkerBatch batch;
-
- /** Ensures that onClose)_ routine is called no more than once. */
- private final AtomicBoolean onCloseGuard = new AtomicBoolean();
-
/** Affinity written by this output stream. */
private IgfsFileAffinityRange streamRange;
- /** Close guard. */
- private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
- /** Mutex for synchronization. */
- private final Object mux = new Object();
-
/**
* Constructs file output stream.
*
[03/14] ignite git commit: Got rid of warns.
Posted by vo...@apache.org.
Got rid of warns.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/75b60800
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/75b60800
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/75b60800
Branch: refs/heads/ignite-3264
Commit: 75b608003baa74f76a5c2537b5f1cf01bab0ed38
Parents: d3a432c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:31 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:31 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/igfs/IgfsOutputStreamImpl.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/75b60800/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7363ffe..c50c431 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -485,6 +485,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
// Await secondary file system processing to finish.
if (mode == DUAL_SYNC) {
try {
+ assert batch != null;
+
batch.await();
}
catch (IgniteCheckedException e) {
@@ -513,8 +515,11 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
else {
try {
- if (mode == DUAL_SYNC)
+ if (mode == DUAL_SYNC) {
+ assert batch != null;
+
batch.await();
+ }
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to close secondary file system stream [path=" + path +