You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/23 15:26:55 UTC
[21/24] ignite git commit: IGNITE-3858 IGFS: Support direct PROXY
mode invocation in methods: create / append. This closes #1070. This closes
#1084.
IGNITE-3858 IGFS: Support direct PROXY mode invocation in methods: create / append. This closes #1070. This closes #1084.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a97483a4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a97483a4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a97483a4
Branch: refs/heads/ignite-comm-opts2
Commit: a97483a4ce2c00bd0cca025c4ef4bfa181897aa9
Parents: 0d5ee78
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Sep 22 10:51:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Sep 22 10:51:05 2016 +0300
----------------------------------------------------------------------
.../igfs/IgfsAbstractOutputStream.java | 266 ++++++++++++++++
.../internal/processors/igfs/IgfsImpl.java | 27 +-
.../processors/igfs/IgfsOutputStreamImpl.java | 319 ++++---------------
.../igfs/IgfsOutputStreamProxyImpl.java | 163 ++++++++++
.../igfs/IgfsAbstractBaseSelfTest.java | 2 +-
5 files changed, 518 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
new file mode 100644
index 0000000..c1e751e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractOutputStream.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.events.IgfsEvent;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+abstract class IgfsAbstractOutputStream extends IgfsOutputStream {
+ /** IGFS context. */
+ protected final IgfsContext igfsCtx;
+
+ /** Path to file. */
+ protected final IgfsPath path;
+
+ /** Buffer size. */
+ protected final int bufSize;
+
+ /** File worker batch. */
+ protected final IgfsFileWorkerBatch batch;
+
+ /** Mutex for synchronization. */
+ protected final Object mux = new Object();
+
+ /** Flag for this stream open/closed state. */
+ protected boolean closed;
+
+ /** Local buffer to store stream data as consistent block. */
+ protected ByteBuffer buf;
+
+ /** Bytes written. */
+ protected long bytes;
+
+ /** Time consumed by write operations. */
+ protected long time;
+
+ /**
+ * Constructs file output stream.
+ *
+ * @param igfsCtx IGFS context.
+ * @param path Path to stored file.
+ * @param bufSize The size of the buffer to be used.
+ * @param batch Optional secondary file system batch.
+ */
+ IgfsAbstractOutputStream(IgfsContext igfsCtx, IgfsPath path, int bufSize, @Nullable IgfsFileWorkerBatch batch) {
+ synchronized (mux) {
+ this.path = path;
+ this.bufSize = optimizeBufferSize(bufSize);
+ this.igfsCtx = igfsCtx;
+ this.batch = batch;
+ }
+
+ igfsCtx.metrics().incrementFilesOpenedForWrite();
+ }
+
+ /**
+ * Optimize buffer size.
+ *
+ * @param bufSize Original byffer size.
+ * @return Optimized buffer size.
+ */
+ protected abstract int optimizeBufferSize(int bufSize);
+
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
+
+ b &= 0xFF;
+
+ long startTime = System.nanoTime();
+
+ if (buf == null)
+ buf = allocateNewBuffer();
+
+ buf.put((byte)b);
+
+ sendBufferIfFull();
+
+ time += System.nanoTime() - startTime;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override public void write(byte[] b, int off, int len) throws IOException {
+ A.notNull(b, "b");
+
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+ ", length=" + len + ']');
+ }
+
+ synchronized (mux) {
+ checkClosed(null, 0);
+
+ // Check if there is anything to write.
+ if (len == 0)
+ return;
+
+ long startTime = System.nanoTime();
+
+ if (buf == null) {
+ if (len >= bufSize) {
+ // Send data right away.
+ ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
+
+ send(tmpBuf, tmpBuf.remaining());
+ }
+ else {
+ buf = allocateNewBuffer();
+
+ buf.put(b, off, len);
+ }
+ }
+ else {
+ // Re-allocate buffer if needed.
+ if (buf.remaining() < len)
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+
+ buf.put(b, off, len);
+
+ sendBufferIfFull();
+ }
+
+ time += System.nanoTime() - startTime;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void transferFrom(DataInput in, int len) throws IOException {
+ synchronized (mux) {
+ checkClosed(in, len);
+
+ long startTime = System.nanoTime();
+
+ // Clean-up local buffer before streaming.
+ sendBufferIfNotEmpty();
+
+ // Perform transfer.
+ send(in, len);
+
+ time += System.nanoTime() - startTime;
+ }
+ }
+
+ /**
+ * Validate this stream is open.
+ *
+ * @param in Data input.
+ * @param len Data len in bytes.
+ * @throws IOException If this stream is closed.
+ */
+ protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
+ assert Thread.holdsLock(mux);
+
+ if (closed) {
+ // Must read data from stream before throwing exception.
+ if (in != null)
+ in.skipBytes(len);
+
+ throw new IOException("Stream has been closed: " + this);
+ }
+ }
+
+ /**
+ * Send local buffer if it full.
+ *
+ * @throws IOException If failed.
+ */
+ private void sendBufferIfFull() throws IOException {
+ if (buf.position() >= bufSize)
+ sendBuffer();
+ }
+
+ /**
+ * Send local buffer if at least something is stored there.
+ *
+ * @throws IOException If failed.
+ */
+ void sendBufferIfNotEmpty() throws IOException {
+ if (buf != null && buf.position() > 0)
+ sendBuffer();
+ }
+
+ /**
+ * Send all local-buffered data to server.
+ *
+ * @throws IOException In case of IO exception.
+ */
+ private void sendBuffer() throws IOException {
+ buf.flip();
+
+ send(buf, buf.remaining());
+
+ buf = null;
+ }
+
+ /**
+ * Store data block.
+ *
+ * @param data Block.
+ * @param writeLen Write length.
+ * @throws IOException If failed.
+ */
+ protected abstract void send(Object data, int writeLen) throws IOException;
+
+ /**
+ * Allocate new buffer.
+ *
+ * @return New buffer.
+ */
+ private ByteBuffer allocateNewBuffer() {
+ return ByteBuffer.allocate(bufSize);
+ }
+
+ /**
+ * Updates IGFS metrics when the stream is closed.
+ */
+ protected void updateMetricsOnClose() {
+ IgfsLocalMetrics metrics = igfsCtx.metrics();
+
+ metrics.addWrittenBytesTime(bytes, time);
+ metrics.decrementFilesOpenedForWrite();
+
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+ evts.record(new IgfsEvent(path, igfsCtx.localNode(),
+ EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsAbstractOutputStream.class, this);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 87a4699..bee9d9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -92,7 +92,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -274,7 +274,7 @@ public final class IgfsImpl implements IgfsEx {
}
dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
- new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
+ new SynchronousQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
/** {@inheritDoc} */
@@ -1088,6 +1088,17 @@ public final class IgfsImpl implements IgfsEx {
else
dirProps = fileProps = new HashMap<>(props);
+ if (mode == PROXY) {
+ assert secondaryFs != null;
+
+ OutputStream secondaryStream = secondaryFs.create(path, bufSize, overwrite, replication,
+ groupBlockSize(), props);
+
+ IgfsFileWorkerBatch batch = newBatch(path, secondaryStream);
+
+ return new IgfsOutputStreamProxyImpl(igfsCtx, path, info(path), bufferSize(bufSize), batch);
+ }
+
// Prepare context for DUAL mode.
IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
@@ -1142,7 +1153,15 @@ public final class IgfsImpl implements IgfsEx {
final IgfsMode mode = resolveMode(path);
- IgfsFileWorkerBatch batch;
+ if (mode == PROXY) {
+ assert secondaryFs != null;
+
+ OutputStream secondaryStream = secondaryFs.append(path, bufSize, create, props);
+
+ IgfsFileWorkerBatch batch = newBatch(path, secondaryStream);
+
+ return new IgfsOutputStreamProxyImpl(igfsCtx, path, info(path), bufferSize(bufSize), batch);
+ }
if (mode != PRIMARY) {
assert IgfsUtils.isDualMode(mode);
@@ -1151,7 +1170,7 @@ public final class IgfsImpl implements IgfsEx {
IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize, create);
- batch = newBatch(path, desc.secondaryOutputStream());
+ IgfsFileWorkerBatch batch = newBatch(path, desc.secondaryOutputStream());
return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 6dec0c1..f976242 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,14 +18,10 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsMode;
-import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -35,7 +31,6 @@ import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -43,57 +38,30 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* Output stream to store data into grid cache with separate blocks.
*/
-class IgfsOutputStreamImpl extends IgfsOutputStream {
+class IgfsOutputStreamImpl extends IgfsAbstractOutputStream {
/** Maximum number of blocks in buffer. */
private static final int MAX_BLOCKS_CNT = 16;
- /** IGFS context. */
- private final IgfsContext igfsCtx;
-
- /** Path to file. */
- private final IgfsPath path;
-
- /** Buffer size. */
- private final int bufSize;
-
/** IGFS mode. */
private final IgfsMode mode;
- /** File worker batch. */
- private final IgfsFileWorkerBatch batch;
-
- /** Mutex for synchronization. */
- private final Object mux = new Object();
-
/** Write completion future. */
private final IgniteInternalFuture<Boolean> writeFut;
- /** Flag for this stream open/closed state. */
- private boolean closed;
-
- /** Local buffer to store stream data as consistent block. */
- private ByteBuffer buf;
-
- /** Bytes written. */
- private long bytes;
-
- /** Time consumed by write operations. */
- private long time;
-
/** File descriptor. */
private IgfsEntryInfo fileInfo;
- /** Space in file to write data. */
- private long space;
+ /** Affinity written by this output stream. */
+ private IgfsFileAffinityRange streamRange;
+
+ /** Data length in remainder. */
+ protected int remainderDataLen;
/** Intermediate remainder to keep data. */
private byte[] remainder;
- /** Data length in remainder. */
- private int remainderDataLen;
-
- /** Affinity written by this output stream. */
- private IgfsFileAffinityRange streamRange;
+ /** Space in file to write data. */
+ protected long space;
/**
* Constructs file output stream.
@@ -107,6 +75,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
@Nullable IgfsFileWorkerBatch batch) {
+ super(igfsCtx, path, bufSize, batch);
+
assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
@@ -115,108 +85,55 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
synchronized (mux) {
- this.path = path;
- this.bufSize = optimizeBufferSize(bufSize, fileInfo);
- this.igfsCtx = igfsCtx;
this.fileInfo = fileInfo;
this.mode = mode;
- this.batch = batch;
streamRange = initialStreamRange(fileInfo);
writeFut = igfsCtx.data().writeStart(fileInfo.id());
}
-
- igfsCtx.metrics().incrementFilesOpenedForWrite();
}
- /** {@inheritDoc} */
- @Override public void write(int b) throws IOException {
- synchronized (mux) {
- checkClosed(null, 0);
-
- b &= 0xFF;
-
- long startTime = System.nanoTime();
-
- if (buf == null)
- buf = allocateNewBuffer();
-
- buf.put((byte)b);
-
- sendBufferIfFull();
-
- time += System.nanoTime() - startTime;
- }
+ /**
+ * @return Length of file.
+ */
+ private long length() {
+ return fileInfo.length();
}
/** {@inheritDoc} */
- @SuppressWarnings("NullableProblems")
- @Override public void write(byte[] b, int off, int len) throws IOException {
- A.notNull(b, "b");
-
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
- ", length=" + len + ']');
- }
-
- synchronized (mux) {
- checkClosed(null, 0);
-
- // Check if there is anything to write.
- if (len == 0)
- return;
-
- long startTime = System.nanoTime();
-
- if (buf == null) {
- if (len >= bufSize) {
- // Send data right away.
- ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
-
- send(tmpBuf, tmpBuf.remaining());
- }
- else {
- buf = allocateNewBuffer();
-
- buf.put(b, off, len);
- }
- }
- else {
- // Re-allocate buffer if needed.
- if (buf.remaining() < len)
- buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+ @Override protected int optimizeBufferSize(int bufSize) {
+ assert bufSize > 0;
- buf.put(b, off, len);
+ if (fileInfo == null)
+ return bufSize;
- sendBufferIfFull();
- }
+ int blockSize = fileInfo.blockSize();
- time += System.nanoTime() - startTime;
- }
- }
+ if (blockSize <= 0)
+ return bufSize;
- /** {@inheritDoc} */
- @Override public void transferFrom(DataInput in, int len) throws IOException {
- synchronized (mux) {
- checkClosed(in, len);
+ if (bufSize <= blockSize)
+ // Optimize minimum buffer size to be equal file's block size.
+ return blockSize;
- long startTime = System.nanoTime();
+ int maxBufSize = blockSize * MAX_BLOCKS_CNT;
- // Clean-up local buffer before streaming.
- sendBufferIfNotEmpty();
+ if (bufSize > maxBufSize)
+ // There is no profit or optimization from larger buffers.
+ return maxBufSize;
- // Perform transfer.
- send(in, len);
+ if (fileInfo.length() == 0)
+ // Make buffer size multiple of block size (optimized for new files).
+ return bufSize / blockSize * blockSize;
- time += System.nanoTime() - startTime;
- }
+ return bufSize;
}
/**
* Flushes this output stream and forces any buffered output bytes to be written out.
*
- * @exception IOException if an I/O error occurs.
+ * @throws IOException if an I/O error occurs.
*/
@Override public void flush() throws IOException {
synchronized (mux) {
@@ -250,40 +167,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
}
- /**
- * Await acknowledgments.
- *
- * @throws IOException If failed.
- */
- private void awaitAcks() throws IOException {
- try {
- igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e);
- }
- }
-
- /**
- * Flush remainder.
- *
- * @throws IOException If failed.
- */
- private void flushRemainder() throws IOException {
- try {
- if (remainder != null) {
- igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
- ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
-
- remainder = null;
- remainderDataLen = 0;
- }
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e);
- }
- }
-
/** {@inheritDoc} */
@Override public final void close() throws IOException {
synchronized (mux) {
@@ -355,75 +238,33 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
if (err != null)
throw err;
- igfsCtx.metrics().addWrittenBytesTime(bytes, time);
- igfsCtx.metrics().decrementFilesOpenedForWrite();
-
- GridEventStorageManager evts = igfsCtx.kernalContext().event();
-
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
- evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
- EVT_IGFS_FILE_CLOSED_WRITE, bytes));
- }
- }
-
- /**
- * Validate this stream is open.
- *
- * @throws IOException If this stream is closed.
- */
- private void checkClosed(@Nullable DataInput in, int len) throws IOException {
- assert Thread.holdsLock(mux);
-
- if (closed) {
- // Must read data from stream before throwing exception.
- if (in != null)
- in.skipBytes(len);
-
- throw new IOException("Stream has been closed: " + this);
+ updateMetricsOnClose();
}
}
/**
- * Send local buffer if it full.
- *
- * @throws IOException If failed.
- */
- private void sendBufferIfFull() throws IOException {
- if (buf.position() >= bufSize)
- sendBuffer();
- }
-
- /**
- * Send local buffer if at least something is stored there.
+ * Flush remainder.
*
* @throws IOException If failed.
*/
- private void sendBufferIfNotEmpty() throws IOException {
- if (buf != null && buf.position() > 0)
- sendBuffer();
- }
-
- /**
- * Send all local-buffered data to server.
- *
- * @throws IOException In case of IO exception.
- */
- private void sendBuffer() throws IOException {
- buf.flip();
+ private void flushRemainder() throws IOException {
+ try {
+ if (remainder != null) {
- send(buf, buf.remaining());
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, null,
+ 0, ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
- buf = null;
+ remainder = null;
+ remainderDataLen = 0;
+ }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e);
+ }
}
- /**
- * Store data block.
- *
- * @param data Block.
- * @param writeLen Write length.
- * @throws IOException If failed.
- */
- private void send(Object data, int writeLen) throws IOException {
+ /** {@inheritDoc} */
+ @Override protected void send(Object data, int writeLen) throws IOException {
assert Thread.holdsLock(mux);
assert data instanceof ByteBuffer || data instanceof DataInput;
@@ -449,20 +290,20 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
if (data instanceof ByteBuffer)
- ((ByteBuffer) data).get(remainder, remainderDataLen, writeLen);
+ ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
else
- ((DataInput) data).readFully(remainder, remainderDataLen, writeLen);
+ ((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
remainderDataLen += writeLen;
}
else {
if (data instanceof ByteBuffer) {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, (ByteBuffer) data, false, streamRange, batch);
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder,
+ remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
}
else {
- remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
- remainderDataLen, (DataInput) data, writeLen, false, streamRange, batch);
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, length() + space, remainder,
+ remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
}
remainderDataLen = remainder == null ? 0 : remainder.length;
@@ -474,12 +315,17 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
}
/**
- * Allocate new buffer.
+ * Await acknowledgments.
*
- * @return New buffer.
+ * @throws IOException If failed.
*/
- private ByteBuffer allocateNewBuffer() {
- return ByteBuffer.allocate(bufSize);
+ private void awaitAcks() throws IOException {
+ try {
+ igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e);
+ }
}
/**
@@ -516,41 +362,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
}
- /**
- * Optimize buffer size.
- *
- * @param bufSize Requested buffer size.
- * @param fileInfo File info.
- * @return Optimized buffer size.
- */
- private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
- assert bufSize > 0;
-
- if (fileInfo == null)
- return bufSize;
-
- int blockSize = fileInfo.blockSize();
-
- if (blockSize <= 0)
- return bufSize;
-
- if (bufSize <= blockSize)
- // Optimize minimum buffer size to be equal file's block size.
- return blockSize;
-
- int maxBufSize = blockSize * MAX_BLOCKS_CNT;
-
- if (bufSize > maxBufSize)
- // There is no profit or optimization from larger buffers.
- return maxBufSize;
-
- if (fileInfo.length() == 0)
- // Make buffer size multiple of block size (optimized for new files).
- return bufSize / blockSize * blockSize;
-
- return bufSize;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsOutputStreamImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
new file mode 100644
index 0000000..7b74a1f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamProxyImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+class IgfsOutputStreamProxyImpl extends IgfsAbstractOutputStream {
+ /** File info. */
+ private IgfsFile info;
+
+ /**
+ * Constructs file output stream.
+ *
+ * @param igfsCtx IGFS context.
+ * @param path Path to stored file.
+ * @param info File info.
+ * @param bufSize The size of the buffer to be used.
+ * @param batch Optional secondary file system batch.
+ */
+ IgfsOutputStreamProxyImpl(IgfsContext igfsCtx, IgfsPath path, IgfsFile info, int bufSize,
+ @Nullable IgfsFileWorkerBatch batch) {
+ super(igfsCtx, path, bufSize, batch);
+
+ assert batch != null;
+
+ this.info = info;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int optimizeBufferSize(int bufSize) {
+ assert bufSize > 0;
+
+ return bufSize;
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be written out.
+ *
+ * @throws IOException if an I/O error occurs.
+ */
+ @Override public void flush() throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
+
+ sendBufferIfNotEmpty();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void close() throws IOException {
+ synchronized (mux) {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
+
+ // Set closed flag immediately.
+ closed = true;
+
+ // Flush data.
+ IOException err = null;
+
+ try {
+ sendBufferIfNotEmpty();
+ }
+ catch (Exception e) {
+ err = new IOException("Failed to flush data during stream close [path=" + path +
+ ", fileInfo=" + info + ']', e);
+ }
+
+ // Finish batch before file unlocking to support the assertion that unlocked file batch,
+ // if any, must be in finishing state (e.g. append see more IgfsImpl.newBatch)
+ batch.finish();
+
+ // Finally, await secondary file system flush.
+ try {
+ batch.await();
+ }
+ catch (IgniteCheckedException e) {
+ if (err == null)
+ err = new IOException("Failed to close secondary file system stream [path=" + path +
+ ", fileInfo=" + info + ']', e);
+ else
+ err.addSuppressed(e);
+ }
+
+ // Throw error, if any.
+ if (err != null)
+ throw err;
+
+ updateMetricsOnClose();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void send(Object data, int writeLen) throws IOException {
+ assert Thread.holdsLock(mux);
+ assert data instanceof ByteBuffer || data instanceof DataInput;
+
+ try {
+ // Increment metrics.
+ bytes += writeLen;
+
+ byte [] dataBuf = new byte[writeLen];
+
+ if (data instanceof ByteBuffer) {
+ ByteBuffer byteBuf = (ByteBuffer)data;
+
+ byteBuf.get(dataBuf);
+ }
+ else {
+ DataInput dataIn = (DataInput)data;
+
+ try {
+ dataIn.readFully(dataBuf);
+ }
+ catch (IOException e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ if (!batch.write(dataBuf))
+ throw new IgniteCheckedException("Cannot write more data to the secondary file system output " +
+ "stream because it was marked as closed: " + batch.path());
+ else
+ igfsCtx.metrics().addWriteBlocks(1, 1);
+
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to store data into file: " + path, e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsOutputStreamProxyImpl.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a97483a4/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
index 3f62cf5..14a653b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractBaseSelfTest.java
@@ -904,7 +904,7 @@ public abstract class IgfsAbstractBaseSelfTest extends IgfsCommonAbstractTest {
protected void clear(IgniteFileSystem igfs, IgfsSecondaryFileSystemTestAdapter igfsSecondary) throws Exception {
clear(igfs);
- if (dual)
+ if (mode != PRIMARY)
clear(igfsSecondary);
}