You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 09:44:50 UTC
[23/38] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsOutputStreamImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsOutputStreamImpl.java
deleted file mode 100644
index 8e58787..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsOutputStreamImpl.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.ggfs;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.ggfs.*;
-import org.gridgain.grid.kernal.processors.task.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.future.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.concurrent.atomic.*;
-
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
-
-/**
- * Output stream to store data into grid cache with separate blocks.
- */
-class GridGgfsOutputStreamImpl extends GridGgfsOutputStreamAdapter {
- /** Maximum number of blocks in buffer. */
- private static final int MAX_BLOCKS_CNT = 16;
-
- /** GGFS context. */
- private GridGgfsContext ggfsCtx;
-
- /** Meta info manager. */
- private final GridGgfsMetaManager meta;
-
- /** Data manager. */
- private final GridGgfsDataManager data;
-
- /** File descriptor. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private GridGgfsFileInfo fileInfo;
-
- /** Parent ID. */
- private final IgniteUuid parentId;
-
- /** File name. */
- private final String fileName;
-
- /** Space in file to write data. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private long space;
-
- /** Intermediate remainder to keep data. */
- private byte[] remainder;
-
- /** Data length in remainder. */
- private int remainderDataLen;
-
- /** Write completion future. */
- private final IgniteFuture<Boolean> writeCompletionFut;
-
- /** GGFS mode. */
- private final GridGgfsMode mode;
-
- /** File worker batch. */
- private final GridGgfsFileWorkerBatch batch;
-
- /** Ensures that onClose)_ routine is called no more than once. */
- private final AtomicBoolean onCloseGuard = new AtomicBoolean();
-
- /** Local GGFS metrics. */
- private final GridGgfsLocalMetrics metrics;
-
- /** Affinity written by this output stream. */
- private GridGgfsFileAffinityRange streamRange;
-
- /**
- * Constructs file output stream.
- *
- * @param ggfsCtx GGFS context.
- * @param path Path to stored file.
- * @param fileInfo File info to write binary data to.
- * @param bufSize The size of the buffer to be used.
- * @param mode Grid GGFS mode.
- * @param batch Optional secondary file system batch.
- * @param metrics Local GGFs metrics.
- * @throws GridException If stream creation failed.
- */
- GridGgfsOutputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, IgniteUuid parentId,
- int bufSize, GridGgfsMode mode, @Nullable GridGgfsFileWorkerBatch batch, GridGgfsLocalMetrics metrics)
- throws GridException {
- super(path, optimizeBufferSize(bufSize, fileInfo));
-
- assert fileInfo != null;
- assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
- assert mode != null && mode != PROXY;
- assert mode == PRIMARY && batch == null || batch != null;
- assert metrics != null;
-
- // File hasn't been locked.
- if (fileInfo.lockId() == null)
- throw new IgniteFsException("Failed to acquire file lock (concurrently modified?): " + path);
-
- this.ggfsCtx = ggfsCtx;
- meta = ggfsCtx.meta();
- data = ggfsCtx.data();
-
- this.fileInfo = fileInfo;
- this.mode = mode;
- this.batch = batch;
- this.parentId = parentId;
- this.metrics = metrics;
-
- streamRange = initialStreamRange(fileInfo);
-
- fileName = path.name();
-
- writeCompletionFut = data.writeStart(fileInfo);
- }
-
- /**
- * Optimize buffer size.
- *
- * @param bufSize Requested buffer size.
- * @param fileInfo File info.
- * @return Optimized buffer size.
- */
- @SuppressWarnings("IfMayBeConditional")
- private static int optimizeBufferSize(int bufSize, GridGgfsFileInfo fileInfo) {
- assert bufSize > 0;
-
- if (fileInfo == null)
- return bufSize;
-
- int blockSize = fileInfo.blockSize();
-
- if (blockSize <= 0)
- return bufSize;
-
- if (bufSize <= blockSize)
- // Optimize minimum buffer size to be equal file's block size.
- return blockSize;
-
- int maxBufSize = blockSize * MAX_BLOCKS_CNT;
-
- if (bufSize > maxBufSize)
- // There is no profit or optimization from larger buffers.
- return maxBufSize;
-
- if (fileInfo.length() == 0)
- // Make buffer size multiple of block size (optimized for new files).
- return bufSize / blockSize * blockSize;
-
- return bufSize;
- }
-
- /** {@inheritDoc} */
- @Override protected synchronized void storeDataBlock(ByteBuffer block) throws GridException, IOException {
- int writeLen = block.remaining();
-
- preStoreDataBlocks(null, writeLen);
-
- int blockSize = fileInfo.blockSize();
-
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + writeLen < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
-
- byte[] allocated = new byte[blockSize];
-
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
- remainder = allocated;
- }
-
- block.get(remainder, remainderDataLen, writeLen);
-
- remainderDataLen += writeLen;
- }
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
- false, streamRange, batch);
-
- remainderDataLen = remainder == null ? 0 : remainder.length;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws GridException, IOException {
- preStoreDataBlocks(in, len);
-
- int blockSize = fileInfo.blockSize();
-
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + len < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
-
- byte[] allocated = new byte[blockSize];
-
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
- remainder = allocated;
- }
-
- in.readFully(remainder, remainderDataLen, len);
-
- remainderDataLen += len;
- }
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
- false, streamRange, batch);
-
- remainderDataLen = remainder == null ? 0 : remainder.length;
- }
- }
-
- /**
- * Initializes data loader if it was not initialized yet and updates written space.
- *
- * @param len Data length to be written.
- */
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws GridException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
- if (in != null)
- in.skipBytes(len);
-
- writeCompletionFut.get();
- }
-
- bytes += len;
- space += len;
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be written out.
- *
- * @exception IOException if an I/O error occurs.
- */
- @Override public synchronized void flush() throws IOException {
- boolean exists;
-
- try {
- exists = meta.exists(fileInfo.id());
- }
- catch (GridException e) {
- throw new IOError(e); // Something unrecoverable.
- }
-
- if (!exists) {
- onClose(true);
-
- throw new IOException("File was concurrently deleted: " + path);
- }
-
- super.flush();
-
- try {
- if (remainder != null) {
- data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
- ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
-
- remainder = null;
- remainderDataLen = 0;
- }
-
- if (space > 0) {
- GridGgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
- new ReserveSpaceClosure(space, streamRange));
-
- if (fileInfo0 == null)
- throw new IOException("File was concurrently deleted: " + path);
- else
- fileInfo = fileInfo0;
-
- streamRange = initialStreamRange(fileInfo);
-
- space = 0;
- }
- }
- catch (GridException e) {
- throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void onClose() throws IOException {
- onClose(false);
- }
-
- /**
- * Close callback. It will be called only once in synchronized section.
- *
- * @param deleted Whether we already know that the file was deleted.
- * @throws IOException If failed.
- */
- private void onClose(boolean deleted) throws IOException {
- assert Thread.holdsLock(this);
-
- if (onCloseGuard.compareAndSet(false, true)) {
- // Notify backing secondary file system batch to finish.
- if (mode != PRIMARY) {
- assert batch != null;
-
- batch.finish();
- }
-
- // Ensure file existence.
- boolean exists;
-
- try {
- exists = !deleted && meta.exists(fileInfo.id());
- }
- catch (GridException e) {
- throw new IOError(e); // Something unrecoverable.
- }
-
- if (exists) {
- IOException err = null;
-
- try {
- data.writeClose(fileInfo);
-
- writeCompletionFut.get();
- }
- catch (GridException e) {
- err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
- }
-
- metrics.addWrittenBytesTime(bytes, time);
-
- // Await secondary file system processing to finish.
- if (mode == DUAL_SYNC) {
- try {
- batch.await();
- }
- catch (GridException e) {
- if (err == null)
- err = new IOException("Failed to close secondary file system stream [path=" + path +
- ", fileInfo=" + fileInfo + ']', e);
- }
- }
-
- long modificationTime = System.currentTimeMillis();
-
- try {
- meta.unlock(fileInfo, modificationTime);
- }
- catch (IgniteFsFileNotFoundException ignore) {
- data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
-
- throw new IOException("File was concurrently deleted: " + path);
- }
- catch (GridException e) {
- throw new IOError(e); // Something unrecoverable.
- }
-
- meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime);
-
- if (err != null)
- throw err;
- }
- else {
- try {
- if (mode == DUAL_SYNC)
- batch.await();
- }
- catch (GridException e) {
- throw new IOException("Failed to close secondary file system stream [path=" + path +
- ", fileInfo=" + fileInfo + ']', e);
- }
- finally {
- data.delete(fileInfo);
- }
- }
- }
- }
-
- /**
- * Gets initial affinity range. This range will have 0 length and will start from first
- * non-occupied file block.
- *
- * @param fileInfo File info to build initial range for.
- * @return Affinity range.
- */
- private GridGgfsFileAffinityRange initialStreamRange(GridGgfsFileInfo fileInfo) {
- if (!ggfsCtx.configuration().isFragmentizerEnabled())
- return null;
-
- if (!Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES)))
- return null;
-
- int blockSize = fileInfo.blockSize();
-
- // Find first non-occupied block offset.
- long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize;
-
- // Need to get last affinity key and reuse it if we are on the same node.
- long lastBlockOff = off - fileInfo.blockSize();
-
- if (lastBlockOff < 0)
- lastBlockOff = 0;
-
- GridGgfsFileMap map = fileInfo.fileMap();
-
- IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
-
- IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
-
- return affKey == null ? null : new GridGgfsFileAffinityRange(off, off, affKey);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridGgfsOutputStreamImpl.class, this);
- }
-
- /**
- * Helper closure to reserve specified space and update file's length
- */
- @GridInternal
- private static final class ReserveSpaceClosure implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Space amount (bytes number) to increase file's length. */
- private long space;
-
- /** Affinity range for this particular update. */
- private GridGgfsFileAffinityRange range;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- *
- */
- public ReserveSpaceClosure() {
- // No-op.
- }
-
- /**
- * Constructs the closure to reserve specified space and update file's length.
- *
- * @param space Space amount (bytes number) to increase file's length.
- * @param range Affinity range specifying which part of file was colocated.
- */
- private ReserveSpaceClosure(long space, GridGgfsFileAffinityRange range) {
- this.space = space;
- this.range = range;
- }
-
- /** {@inheritDoc} */
- @Override public GridGgfsFileInfo apply(GridGgfsFileInfo oldInfo) {
- GridGgfsFileMap oldMap = oldInfo.fileMap();
-
- GridGgfsFileMap newMap = new GridGgfsFileMap(oldMap);
-
- newMap.addRange(range);
-
- // Update file length.
- GridGgfsFileInfo updated = new GridGgfsFileInfo(oldInfo, oldInfo.length() + space);
-
- updated.fileMap(newMap);
-
- return updated;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(space);
- out.writeObject(range);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- space = in.readLong();
- range = (GridGgfsFileAffinityRange)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ReserveSpaceClosure.class, this);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java
index 12b401c..5dbb004 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsPaths.java
@@ -28,10 +28,10 @@ public class GridGgfsPaths implements Externalizable {
private Map<String, String> props;
/** Default GGFS mode. */
- private GridGgfsMode dfltMode;
+ private IgniteFsMode dfltMode;
/** Path modes. */
- private List<T2<IgniteFsPath, GridGgfsMode>> pathModes;
+ private List<T2<IgniteFsPath, IgniteFsMode>> pathModes;
/**
* Empty constructor required by {@link Externalizable}.
@@ -47,8 +47,8 @@ public class GridGgfsPaths implements Externalizable {
* @param dfltMode Default GGFS mode.
* @param pathModes Path modes.
*/
- public GridGgfsPaths(Map<String, String> props, GridGgfsMode dfltMode, @Nullable List<T2<IgniteFsPath,
- GridGgfsMode>> pathModes) {
+ public GridGgfsPaths(Map<String, String> props, IgniteFsMode dfltMode, @Nullable List<T2<IgniteFsPath,
+ IgniteFsMode>> pathModes) {
this.props = props;
this.dfltMode = dfltMode;
this.pathModes = pathModes;
@@ -64,14 +64,14 @@ public class GridGgfsPaths implements Externalizable {
/**
* @return Default GGFS mode.
*/
- public GridGgfsMode defaultMode() {
+ public IgniteFsMode defaultMode() {
return dfltMode;
}
/**
* @return Path modes.
*/
- @Nullable public List<T2<IgniteFsPath, GridGgfsMode>> pathModes() {
+ @Nullable public List<T2<IgniteFsPath, IgniteFsMode>> pathModes() {
return pathModes;
}
@@ -84,7 +84,7 @@ public class GridGgfsPaths implements Externalizable {
out.writeBoolean(true);
out.writeInt(pathModes.size());
- for (T2<IgniteFsPath, GridGgfsMode> pathMode : pathModes) {
+ for (T2<IgniteFsPath, IgniteFsMode> pathMode : pathModes) {
pathMode.getKey().writeExternal(out);
U.writeEnum0(out, pathMode.getValue());
}
@@ -96,7 +96,7 @@ public class GridGgfsPaths implements Externalizable {
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
props = U.readStringMap(in);
- dfltMode = GridGgfsMode.fromOrdinal(U.readEnumOrdinal0(in));
+ dfltMode = IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in));
if (in.readBoolean()) {
int size = in.readInt();
@@ -107,7 +107,7 @@ public class GridGgfsPaths implements Externalizable {
IgniteFsPath path = new IgniteFsPath();
path.readExternal(in);
- T2<IgniteFsPath, GridGgfsMode> entry = new T2<>(path, GridGgfsMode.fromOrdinal(U.readEnumOrdinal0(in)));
+ T2<IgniteFsPath, IgniteFsMode> entry = new T2<>(path, IgniteFsMode.fromOrdinal(U.readEnumOrdinal0(in)));
pathModes.add(entry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
index 0b37e55..3d26dcd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsProcessor.java
@@ -35,7 +35,7 @@ import java.util.concurrent.*;
import static org.gridgain.grid.GridSystemProperties.*;
import static org.gridgain.grid.cache.GridCacheMemoryMode.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
import static org.gridgain.grid.kernal.GridNodeAttributes.*;
import static org.gridgain.grid.kernal.processors.license.GridLicenseSubsystem.*;
@@ -360,7 +360,7 @@ public class GridGgfsProcessor extends GridGgfsProcessorAdapter {
boolean secondary = cfg.getDefaultMode() == PROXY;
if (cfg.getPathModes() != null) {
- for (Map.Entry<String, GridGgfsMode> mode : cfg.getPathModes().entrySet()) {
+ for (Map.Entry<String, IgniteFsMode> mode : cfg.getPathModes().entrySet()) {
if (mode.getValue() == PROXY)
secondary = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java
new file mode 100644
index 0000000..b1cdf1b
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamAdapter.java
@@ -0,0 +1,255 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.ggfs;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+abstract class IgniteFsOutputStreamAdapter extends IgniteFsOutputStream {
+ /** Path to file. */
+ protected final IgniteFsPath path;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Flag for this stream open/closed state. */
+ private boolean closed;
+
+ /** Local buffer to store stream data as consistent block. */
+ private ByteBuffer buf;
+
+ /** Bytes written. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ protected long bytes;
+
+ /** Time consumed by write operations. */
+ protected long time;
+
+ /**
+ * Constructs file output stream.
+ *
+ * @param path Path to stored file.
+ * @param bufSize The size of the buffer to be used.
+ */
+ IgniteFsOutputStreamAdapter(IgniteFsPath path, int bufSize) {
+ assert path != null;
+ assert bufSize > 0;
+
+ this.path = path;
+ this.bufSize = bufSize;
+ }
+
+ /**
+ * Gets number of written bytes.
+ *
+ * @return Written bytes.
+ */
+ public long bytes() {
+ return bytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(int b) throws IOException {
+ checkClosed(null, 0);
+
+ long startTime = System.nanoTime();
+
+ b &= 0xFF;
+
+ if (buf == null)
+ buf = ByteBuffer.allocate(bufSize);
+
+ buf.put((byte)b);
+
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
+
+ time += System.nanoTime() - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+ A.notNull(b, "b");
+
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+ ", length=" + len + ']');
+ }
+
+ checkClosed(null, 0);
+
+ if (len == 0)
+ return; // Done.
+
+ long startTime = System.nanoTime();
+
+ if (buf == null) {
+ // Do not allocate and copy byte buffer if will send data immediately.
+ if (len >= bufSize) {
+ buf = ByteBuffer.wrap(b, off, len);
+
+ sendData(false);
+
+ return;
+ }
+
+ buf = ByteBuffer.allocate(Math.max(bufSize, len));
+ }
+
+ if (buf.remaining() < len)
+ // Expand buffer capacity if the remaining size is less than the data size.
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+
+ assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+ "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+
+ buf.put(b, off, len);
+
+ if (buf.position() >= bufSize)
+ sendData(true); // Send data to server.
+
+ time += System.nanoTime() - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
+ checkClosed(in, len);
+
+ long startTime = System.nanoTime();
+
+ // Send all IPC data from the local buffer before streaming.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
+
+ try {
+ storeDataBlocks(in, len);
+ }
+ catch (GridException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+
+ time += System.nanoTime() - startTime;
+ }
+
+ /**
+ * Flushes this output stream and forces any buffered output bytes to be written out.
+ *
+ * @exception IOException if an I/O error occurs.
+ */
+ @Override public synchronized void flush() throws IOException {
+ checkClosed(null, 0);
+
+ // Send all IPC data from the local buffer.
+ if (buf != null && buf.position() > 0)
+ sendData(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final synchronized void close() throws IOException {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
+
+ try {
+ // Send all IPC data from the local buffer.
+ try {
+ flush();
+ }
+ finally {
+ onClose(); // "onClose()" routine must be invoked anyway!
+ }
+ }
+ finally {
+ // Mark this stream closed AFTER flush.
+ closed = true;
+ }
+ }
+
+ /**
+ * Store data blocks in file.<br/>
+ * Note! If the file is concurrently deleted, we'll get lost blocks.
+ *
+ * @param data Data to store.
+ * @throws GridException If failed.
+ */
+ protected abstract void storeDataBlock(ByteBuffer data) throws GridException, IOException;
+
+ /**
+ * Store data blocks in file reading appropriate number of bytes from given data input.
+ *
+ * @param in Data input to read from.
+ * @param len Data length to store.
+ * @throws GridException If failed.
+ */
+ protected abstract void storeDataBlocks(DataInput in, int len) throws GridException, IOException;
+
+ /**
+ * Close callback. It will be called only once in synchronized section.
+ *
+ * @throws IOException If failed.
+ */
+ protected void onClose() throws IOException {
+ // No-op.
+ }
+
+ /**
+ * Validate this stream is open.
+ *
+ * @throws IOException If this stream is closed.
+ */
+ private void checkClosed(@Nullable DataInput in, int len) throws IOException {
+ assert Thread.holdsLock(this);
+
+ if (closed) {
+ // Must read data from stream before throwing exception.
+ if (in != null)
+ in.skipBytes(len);
+
+ throw new IOException("Stream has been closed: " + this);
+ }
+ }
+
+ /**
+ * Send all local-buffered data to server.
+ *
+ * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
+ * byte array.
+ * @throws IOException In case of IO exception.
+ */
+ private void sendData(boolean flip) throws IOException {
+ assert Thread.holdsLock(this);
+
+ try {
+ if (flip)
+ buf.flip();
+
+ storeDataBlock(buf);
+ }
+ catch (GridException e) {
+ throw new IOException("Failed to store data into file: " + path, e);
+ }
+
+ buf = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteFsOutputStreamAdapter.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java
new file mode 100644
index 0000000..1ea8d91
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/IgniteFsOutputStreamImpl.java
@@ -0,0 +1,497 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.ggfs;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.ggfs.*;
+import org.gridgain.grid.kernal.processors.task.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.future.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.nio.*;
+import java.util.concurrent.atomic.*;
+
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
+
+/**
+ * Output stream to store data into grid cache with separate blocks.
+ */
+class IgniteFsOutputStreamImpl extends IgniteFsOutputStreamAdapter {
+ /** Maximum number of blocks in buffer. */
+ private static final int MAX_BLOCKS_CNT = 16;
+
+ /** GGFS context. */
+ private GridGgfsContext ggfsCtx;
+
+ /** Meta info manager. */
+ private final GridGgfsMetaManager meta;
+
+ /** Data manager. */
+ private final GridGgfsDataManager data;
+
+ /** File descriptor. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private GridGgfsFileInfo fileInfo;
+
+ /** Parent ID. */
+ private final IgniteUuid parentId;
+
+ /** File name. */
+ private final String fileName;
+
+ /** Space in file to write data. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private long space;
+
+ /** Intermediate remainder to keep data. */
+ private byte[] remainder;
+
+ /** Data length in remainder. */
+ private int remainderDataLen;
+
+ /** Write completion future. */
+ private final IgniteFuture<Boolean> writeCompletionFut;
+
+ /** GGFS mode. */
+ private final IgniteFsMode mode;
+
+ /** File worker batch. */
+ private final GridGgfsFileWorkerBatch batch;
+
+ /** Ensures that the {@code onClose()} routine is called no more than once. */
+ private final AtomicBoolean onCloseGuard = new AtomicBoolean();
+
+ /** Local GGFS metrics. */
+ private final GridGgfsLocalMetrics metrics;
+
+ /** Affinity written by this output stream. */
+ private GridGgfsFileAffinityRange streamRange;
+
+ /**
+ * Constructs file output stream.
+ *
+ * @param ggfsCtx GGFS context.
+ * @param path Path to stored file.
+ * @param fileInfo File info to write binary data to.
+ * @param bufSize The size of the buffer to be used.
+ * @param mode Grid GGFS mode.
+ * @param batch Optional secondary file system batch.
+ * @param metrics Local GGFS metrics.
+ * @throws GridException If stream creation failed.
+ */
+ IgniteFsOutputStreamImpl(GridGgfsContext ggfsCtx, IgniteFsPath path, GridGgfsFileInfo fileInfo, IgniteUuid parentId,
+ int bufSize, IgniteFsMode mode, @Nullable GridGgfsFileWorkerBatch batch, GridGgfsLocalMetrics metrics)
+ throws GridException {
+ super(path, optimizeBufferSize(bufSize, fileInfo));
+
+ assert fileInfo != null;
+ assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+ assert mode != null && mode != PROXY;
+ assert mode == PRIMARY && batch == null || batch != null;
+ assert metrics != null;
+
+ // File hasn't been locked.
+ if (fileInfo.lockId() == null)
+ throw new IgniteFsException("Failed to acquire file lock (concurrently modified?): " + path);
+
+ this.ggfsCtx = ggfsCtx;
+ meta = ggfsCtx.meta();
+ data = ggfsCtx.data();
+
+ this.fileInfo = fileInfo;
+ this.mode = mode;
+ this.batch = batch;
+ this.parentId = parentId;
+ this.metrics = metrics;
+
+ streamRange = initialStreamRange(fileInfo);
+
+ fileName = path.name();
+
+ writeCompletionFut = data.writeStart(fileInfo);
+ }
+
+ /**
+ * Optimize buffer size.
+ *
+ * @param bufSize Requested buffer size.
+ * @param fileInfo File info.
+ * @return Optimized buffer size.
+ */
+ @SuppressWarnings("IfMayBeConditional")
+ private static int optimizeBufferSize(int bufSize, GridGgfsFileInfo fileInfo) {
+ assert bufSize > 0;
+
+ if (fileInfo == null)
+ return bufSize;
+
+ int blockSize = fileInfo.blockSize();
+
+ if (blockSize <= 0)
+ return bufSize;
+
+ if (bufSize <= blockSize)
+ // Optimize minimum buffer size to be equal file's block size.
+ return blockSize;
+
+ int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+
+ if (bufSize > maxBufSize)
+ // There is no profit or optimization from larger buffers.
+ return maxBufSize;
+
+ if (fileInfo.length() == 0)
+ // Make buffer size multiple of block size (optimized for new files).
+ return bufSize / blockSize * blockSize;
+
+ return bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized void storeDataBlock(ByteBuffer block) throws GridException, IOException {
+ final int len = block.remaining();
+
+ preStoreDataBlocks(null, len);
+
+ final int blockSize = fileInfo.blockSize();
+
+ if (remainderDataLen + len >= blockSize) {
+ // Buffered and incoming data together reach a full block: pass both to the data manager
+ // and keep whatever it returns as the new remainder.
+ remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
+ false, streamRange, batch);
+
+ remainderDataLen = remainder != null ? remainder.length : 0;
+
+ return;
+ }
+
+ // Not enough data for a full block yet: accumulate it in the remainder buffer.
+ if (remainder == null)
+ remainder = new byte[blockSize];
+ else if (remainder.length != blockSize) {
+ assert remainderDataLen == remainder.length;
+
+ // Move the existing remainder into a buffer of block size.
+ byte[] resized = new byte[blockSize];
+
+ U.arrayCopy(remainder, 0, resized, 0, remainder.length);
+
+ remainder = resized;
+ }
+
+ block.get(remainder, remainderDataLen, len);
+
+ remainderDataLen += len;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws GridException, IOException {
+ preStoreDataBlocks(in, len);
+
+ final int blockSize = fileInfo.blockSize();
+
+ if (remainderDataLen + len >= blockSize) {
+ // Buffered and incoming data together reach a full block: pass both to the data manager
+ // and keep whatever it returns as the new remainder.
+ remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
+ false, streamRange, batch);
+
+ remainderDataLen = remainder != null ? remainder.length : 0;
+
+ return;
+ }
+
+ // Not enough data for a full block yet: accumulate it in the remainder buffer.
+ if (remainder == null)
+ remainder = new byte[blockSize];
+ else if (remainder.length != blockSize) {
+ assert remainderDataLen == remainder.length;
+
+ // Move the existing remainder into a buffer of block size.
+ byte[] resized = new byte[blockSize];
+
+ U.arrayCopy(remainder, 0, resized, 0, remainder.length);
+
+ remainder = resized;
+ }
+
+ in.readFully(remainder, remainderDataLen, len);
+
+ remainderDataLen += len;
+ }
+
+ /**
+ * Checks whether the write completion future has already finished and, if so, surfaces its result
+ * before any more data is accepted; otherwise accounts {@code len} bytes in the written bytes and
+ * pending space counters.
+ *
+ * @param in Data input the bytes are about to be read from, or {@code null} when there is none.
+ * @param len Data length to be written.
+ * @throws GridException If the write completion future reports a failure.
+ * @throws IOException If skipping bytes of the input fails.
+ */
+ private void preStoreDataBlocks(@Nullable DataInput in, int len) throws GridException, IOException {
+ boolean writeFinished = writeCompletionFut.isDone();
+
+ if (writeFinished) {
+ // While the stream is still being written the future is only expected to finish with an error.
+ assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+ // Skip the payload in the input before the failure is raised.
+ if (in != null)
+ in.skipBytes(len);
+
+ writeCompletionFut.get();
+ }
+
+ bytes += len;
+ space += len;
+ }
+
+ /**
+ * Flushes this output stream: stores the buffered remainder through the data manager and records
+ * the space written since the previous flush in the file's meta information.
+ *
+ * @exception IOException If the file was concurrently deleted or the data could not be flushed.
+ */
+ @Override public synchronized void flush() throws IOException {
+ boolean fileExists;
+
+ try {
+ fileExists = meta.exists(fileInfo.id());
+ }
+ catch (GridException err) {
+ throw new IOError(err); // Something unrecoverable.
+ }
+
+ if (!fileExists) {
+ // Run the close callback, telling it the file is already known to be deleted.
+ onClose(true);
+
+ throw new IOException("File was concurrently deleted: " + path);
+ }
+
+ super.flush();
+
+ try {
+ if (remainder != null) {
+ long reservedLen = fileInfo.length() + space;
+
+ // Only the filled part of the remainder buffer is stored.
+ ByteBuffer tail = ByteBuffer.wrap(remainder, 0, remainderDataLen);
+
+ data.storeDataBlocks(fileInfo, reservedLen, null, 0, tail, true, streamRange, batch);
+
+ remainderDataLen = 0;
+ remainder = null;
+ }
+
+ if (space > 0) {
+ GridGgfsFileInfo updated = meta.updateInfo(fileInfo.id(),
+ new ReserveSpaceClosure(space, streamRange));
+
+ if (updated == null)
+ throw new IOException("File was concurrently deleted: " + path);
+
+ fileInfo = updated;
+
+ // Start a new range from the updated file info and reset the pending space counter.
+ streamRange = initialStreamRange(updated);
+
+ space = 0;
+ }
+ }
+ catch (GridException err) {
+ throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', err);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() throws IOException {
+ // Regular close: nothing is known about a deletion yet, so the flag is false.
+ onClose(false);
+ }
+
+ /**
+ * Close callback. Must be called while holding this stream's monitor; its body runs at most once,
+ * guarded by {@code onCloseGuard}, and later calls do nothing.
+ * <p>
+ * If the file still exists, the write is closed, the file is unlocked with a new modification time
+ * and the parent listing is updated asynchronously. Otherwise the file's data is deleted.
+ *
+ * @param deleted Whether we already know that the file was deleted.
+ * @throws IOException If closing the stream or the secondary file system stream failed, or if the file
+ * was concurrently deleted.
+ */
+ private void onClose(boolean deleted) throws IOException {
+ assert Thread.holdsLock(this);
+
+ // Only the first caller passes the guard.
+ if (onCloseGuard.compareAndSet(false, true)) {
+ // Notify backing secondary file system batch to finish.
+ if (mode != PRIMARY) {
+ assert batch != null;
+
+ batch.finish();
+ }
+
+ // Ensure file existence. The meta lookup is skipped when the caller already knows the file is deleted.
+ boolean exists;
+
+ try {
+ exists = !deleted && meta.exists(fileInfo.id());
+ }
+ catch (GridException e) {
+ throw new IOError(e); // Something unrecoverable.
+ }
+
+ if (exists) {
+ // First failure is remembered here and thrown only at the end, so that the remaining
+ // close steps are still attempted.
+ IOException err = null;
+
+ try {
+ data.writeClose(fileInfo);
+
+ writeCompletionFut.get();
+ }
+ catch (GridException e) {
+ err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
+ }
+
+ metrics.addWrittenBytesTime(bytes, time);
+
+ // Await secondary file system processing to finish.
+ if (mode == DUAL_SYNC) {
+ try {
+ batch.await();
+ }
+ catch (GridException e) {
+ // Keep the earlier failure if there is one.
+ if (err == null)
+ err = new IOException("Failed to close secondary file system stream [path=" + path +
+ ", fileInfo=" + fileInfo + ']', e);
+ }
+ }
+
+ long modificationTime = System.currentTimeMillis();
+
+ // NOTE(review): if unlock fails, the exception thrown below replaces any failure remembered in 'err'.
+ try {
+ meta.unlock(fileInfo, modificationTime);
+ }
+ catch (IgniteFsFileNotFoundException ignore) {
+ data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+
+ throw new IOException("File was concurrently deleted: " + path);
+ }
+ catch (GridException e) {
+ throw new IOError(e); // Something unrecoverable.
+ }
+
+ meta.updateParentListingAsync(parentId, fileInfo.id(), fileName, bytes, modificationTime);
+
+ if (err != null)
+ throw err;
+ }
+ else {
+ // File no longer exists: still wait for the secondary file system batch in DUAL_SYNC mode,
+ // and delete the file's data whether or not that wait succeeds.
+ try {
+ if (mode == DUAL_SYNC)
+ batch.await();
+ }
+ catch (GridException e) {
+ throw new IOException("Failed to close secondary file system stream [path=" + path +
+ ", fileInfo=" + fileInfo + ']', e);
+ }
+ finally {
+ data.delete(fileInfo);
+ }
+ }
+ }
+ }
+
+ /**
+ * Builds the initial affinity range for the stream: a range whose start and end are both set to
+ * the offset of the first block not yet occupied by the file.
+ *
+ * @param fileInfo File info to build initial range for.
+ * @return Affinity range, or {@code null} if the fragmentizer is disabled, the file does not prefer
+ * local writes, or no affinity key was obtained.
+ */
+ private GridGgfsFileAffinityRange initialStreamRange(GridGgfsFileInfo fileInfo) {
+ boolean colocate = ggfsCtx.configuration().isFragmentizerEnabled() &&
+ Boolean.parseBoolean(fileInfo.properties().get(IgniteFs.PROP_PREFER_LOCAL_WRITES));
+
+ if (!colocate)
+ return null;
+
+ int blockSize = fileInfo.blockSize();
+
+ // Round the file length up to a block boundary: the first non-occupied block offset.
+ long off = ((fileInfo.length() + blockSize - 1) / blockSize) * blockSize;
+
+ // Offset one block before that, clamped to zero. The affinity key found there is handed
+ // to the data manager so that it can be reused if we are on the same node.
+ long lastBlockOff = Math.max(0L, off - blockSize);
+
+ GridGgfsFileMap map = fileInfo.fileMap();
+
+ IgniteUuid prevAffKey = null;
+
+ if (map != null)
+ prevAffKey = map.affinityKey(lastBlockOff, false);
+
+ IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
+
+ if (affKey == null)
+ return null;
+
+ return new GridGgfsFileAffinityRange(off, off, affKey);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Delegates to the S.toString helper, passing this class and this instance.
+ return S.toString(IgniteFsOutputStreamImpl.class, this);
+ }
+
+ /**
+ * Closure applied to a file info that produces a new file info whose length is increased by the
+ * reserved space and whose file map additionally contains the affinity range of this update.
+ */
+ @GridInternal
+ private static final class ReserveSpaceClosure implements IgniteClosure<GridGgfsFileInfo, GridGgfsFileInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Number of bytes to add to the file's length. */
+ private long space;
+
+ /** Affinity range for this particular update. */
+ private GridGgfsFileAffinityRange range;
+
+ /**
+ * No-arg constructor required by {@link Externalizable}.
+ */
+ public ReserveSpaceClosure() {
+ // No-op.
+ }
+
+ /**
+ * Creates the closure for the given amount of space and affinity range.
+ *
+ * @param space Number of bytes to add to the file's length.
+ * @param range Affinity range specifying which part of file was colocated.
+ */
+ private ReserveSpaceClosure(long space, GridGgfsFileAffinityRange range) {
+ this.range = range;
+ this.space = space;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridGgfsFileInfo apply(GridGgfsFileInfo oldInfo) {
+ // Work on a new map built from the old one and add this update's range to it.
+ GridGgfsFileMap extendedMap = new GridGgfsFileMap(oldInfo.fileMap());
+
+ extendedMap.addRange(range);
+
+ // New file info is derived from the old one with the increased length.
+ GridGgfsFileInfo res = new GridGgfsFileInfo(oldInfo, oldInfo.length() + space);
+
+ res.fileMap(extendedMap);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // Field order must mirror readExternal.
+ out.writeLong(space);
+ out.writeObject(range);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // Field order must mirror writeExternal.
+ space = in.readLong();
+ range = (GridGgfsFileAffinityRange)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReserveSpaceClosure.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
index c90e959..6a932f4 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/job/GridJobWorker.java
@@ -488,7 +488,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
assert ex != null;
}
else {
- if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, GridGgfsOutOfSpaceException.class)) {
+ if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
// Print exception for internal errors only if debug is enabled.
if (log.isDebugEnabled())
U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskWorker.java
index 4d18c4b..9bf93e0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/task/GridTaskWorker.java
@@ -826,7 +826,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
catch (GridException e) {
if (X.hasCause(e, GridInternalException.class) ||
- X.hasCause(e, GridGgfsOutOfSpaceException.class)) {
+ X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
// Print internal exceptions only if debug is enabled.
if (log.isDebugEnabled())
U.error(log, "Failed to obtain remote job result policy for result from " +
@@ -851,7 +851,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
catch (GridRuntimeException e) {
if (X.hasCause(e, GridInternalException.class) ||
- X.hasCause(e, GridGgfsOutOfSpaceException.class)) {
+ X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
// Print internal exceptions only if debug is enabled.
if (log.isDebugEnabled())
U.error(log, "Failed to obtain remote job result policy for result from " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfs.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfs.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfs.java
index 8f0d716..9d537de 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfs.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfs.java
@@ -27,7 +27,7 @@ public class VisorGgfs implements Serializable {
private final String name;
/** GGFS instance working mode. */
- private final GridGgfsMode mode;
+ private final IgniteFsMode mode;
/** GGFS metrics. */
private final VisorGgfsMetrics metrics;
@@ -45,7 +45,7 @@ public class VisorGgfs implements Serializable {
*/
public VisorGgfs(
String name,
- GridGgfsMode mode,
+ IgniteFsMode mode,
VisorGgfsMetrics metrics,
boolean secondaryFsConfigured
) {
@@ -81,7 +81,7 @@ public class VisorGgfs implements Serializable {
/**
* @return GGFS instance working mode.
*/
- public GridGgfsMode mode() {
+ public IgniteFsMode mode() {
return mode;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfiler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfiler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfiler.java
index 5020bec..e1c8c14 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfiler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfiler.java
@@ -48,7 +48,7 @@ public class VisorGgfsProfiler {
long bytesWritten = 0;
long writeTime = 0;
long userWriteTime = 0;
- GridGgfsMode mode = null;
+ IgniteFsMode mode = null;
VisorGgfsProfilerUniformityCounters counters = new VisorGgfsProfilerUniformityCounters();
for (VisorGgfsProfilerEntry entry : entries) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerEntry.java
index 773cc9c..b027627 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerEntry.java
@@ -37,7 +37,7 @@ public class VisorGgfsProfilerEntry implements Serializable {
private final long timestamp;
/** GGFS mode. */
- private final GridGgfsMode mode;
+ private final IgniteFsMode mode;
/** File size. */
private final long size;
@@ -76,7 +76,7 @@ public class VisorGgfsProfilerEntry implements Serializable {
public VisorGgfsProfilerEntry(
String path,
long timestamp,
- GridGgfsMode mode,
+ IgniteFsMode mode,
long size,
long bytesRead,
long readTime,
@@ -139,7 +139,7 @@ public class VisorGgfsProfilerEntry implements Serializable {
/**
* @return GGFS mode.
*/
- public GridGgfsMode mode() {
+ public IgniteFsMode mode() {
return mode;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerTask.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerTask.java
index 5ae5348..c82ebd9 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerTask.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/ggfs/VisorGgfsProfilerTask.java
@@ -46,7 +46,7 @@ public class VisorGgfsProfilerTask extends VisorOneNodeTask<String, Collection<V
private final String path;
/** File GGFS mode. */
- private final GridGgfsMode mode;
+ private final IgniteFsMode mode;
/** Stream ID. */
private final long streamId;
@@ -76,7 +76,7 @@ public class VisorGgfsProfilerTask extends VisorOneNodeTask<String, Collection<V
long ts,
int entryType,
String path,
- GridGgfsMode mode,
+ IgniteFsMode mode,
long streamId,
long dataLen,
boolean overwrite,
@@ -234,13 +234,13 @@ public class VisorGgfsProfilerTask extends VisorOneNodeTask<String, Collection<V
* @param ix Index of array item to parse.
* @return Parsed GGFS mode or {@code null} if string is empty.
*/
- private GridGgfsMode parseGgfsMode(String[] ss, int ix) {
+ private IgniteFsMode parseGgfsMode(String[] ss, int ix) {
if (ss.length <= ix)
return null;
else {
String s = ss[ix];
- return s.isEmpty() ? null : GridGgfsMode.valueOf(s);
+ return s.isEmpty() ? null : IgniteFsMode.valueOf(s);
}
}
@@ -295,7 +295,7 @@ public class VisorGgfsProfilerTask extends VisorOneNodeTask<String, Collection<V
long bytesWritten = 0;
long writeTime = 0;
long userWriteTime = 0;
- GridGgfsMode mode = null;
+ IgniteFsMode mode = null;
for (VisorGgfsProfilerParsedLine line : lines) {
if (!line.path.isEmpty())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorGgfsConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorGgfsConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorGgfsConfiguration.java
index 0ac5d5d..2c75301 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorGgfsConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/node/VisorGgfsConfiguration.java
@@ -62,10 +62,10 @@ public class VisorGgfsConfiguration implements Serializable {
private String secondaryHadoopFileSystemConfigPath;
/** GGFS instance mode. */
- private GridGgfsMode defaultMode;
+ private IgniteFsMode defaultMode;
/** Map of paths to GGFS modes. */
- private Map<String, GridGgfsMode> pathModes;
+ private Map<String, IgniteFsMode> pathModes;
/** Dual mode PUT operations executor service. */
private String dualModePutExecutorService;
@@ -322,28 +322,28 @@ public class VisorGgfsConfiguration implements Serializable {
/**
* @return GGFS instance mode.
*/
- public GridGgfsMode defaultMode() {
+ public IgniteFsMode defaultMode() {
return defaultMode;
}
/**
* @param dfltMode New gGFS instance mode.
*/
- public void defaultMode(GridGgfsMode dfltMode) {
+ public void defaultMode(IgniteFsMode dfltMode) {
defaultMode = dfltMode;
}
/**
* @return Map of paths to GGFS modes.
*/
- @Nullable public Map<String, GridGgfsMode> pathModes() {
+ @Nullable public Map<String, IgniteFsMode> pathModes() {
return pathModes;
}
/**
* @param pathModes New map of paths to GGFS modes.
*/
- public void pathModes(@Nullable Map<String, GridGgfsMode> pathModes) {
+ public void pathModes(@Nullable Map<String, IgniteFsMode> pathModes) {
this.pathModes = pathModes;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsEventsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsEventsAbstractSelfTest.java
index bf46e18..44df918 100644
--- a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsEventsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsEventsAbstractSelfTest.java
@@ -776,7 +776,7 @@ public abstract class GridGgfsEventsAbstractSelfTest extends GridCommonAbstractT
byte[] buf = new byte[dataSize];
// Will generate GGFS_FILE_CREATED, GGFS_FILE_OPENED_WRITE, GGFS_FILE_CLOSED_WRITE.
- try (GridGgfsOutputStream os = ggfs.create(file, false)) {
+ try (IgniteFsOutputStream os = ggfs.create(file, false)) {
os.write(buf); // Will generate no events.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerSelfTest.java
index 08e88b8..dfb99d6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerSelfTest.java
@@ -32,7 +32,7 @@ public class GridGgfsFragmentizerSelfTest extends GridGgfsFragmentizerAbstractSe
IgniteFsPath path = new IgniteFsPath("/someFile");
- try (GridGgfsOutputStream out = ggfs.create(path, true)) {
+ try (IgniteFsOutputStream out = ggfs.create(path, true)) {
// Write 10 groups.
for (int i = 0; i < 10 * GGFS_GROUP_SIZE; i++) {
byte[] data = new byte[GGFS_BLOCK_SIZE];
@@ -102,7 +102,7 @@ public class GridGgfsFragmentizerSelfTest extends GridGgfsFragmentizerAbstractSe
while (written < fileSize) {
IgniteFs ggfs = grid(ggfsIdx).fileSystem("ggfs");
- try (GridGgfsOutputStream out = ggfs.append(path, true)) {
+ try (IgniteFsOutputStream out = ggfs.append(path, true)) {
byte[] data = new byte[chunkSize];
Arrays.fill(data, (byte)i);
@@ -177,7 +177,7 @@ public class GridGgfsFragmentizerSelfTest extends GridGgfsFragmentizerAbstractSe
byte[] chunk = new byte[chunkSize];
while (written < fileSize) {
- try (GridGgfsOutputStream out = ggfs.append(path, true)) {
+ try (IgniteFsOutputStream out = ggfs.append(path, true)) {
for (int i = 0; i < 8; i++) {
Arrays.fill(chunk, (byte)cnt);
@@ -220,7 +220,7 @@ public class GridGgfsFragmentizerSelfTest extends GridGgfsFragmentizerAbstractSe
for (int i = 0; i < 30; i++) {
IgniteFsPath path = new IgniteFsPath("/someFile" + i);
- try (GridGgfsOutputStream out = ggfs.create(path, true)) {
+ try (IgniteFsOutputStream out = ggfs.create(path, true)) {
for (int j = 0; j < 5 * GGFS_GROUP_SIZE; j++)
out.write(new byte[GGFS_BLOCK_SIZE]);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerTopologySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerTopologySelfTest.java
index f8b2b01..4d99ac8 100644
--- a/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerTopologySelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/ggfs/GridGgfsFragmentizerTopologySelfTest.java
@@ -27,7 +27,7 @@ public class GridGgfsFragmentizerTopologySelfTest extends GridGgfsFragmentizerAb
IgniteFs ggfs = grid(1).fileSystem("ggfs");
- try (GridGgfsOutputStream out = ggfs.create(path, true)) {
+ try (IgniteFsOutputStream out = ggfs.create(path, true)) {
for (int i = 0; i < 10 * GGFS_GROUP_SIZE; i++)
out.write(new byte[GGFS_BLOCK_SIZE]);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java
index 01d6586..f0f4965 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridCacheGgfsPerBlockLruEvictionPolicySelfTest.java
@@ -28,7 +28,7 @@ import java.util.concurrent.*;
import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Tests for GGFS per-block LR eviction policy.
@@ -85,7 +85,7 @@ public class GridCacheGgfsPerBlockLruEvictionPolicySelfTest extends GridGgfsComm
ggfsCfg.setSequentialReadsBeforePrefetch(Integer.MAX_VALUE);
ggfsCfg.setSecondaryFileSystem(secondaryFs);
- Map<String, GridGgfsMode> pathModes = new HashMap<>();
+ Map<String, IgniteFsMode> pathModes = new HashMap<>();
pathModes.put(FILE_RMT.toString(), DUAL_SYNC);
@@ -437,7 +437,7 @@ public class GridCacheGgfsPerBlockLruEvictionPolicySelfTest extends GridGgfsComm
* @throws Exception If failed.
*/
private void append(IgniteFsPath path, int len) throws Exception {
- GridGgfsOutputStream os = ggfsPrimary.append(path, false);
+ IgniteFsOutputStream os = ggfsPrimary.append(path, false);
os.write(new byte[len]);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAbstractSelfTest.java
index f722fdd..e64b9b3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAbstractSelfTest.java
@@ -34,7 +34,7 @@ import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
import static org.gridgain.grid.cache.GridCacheMemoryMode.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
import static org.apache.ignite.IgniteFs.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Test fo regular GGFs operations.
@@ -104,7 +104,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
protected static GridGgfsImpl ggfsSecondary;
/** GGFS mode. */
- protected final GridGgfsMode mode;
+ protected final IgniteFsMode mode;
/** Dual mode flag. */
protected final boolean dual;
@@ -117,11 +117,11 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
*
* @param mode GGFS mode.
*/
- protected GridGgfsAbstractSelfTest(GridGgfsMode mode) {
+ protected GridGgfsAbstractSelfTest(IgniteFsMode mode) {
this(mode, ONHEAP_TIERED);
}
- protected GridGgfsAbstractSelfTest(GridGgfsMode mode, GridCacheMemoryMode memoryMode) {
+ protected GridGgfsAbstractSelfTest(IgniteFsMode mode, GridCacheMemoryMode memoryMode) {
assert mode != null && mode != PROXY;
this.mode = mode;
@@ -167,7 +167,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
* @return Started grid instance.
* @throws Exception If failed.
*/
- protected Ignite startGridWithGgfs(String gridName, String ggfsName, GridGgfsMode mode,
+ protected Ignite startGridWithGgfs(String gridName, String ggfsName, IgniteFsMode mode,
@Nullable IgniteFsFileSystem secondaryFs, @Nullable String restCfg) throws Exception {
IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration();
@@ -766,7 +766,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
create(ggfs, paths(DIR, SUBDIR), paths(FILE));
- try (GridGgfsOutputStream os = ggfs.append(FILE, false)) {
+ try (IgniteFsOutputStream os = ggfs.append(FILE, false)) {
os.write(new byte[10 * 1024 * 1024]);
}
@@ -903,8 +903,8 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrows(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os1 = null;
- GridGgfsOutputStream os2 = null;
+ IgniteFsOutputStream os1 = null;
+ IgniteFsOutputStream os2 = null;
try {
os1 = ggfs.create(FILE, true);
@@ -928,7 +928,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
public void testCreateRenameNoClose() throws Exception {
create(ggfs, paths(DIR, SUBDIR), null);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -950,7 +950,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
public void testCreateRenameParentNoClose() throws Exception {
create(ggfs, paths(DIR, SUBDIR), null);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -974,7 +974,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -1004,7 +1004,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -1037,7 +1037,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
create(ggfs, paths(DIR, SUBDIR), null);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -1070,7 +1070,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
try {
for (int i = 0; i < REPEAT_CNT; i++) {
- GridGgfsOutputStream os = ggfs.create(path, 128, true, null, 0, 256, null);
+ IgniteFsOutputStream os = ggfs.create(path, 128, true, null, 0, 256, null);
os.write(chunk);
@@ -1111,7 +1111,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
IgniteFuture<?> fut = multithreadedAsync(new Runnable() {
@Override public void run() {
while (!stop.get()) {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(FILE, true);
@@ -1203,8 +1203,8 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrowsInherited(log(), new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os1 = null;
- GridGgfsOutputStream os2 = null;
+ IgniteFsOutputStream os1 = null;
+ IgniteFsOutputStream os2 = null;
try {
os1 = ggfs.append(FILE, false);
@@ -1230,7 +1230,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
createFile(ggfs, FILE, false);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(FILE, false);
@@ -1254,7 +1254,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
createFile(ggfs, FILE, false);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(FILE, false);
@@ -1280,7 +1280,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(FILE, false);
@@ -1312,7 +1312,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
IgniteUuid id = ggfs.context().meta().fileId(FILE);
@@ -1349,7 +1349,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
createFile(ggfs, FILE, false);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(FILE, false);
@@ -1389,7 +1389,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
for (int i = 0; i < REPEAT_CNT; i++) {
chunks[i] = chunk;
- GridGgfsOutputStream os = ggfs.append(path, false);
+ IgniteFsOutputStream os = ggfs.append(path, false);
os.write(chunk);
@@ -1430,7 +1430,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
IgniteFuture<?> fut = multithreadedAsync(new Runnable() {
@Override public void run() {
while (!stop.get()) {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(FILE, false);
@@ -1490,7 +1490,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
public void testStop() throws Exception {
create(ggfs, paths(DIR, SUBDIR), null);
- GridGgfsOutputStream os = ggfs.create(FILE, true);
+ IgniteFsOutputStream os = ggfs.create(FILE, true);
os.write(chunk);
@@ -2077,7 +2077,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
U.awaitQuiet(barrier);
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(path, true);
@@ -2178,7 +2178,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
*/
protected void createFile(IgniteFs ggfs, IgniteFsPath file, boolean overwrite, long blockSize,
@Nullable byte[]... chunks) throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.create(file, 256, overwrite, null, 0, blockSize, null);
@@ -2202,7 +2202,7 @@ public abstract class GridGgfsAbstractSelfTest extends GridGgfsCommonAbstractTes
*/
protected void appendFile(IgniteFs ggfs, IgniteFsPath file, @Nullable byte[]... chunks)
throws Exception {
- GridGgfsOutputStream os = null;
+ IgniteFsOutputStream os = null;
try {
os = ggfs.append(file, false);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributesSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributesSelfTest.java
index 140331c..35cabd4 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributesSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsAttributesSelfTest.java
@@ -15,7 +15,7 @@ import java.io.*;
import java.lang.reflect.*;
import java.util.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* {@link GridGgfsAttributes} test case.
@@ -25,7 +25,7 @@ public class GridGgfsAttributesSelfTest extends GridGgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testSerialization() throws Exception {
- Map<String, GridGgfsMode> pathModes = new HashMap<>();
+ Map<String, IgniteFsMode> pathModes = new HashMap<>();
pathModes.put("path1", PRIMARY);
pathModes.put("path2", PROXY);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAbstractSelfTest.java
index 9df3b88..99feec6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAbstractSelfTest.java
@@ -21,7 +21,7 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.IgniteFs.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Tests for GGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
@@ -32,7 +32,7 @@ public abstract class GridGgfsDualAbstractSelfTest extends GridGgfsAbstractSelfT
*
* @param mode GGFS mode.
*/
- protected GridGgfsDualAbstractSelfTest(GridGgfsMode mode) {
+ protected GridGgfsDualAbstractSelfTest(IgniteFsMode mode) {
super(mode);
assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
@@ -1128,7 +1128,7 @@ public abstract class GridGgfsDualAbstractSelfTest extends GridGgfsAbstractSelfT
// Write enough data to the secondary file system.
final int blockSize = GGFS_BLOCK_SIZE;
- GridGgfsOutputStream out = ggfsSecondary.append(FILE, false);
+ IgniteFsOutputStream out = ggfsSecondary.append(FILE, false);
int totalWritten = 0;
@@ -1207,7 +1207,7 @@ public abstract class GridGgfsDualAbstractSelfTest extends GridGgfsAbstractSelfT
// Write enough data to the secondary file system.
final int blockSize = ggfs.info(FILE).blockSize();
- GridGgfsOutputStream out = ggfsSecondary.append(FILE, false);
+ IgniteFsOutputStream out = ggfsSecondary.append(FILE, false);
int totalWritten = 0;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAsyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAsyncSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAsyncSelfTest.java
index 2aa2f30..01bc9f2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAsyncSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualAsyncSelfTest.java
@@ -9,7 +9,7 @@
package org.gridgain.grid.kernal.processors.ggfs;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Tests for DUAL_ASYNC mode.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualSyncSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualSyncSelfTest.java
index 83d5a14..c9d5e64 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualSyncSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsDualSyncSelfTest.java
@@ -9,7 +9,7 @@
package org.gridgain.grid.kernal.processors.ggfs;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Tests for DUAL_SYNC mode.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetricsSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetricsSelfTest.java
index efd5418..42d4cb8 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsMetricsSelfTest.java
@@ -24,7 +24,7 @@ import java.util.*;
import static org.gridgain.grid.cache.GridCacheAtomicityMode.*;
import static org.gridgain.grid.cache.GridCacheMode.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
* Test for GGFS metrics.
@@ -100,7 +100,7 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
ggfsCfg.setDefaultMode(PRIMARY);
ggfsCfg.setSecondaryFileSystem(ggfsSecondary);
- Map<String, GridGgfsMode> pathModes = new HashMap<>();
+ Map<String, IgniteFsMode> pathModes = new HashMap<>();
pathModes.put("/fileRemote", DUAL_SYNC);
@@ -232,9 +232,9 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
assertEquals(0, m.filesOpenedForRead());
assertEquals(0, m.filesOpenedForWrite());
- GridGgfsOutputStream out1 = fs.create(new IgniteFsPath("/dir1/file1"), false);
- GridGgfsOutputStream out2 = fs.create(new IgniteFsPath("/dir1/file2"), false);
- GridGgfsOutputStream out3 = fs.create(new IgniteFsPath("/dir1/dir2/file"), false);
+ IgniteFsOutputStream out1 = fs.create(new IgniteFsPath("/dir1/file1"), false);
+ IgniteFsOutputStream out2 = fs.create(new IgniteFsPath("/dir1/file2"), false);
+ IgniteFsOutputStream out3 = fs.create(new IgniteFsPath("/dir1/dir2/file"), false);
m = fs.metrics();
@@ -269,7 +269,7 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
assertEquals(0, m.filesOpenedForRead());
assertEquals(0, m.filesOpenedForWrite());
- GridGgfsOutputStream out = fs.append(new IgniteFsPath("/dir1/file1"), false);
+ IgniteFsOutputStream out = fs.append(new IgniteFsPath("/dir1/file1"), false);
out.write(new byte[20]);
@@ -341,7 +341,7 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
public void testMultipleClose() throws Exception {
IgniteFs fs = ggfsPrimary[0];
- GridGgfsOutputStream out = fs.create(new IgniteFsPath("/file"), false);
+ IgniteFsOutputStream out = fs.create(new IgniteFsPath("/file"), false);
out.close();
out.close();
@@ -370,7 +370,7 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
IgniteFsPath file2 = new IgniteFsPath("/file2");
// Create remote file and write some data to it.
- GridGgfsOutputStream out = ggfsSecondary.create(fileRemote, 256, true, null, 1, 256, null);
+ IgniteFsOutputStream out = ggfsSecondary.create(fileRemote, 256, true, null, 1, 256, null);
int rmtBlockSize = ggfsSecondary.info(fileRemote).blockSize();
@@ -388,7 +388,7 @@ public class GridGgfsMetricsSelfTest extends GridGgfsCommonAbstractTest {
checkBlockMetrics(initMetrics, ggfs.metrics(), 0, 0, 0, 0, 0, 0);
// Write two blocks to the file.
- GridGgfsOutputStream os = ggfs.append(file1, false);
+ IgniteFsOutputStream os = ggfs.append(file1, false);
os.write(new byte[blockSize * 2]);
os.close();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e4cd4d8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolverSelfTest.java
index 3c397de..1bc63d3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsModeResolverSelfTest.java
@@ -15,7 +15,7 @@ import org.gridgain.grid.util.typedef.*;
import java.util.*;
-import static org.gridgain.grid.ggfs.GridGgfsMode.*;
+import static org.gridgain.grid.ggfs.IgniteFsMode.*;
/**
*
@@ -49,21 +49,21 @@ public class GridGgfsModeResolverSelfTest extends TestCase {
* @throws Exception If failed.
*/
public void testResolveChildren() throws Exception {
- assertEquals(new HashSet<GridGgfsMode>(){{add(DUAL_SYNC); add(PRIMARY); add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(DUAL_SYNC); add(PRIMARY); add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(DUAL_SYNC); add(PRIMARY); add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(DUAL_SYNC); add(PRIMARY); add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(DUAL_SYNC);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(DUAL_SYNC);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/1")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(PRIMARY); add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(PRIMARY); add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/b")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(PRIMARY); add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(PRIMARY); add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/b/c")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(PRIMARY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(PRIMARY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/b/c/2")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/b/c/d")));
- assertEquals(new HashSet<GridGgfsMode>(){{add(PROXY);}},
+ assertEquals(new HashSet<IgniteFsMode>(){{add(PROXY);}},
resolver.resolveChildrenModes(new IgniteFsPath("/a/b/c/d/e")));
}
}