You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:04 UTC

[01/14] ignite git commit: WIP on output stream optos.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3264 [created] 99d244a30


WIP on output stream optos.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cd33732
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cd33732
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cd33732

Branch: refs/heads/ignite-3264
Commit: 3cd337329a3e3df1c7deb97742833f55ea1c6821
Parents: e409b67
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:52:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:52:07 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../internal/processors/igfs/IgfsImpl.java      |  48 +--
 .../igfs/IgfsOutputStreamAdapter.java           | 265 -------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 385 ++++++++++++++-----
 4 files changed, 298 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d1f3ef5..d257807 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -862,9 +862,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor(new DataStreamProcessor(ctx));
             startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
             startProcessor(new GridContinuousProcessor(ctx));
-            startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
-                IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
-                IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
+//            startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
+//                IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
+//                IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
             startProcessor(new DataStructuresProcessor(ctx));
             startProcessor(createComponent(PlatformProcessor.class, ctx));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 9087ff0..bc2e087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1077,8 +1077,8 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(),
-                        bufferSize(bufSize), mode, batch);
+                    IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
+                        bufferSize(bufSize), mode, batch, metrics);
 
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
 
@@ -1107,7 +1107,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
             }
         });
     }
@@ -1142,7 +1142,8 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    return new IgfsEventAwareOutputStream(path, desc.info(), bufferSize(bufSize), mode, batch);
+                    return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch,
+                        metrics);
                 }
 
                 final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1183,7 +1184,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
             }
         });
     }
@@ -1759,43 +1760,6 @@ public final class IgfsImpl implements IgfsEx {
     }
 
     /**
-     * IGFS output stream extension that fires events.
-     */
-    private class IgfsEventAwareOutputStream extends IgfsOutputStreamImpl {
-        /** Close guard. */
-        private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
-        /**
-         * Constructs file output stream.
-         *
-         * @param path Path to stored file.
-         * @param fileInfo File info.
-         * @param bufSize The size of the buffer to be used.
-         * @param mode IGFS mode.
-         * @param batch Optional secondary file system batch.
-         */
-        IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
-            @Nullable IgfsFileWorkerBatch batch) {
-            super(igfsCtx, path, fileInfo, bufSize, mode, batch, metrics);
-
-            metrics.incrementFilesOpenedForWrite();
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-        @Override protected void onClose() throws IOException {
-            if (closeGuard.compareAndSet(false, true)) {
-                super.onClose();
-
-                metrics.decrementFilesOpenedForWrite();
-
-                if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
-                    evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_WRITE, bytes()));
-            }
-        }
-    }
-
-    /**
      * IGFS input stream extension that fires events.
      */
     private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
deleted file mode 100644
index 43de61e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Output stream to store data into grid cache with separate blocks.
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-abstract class IgfsOutputStreamAdapter extends IgfsOutputStream {
-    /** Path to file. */
-    protected final IgfsPath path;
-
-    /** Buffer size. */
-    private final int bufSize;
-
-    /** Flag for this stream open/closed state. */
-    private boolean closed;
-
-    /** Local buffer to store stream data as consistent block. */
-    private ByteBuffer buf;
-
-    /** Bytes written. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    protected long bytes;
-
-    /** Time consumed by write operations. */
-    protected long time;
-
-    /**
-     * Constructs file output stream.
-     *
-     * @param path Path to stored file.
-     * @param bufSize The size of the buffer to be used.
-     */
-    IgfsOutputStreamAdapter(IgfsPath path, int bufSize) {
-        assert path != null;
-        assert bufSize > 0;
-
-        this.path = path;
-        this.bufSize = bufSize;
-    }
-
-    /**
-     * Gets number of written bytes.
-     *
-     * @return Written bytes.
-     */
-    public long bytes() {
-        return bytes;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        checkClosed(null, 0);
-
-        long startTime = System.nanoTime();
-
-        b &= 0xFF;
-
-        if (buf == null)
-            buf = ByteBuffer.allocate(bufSize);
-
-        buf.put((byte)b);
-
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
-        A.notNull(b, "b");
-
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
-                ", length=" + len + ']');
-        }
-
-        checkClosed(null, 0);
-
-        if (len == 0)
-            return; // Done.
-
-        long startTime = System.nanoTime();
-
-        if (buf == null) {
-            // Do not allocate and copy byte buffer if will send data immediately.
-            if (len >= bufSize) {
-                buf = ByteBuffer.wrap(b, off, len);
-
-                sendData(false);
-
-                return;
-            }
-
-            buf = ByteBuffer.allocate(Math.max(bufSize, len));
-        }
-
-        if (buf.remaining() < len)
-            // Expand buffer capacity, if remaining size is less then data size.
-            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
-
-        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
-            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
-
-        buf.put(b, off, len);
-
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
-        checkClosed(in, len);
-
-        long startTime = System.nanoTime();
-
-        // Send all IPC data from the local buffer before streaming.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
-
-        try {
-            storeDataBlocks(in, len);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException(e.getMessage(), e);
-        }
-
-        time += System.nanoTime() - startTime;
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be written out.
-     *
-     * @exception IOException  if an I/O error occurs.
-     */
-    @Override public synchronized void flush() throws IOException {
-        checkClosed(null, 0);
-
-        // Send all IPC data from the local buffer.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override public final synchronized void close() throws IOException {
-        // Do nothing if stream is already closed.
-        if (closed)
-            return;
-
-        try {
-            // Send all IPC data from the local buffer.
-            try {
-                flush();
-            }
-            finally {
-                onClose(); // "onClose()" routine must be invoked anyway!
-            }
-        }
-        finally {
-            // Mark this stream closed AFTER flush.
-            closed = true;
-        }
-    }
-
-    /**
-     * Store data blocks in file.<br/>
-     * Note! If file concurrently deleted we'll get lost blocks.
-     *
-     * @param data Data to store.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;
-
-    /**
-     * Store data blocks in file reading appropriate number of bytes from given data input.
-     *
-     * @param in Data input to read from.
-     * @param len Data length to store.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;
-
-    /**
-     * Close callback. It will be called only once in synchronized section.
-     *
-     * @throws IOException If failed.
-     */
-    protected void onClose() throws IOException {
-        // No-op.
-    }
-
-    /**
-     * Validate this stream is open.
-     *
-     * @throws IOException If this stream is closed.
-     */
-    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
-        assert Thread.holdsLock(this);
-
-        if (closed) {
-            // Must read data from stream before throwing exception.
-            if (in != null)
-                in.skipBytes(len);
-
-            throw new IOException("Stream has been closed: " + this);
-        }
-    }
-
-    /**
-     * Send all local-buffered data to server.
-     *
-     * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
-     *      byte array.
-     * @throws IOException In case of IO exception.
-     */
-    private void sendData(boolean flip) throws IOException {
-        assert Thread.holdsLock(this);
-
-        try {
-            if (flip)
-                buf.flip();
-
-            storeDataBlock(buf);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to store data into file: " + path, e);
-        }
-
-        buf = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsOutputStreamAdapter.class, this);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3cd33732/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 21e5fb6..7a40ba3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,12 +18,16 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
@@ -34,6 +38,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -41,10 +46,29 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
 /**
  * Output stream to store data into grid cache with separate blocks.
  */
-class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
+class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Maximum number of blocks in buffer. */
     private static final int MAX_BLOCKS_CNT = 16;
 
+    /** Path to file. */
+    protected final IgfsPath path;
+
+    /** Buffer size. */
+    protected final int bufSize;
+
+    /** Flag for this stream open/closed state. */
+    protected boolean closed;
+
+    /** Local buffer to store stream data as consistent block. */
+    protected ByteBuffer buf;
+
+    /** Bytes written. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    protected long bytes;
+
+    /** Time consumed by write operations. */
+    protected long time;
+
     /** IGFS context. */
     private IgfsContext igfsCtx;
 
@@ -86,6 +110,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     /** Affinity written by this output stream. */
     private IgfsFileAffinityRange streamRange;
 
+    /** Close guard. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
     /**
      * Constructs file output stream.
      *
@@ -99,7 +126,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
         @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
-        super(path, optimizeBufferSize(bufSize, fileInfo));
+        this.path = path;
+        this.bufSize = optimizeBufferSize(bufSize, fileInfo);
 
         assert fileInfo != null;
         assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
@@ -125,128 +153,123 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
         streamRange = initialStreamRange(fileInfo);
 
         writeCompletionFut = data.writeStart(fileInfo);
-    }
 
-    /**
-     * Optimize buffer size.
-     *
-     * @param bufSize Requested buffer size.
-     * @param fileInfo File info.
-     * @return Optimized buffer size.
-     */
-    @SuppressWarnings("IfMayBeConditional")
-    private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
-        assert bufSize > 0;
+        metrics.incrementFilesOpenedForWrite();
+    }
 
-        if (fileInfo == null)
-            return bufSize;
+    /** {@inheritDoc} */
+    @Override public synchronized void write(int b) throws IOException {
+        checkClosed(null, 0);
 
-        int blockSize = fileInfo.blockSize();
+        long startTime = System.nanoTime();
 
-        if (blockSize <= 0)
-            return bufSize;
+        b &= 0xFF;
 
-        if (bufSize <= blockSize)
-            // Optimize minimum buffer size to be equal file's block size.
-            return blockSize;
+        if (buf == null)
+            buf = ByteBuffer.allocate(bufSize);
 
-        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+        buf.put((byte)b);
 
-        if (bufSize > maxBufSize)
-            // There is no profit or optimization from larger buffers.
-            return maxBufSize;
+        if (buf.position() >= bufSize)
+            sendData(true); // Send data to server.
 
-        if (fileInfo.length() == 0)
-            // Make buffer size multiple of block size (optimized for new files).
-            return bufSize / blockSize * blockSize;
-
-        return bufSize;
+        time += System.nanoTime() - startTime;
     }
 
     /** {@inheritDoc} */
-    @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
-        int writeLen = block.remaining();
+    @SuppressWarnings("NullableProblems")
+    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+        A.notNull(b, "b");
 
-        preStoreDataBlocks(null, writeLen);
+        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+            throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+                ", length=" + len + ']');
+        }
 
-        int blockSize = fileInfo.blockSize();
+        checkClosed(null, 0);
 
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + writeLen < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
+        if (len == 0)
+            return; // Done.
 
-                byte[] allocated = new byte[blockSize];
+        long startTime = System.nanoTime();
 
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+        if (buf == null) {
+            // Do not allocate and copy byte buffer if will send data immediately.
+            if (len >= bufSize) {
+                buf = ByteBuffer.wrap(b, off, len);
 
-                remainder = allocated;
-            }
+                sendData(false);
 
-            block.get(remainder, remainderDataLen, writeLen);
+                return;
+            }
 
-            remainderDataLen += writeLen;
+            buf = ByteBuffer.allocate(Math.max(bufSize, len));
         }
-        else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
-                false, streamRange, batch);
 
-            remainderDataLen = remainder == null ? 0 : remainder.length;
-        }
-    }
+        if (buf.remaining() < len)
+            // Expand buffer capacity, if remaining size is less than data size.
+            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
 
-    /** {@inheritDoc} */
-    @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
-        preStoreDataBlocks(in, len);
+        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
 
-        int blockSize = fileInfo.blockSize();
+        buf.put(b, off, len);
 
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + len < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
+        if (buf.position() >= bufSize)
+            sendData(true); // Send data to server.
 
-                byte[] allocated = new byte[blockSize];
+        time += System.nanoTime() - startTime;
+    }
 
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+    /** {@inheritDoc} */
+    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
+        checkClosed(in, len);
 
-                remainder = allocated;
-            }
+        long startTime = System.nanoTime();
 
-            in.readFully(remainder, remainderDataLen, len);
+        // Send all IPC data from the local buffer before streaming.
+        if (buf != null && buf.position() > 0)
+            sendData(true);
 
-            remainderDataLen += len;
+        try {
+            storeDataBlocks(in, len);
         }
-        else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
-                false, streamRange, batch);
-
-            remainderDataLen = remainder == null ? 0 : remainder.length;
+        catch (IgniteCheckedException e) {
+            throw new IOException(e.getMessage(), e);
         }
+
+        time += System.nanoTime() - startTime;
     }
 
-    /**
-     * Initializes data loader if it was not initialized yet and updates written space.
-     *
-     * @param len Data length to be written.
-     */
-    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
-        // Check if any exception happened while writing data.
-        if (writeCompletionFut.isDone()) {
-            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+    /** {@inheritDoc} */
+    @Override public final synchronized void close() throws IOException {
+        // Do nothing if stream is already closed.
+        if (closed)
+            return;
 
-            if (in != null)
-                in.skipBytes(len);
+        try {
+            // Send all IPC data from the local buffer.
+            try {
+                flush();
+            }
+            finally {
+                if (closeGuard.compareAndSet(false, true)) {
+                    onClose(false);
 
-            writeCompletionFut.get();
-        }
+                    metrics.decrementFilesOpenedForWrite();
 
-        bytes += len;
-        space += len;
+                    GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+                    if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+                        evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+                            EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+                }
+            }
+        }
+        finally {
+            // Mark this stream closed AFTER flush.
+            closed = true;
+        }
     }
 
     /**
@@ -270,7 +293,11 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
             throw new IOException("File was concurrently deleted: " + path);
         }
 
-        super.flush();
+        checkClosed(null, 0);
+
+        // Send all IPC data from the local buffer.
+        if (buf != null && buf.position() > 0)
+            sendData(true);
 
         try {
             if (remainder != null) {
@@ -301,9 +328,103 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override protected void onClose() throws IOException {
-        onClose(false);
+    /**
+     * Surfaces a write failure that has already been recorded, otherwise accounts for {@code len} more bytes.
+     * @param in Optional input; on failure {@code len} bytes are skipped from it before the future is queried.
+     * @param len Data length to be written; added to both {@code bytes} and {@code space}.
+     */
+    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
+        // Completion at this point is asserted to mean failure; get() presumably rethrows the write error.
+        if (writeCompletionFut.isDone()) {
+            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+            if (in != null)
+                in.skipBytes(len);
+
+            writeCompletionFut.get();
+        }
+
+        bytes += len;
+        space += len;
+    }
+
+    /**
+     * Stores {@code block}: keeps it in {@code remainder} when short of a full block, else passes it to {@code data}.
+     *
+     * @param block Buffer whose remaining bytes are written.
+     * @throws IgniteCheckedException If the pre-store check or the data manager call fails.
+     * @throws IOException If failed.
+     */
+    protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+        int writeLen = block.remaining();
+
+        preStoreDataBlocks(null, writeLen);
+
+        int blockSize = fileInfo.blockSize();
+
+        // Pending bytes plus this write are still below one block: accumulate into the remainder array only.
+        if (remainderDataLen + writeLen < blockSize) {
+            if (remainder == null)
+                remainder = new byte[blockSize];
+            else if (remainder.length != blockSize) {
+                assert remainderDataLen == remainder.length;
+
+                byte[] allocated = new byte[blockSize];
+
+                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+                remainder = allocated;
+            }
+
+            block.get(remainder, remainderDataLen, writeLen);
+
+            remainderDataLen += writeLen;
+        }
+        else {
+            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
+                false, streamRange, batch);
+
+            remainderDataLen = remainder == null ? 0 : remainder.length;
+        }
+    }
+
+    /**
+     * Stores {@code len} bytes read from {@code in}, mirroring {@code storeDataBlock} for a {@link DataInput}.
+     *
+     * @param in Input the bytes are read from.
+     * @param len Number of bytes to read and store.
+     * @throws IgniteCheckedException If the pre-store check or the data manager call fails.
+     * @throws IOException If failed.
+     */
+    protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+        preStoreDataBlocks(in, len);
+
+        int blockSize = fileInfo.blockSize();
+
+        // Pending bytes plus this write are still below one block: read straight into the remainder array only.
+        if (remainderDataLen + len < blockSize) {
+            if (remainder == null)
+                remainder = new byte[blockSize];
+            else if (remainder.length != blockSize) {
+                assert remainderDataLen == remainder.length;
+
+                byte[] allocated = new byte[blockSize];
+
+                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+                remainder = allocated;
+            }
+
+            in.readFully(remainder, remainderDataLen, len);
+
+            remainderDataLen += len;
+        }
+        else {
+            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
+                false, streamRange, batch);
+
+            remainderDataLen = remainder == null ? 0 : remainder.length;
+        }
     }
 
     /**
@@ -317,11 +438,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
 
         if (onCloseGuard.compareAndSet(false, true)) {
             // Notify backing secondary file system batch to finish.
-            if (mode != PRIMARY) {
-                assert batch != null;
-
+            if (batch != null)
                 batch.finish();
-            }
 
             // Ensure file existence.
             boolean exists;
@@ -393,6 +511,46 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     }
 
     /**
+     * Validate this stream is open.
+     *
+     * @throws IOException If this stream is closed.
+     */
+    protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
+        assert Thread.holdsLock(this);
+
+        if (closed) {
+            // Must read data from stream before throwing exception.
+            if (in != null)
+                in.skipBytes(len);
+
+            throw new IOException("Stream has been closed: " + this);
+        }
+    }
+
+    /**
+     * Send all local-buffered data to server.
+     *
+     * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
+     *      byte array.
+     * @throws IOException In case of IO exception.
+     */
+    protected void sendData(boolean flip) throws IOException {
+        assert Thread.holdsLock(this);
+
+        try {
+            if (flip)
+                buf.flip();
+
+            storeDataBlock(buf);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IOException("Failed to store data into file: " + path, e);
+        }
+
+        buf = null;
+    }
+
+    /**
      * Gets initial affinity range. This range will have 0 length and will start from first
      * non-occupied file block.
      *
@@ -426,6 +584,41 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
         return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
     }
 
+    /**
+     * Optimize buffer size.
+     *
+     * @param bufSize Requested buffer size.
+     * @param fileInfo File info.
+     * @return Optimized buffer size.
+     */
+    private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
+        assert bufSize > 0;
+
+        if (fileInfo == null)
+            return bufSize;
+
+        int blockSize = fileInfo.blockSize();
+
+        if (blockSize <= 0)
+            return bufSize;
+
+        if (bufSize <= blockSize)
+            // Optimize minimum buffer size to be equal file's block size.
+            return blockSize;
+
+        int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+
+        if (bufSize > maxBufSize)
+            // There is no profit or optimization from larger buffers.
+            return maxBufSize;
+
+        if (fileInfo.length() == 0)
+            // Make buffer size multiple of block size (optimized for new files).
+            return bufSize / blockSize * blockSize;
+
+        return bufSize;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsOutputStreamImpl.class, this);


[04/14] ignite git commit: Protected -> private.

Posted by vo...@apache.org.
Protected -> private.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b6c6b488
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b6c6b488
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b6c6b488

Branch: refs/heads/ignite-3264
Commit: b6c6b488608acbe0dc4009134b49cf0187f8a250
Parents: 75b6080
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:01:26 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:01:26 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 64 ++++++++++----------
 1 file changed, 32 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b6c6b488/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index c50c431..98ccb81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -51,23 +51,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     private static final int MAX_BLOCKS_CNT = 16;
 
     /** Path to file. */
-    protected final IgfsPath path;
+    private final IgfsPath path;
 
     /** Buffer size. */
-    protected final int bufSize;
+    private final int bufSize;
 
     /** Flag for this stream open/closed state. */
-    protected boolean closed;
+    private boolean closed;
 
     /** Local buffer to store stream data as consistent block. */
-    protected ByteBuffer buf;
+    private ByteBuffer buf;
 
     /** Bytes written. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    protected long bytes;
+    private long bytes;
 
     /** Time consumed by write operations. */
-    protected long time;
+    private long time;
 
     /** IGFS context. */
     private IgfsContext igfsCtx;
@@ -342,33 +342,13 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     }
 
     /**
-     * Initializes data loader if it was not initialized yet and updates written space.
-     *
-     * @param len Data length to be written.
-     */
-    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
-        // Check if any exception happened while writing data.
-        if (writeCompletionFut.isDone()) {
-            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
-            if (in != null)
-                in.skipBytes(len);
-
-            writeCompletionFut.get();
-        }
-
-        bytes += len;
-        space += len;
-    }
-
-    /**
      * Store data block.
      *
      * @param block Block.
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+    private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
         assert Thread.holdsLock(mux);
 
         int writeLen = block.remaining();
@@ -411,7 +391,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+    private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
         assert Thread.holdsLock(mux);
 
         preStoreDataBlocks(in, len);
@@ -445,6 +425,26 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     }
 
     /**
+     * Initializes data loader if it was not initialized yet and updates written space.
+     *
+     * @param len Data length to be written.
+     */
+    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
+        // Check if any exception happened while writing data.
+        if (writeCompletionFut.isDone()) {
+            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+            if (in != null)
+                in.skipBytes(len);
+
+            writeCompletionFut.get();
+        }
+
+        bytes += len;
+        space += len;
+    }
+
+    /**
      * Close callback. It will be called only once in synchronized section.
      *
      * @param deleted Whether we already know that the file was deleted.
@@ -537,7 +537,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      *
      * @throws IOException If this stream is closed.
      */
-    protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
+    private void checkClosed(@Nullable DataInput in, int len) throws IOException {
         assert Thread.holdsLock(mux);
 
         if (closed) {
@@ -556,7 +556,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      *      byte array.
      * @throws IOException In case of IO exception.
      */
-    protected void sendData(boolean flip) throws IOException {
+    private void sendData(boolean flip) throws IOException {
         assert Thread.holdsLock(mux);
 
         try {
@@ -564,12 +564,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 buf.flip();
 
             storeDataBlock(buf);
+
+            buf = null;
         }
         catch (IgniteCheckedException e) {
             throw new IOException("Failed to store data into file: " + path, e);
         }
-
-        buf = null;
     }
 
     /**


[12/14] ignite git commit: Minors.

Posted by vo...@apache.org.
Minors.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93f8eca5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93f8eca5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93f8eca5

Branch: refs/heads/ignite-3264
Commit: 93f8eca53008849ccfe609b5c7e5c20425f530e7
Parents: da1ff65
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 09:57:07 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 09:57:07 2016 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/internal/IgniteKernal.java    | 6 +++---
 .../ignite/internal/processors/igfs/IgfsOutputStreamImpl.java | 7 ++++---
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93f8eca5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d257807..d1f3ef5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -862,9 +862,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor(new DataStreamProcessor(ctx));
             startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
             startProcessor(new GridContinuousProcessor(ctx));
-//            startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
-//                IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
-//                IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
+            startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
+                IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
+                IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
             startProcessor(new DataStructuresProcessor(ctx));
             startProcessor(createComponent(PlatformProcessor.class, ctx));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/93f8eca5/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 16a20a2..f51e9b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -271,12 +271,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      */
     @Override public void flush() throws IOException {
         synchronized (mux) {
-
             boolean exists;
 
             try {
                 exists = igfsCtx.meta().exists(fileInfo.id());
-            } catch (IgniteCheckedException e) {
+            }
+            catch (IgniteCheckedException e) {
                 throw new IOException("File to read file metadata: " + path, e);
             }
 
@@ -315,7 +315,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
 
                     space = 0;
                 }
-            } catch (IgniteCheckedException e) {
+            }
+            catch (IgniteCheckedException e) {
                 throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
             }
         }


[05/14] ignite git commit: Removed "meta" and "data" fields.

Posted by vo...@apache.org.
Removed "meta" and "data" fields.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5949abe4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5949abe4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5949abe4

Branch: refs/heads/ignite-3264
Commit: 5949abe4943bc07de26c4ecce62148e7f399cb41
Parents: b6c6b48
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:02:59 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:02:59 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 38 ++++++++------------
 1 file changed, 15 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5949abe4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 98ccb81..bc32e81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -72,12 +72,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** IGFS context. */
     private IgfsContext igfsCtx;
 
-    /** Meta info manager. */
-    private final IgfsMetaManager meta;
-
-    /** Data manager. */
-    private final IgfsDataManager data;
-
     /** File descriptor. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IgfsEntryInfo fileInfo;
@@ -146,8 +140,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
 
             this.igfsCtx = igfsCtx;
-            meta = igfsCtx.meta();
-            data = igfsCtx.data();
 
             this.fileInfo = fileInfo;
             this.mode = mode;
@@ -156,7 +148,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
 
             streamRange = initialStreamRange(fileInfo);
 
-            writeCompletionFut = data.writeStart(fileInfo);
+            writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
 
             metrics.incrementFilesOpenedForWrite();
         }
@@ -295,7 +287,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             boolean exists;
 
             try {
-                exists = meta.exists(fileInfo.id());
+                exists = igfsCtx.meta().exists(fileInfo.id());
             } catch (IgniteCheckedException e) {
                 throw new IOException("File to read file metadata: " + path, e);
             }
@@ -314,7 +306,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
 
             try {
                 if (remainder != null) {
-                    data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+                    igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
                         ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
 
                     remainder = null;
@@ -322,9 +314,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 }
 
                 if (space > 0) {
-                    data.awaitAllAcksReceived(fileInfo.id());
+                    igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
 
-                    IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+                    IgfsEntryInfo fileInfo0 = igfsCtx.meta().reserveSpace(path, fileInfo.id(), space, streamRange);
 
                     if (fileInfo0 == null)
                         throw new IOException("File was concurrently deleted: " + path);
@@ -376,8 +368,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             remainderDataLen += writeLen;
         }
         else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
-                false, streamRange, batch);
+            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                remainderDataLen, block, false, streamRange, batch);
 
             remainderDataLen = remainder == null ? 0 : remainder.length;
         }
@@ -417,8 +409,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             remainderDataLen += len;
         }
         else {
-            remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
-                false, streamRange, batch);
+            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                remainderDataLen, in, len, false, streamRange, batch);
 
             remainderDataLen = remainder == null ? 0 : remainder.length;
         }
@@ -462,7 +454,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             boolean exists;
 
             try {
-                exists = !deleted && meta.exists(fileInfo.id());
+                exists = !deleted && igfsCtx.meta().exists(fileInfo.id());
             }
             catch (IgniteCheckedException e) {
                 throw new IOException("File to read file metadata: " + path, e);
@@ -472,7 +464,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 IOException err = null;
 
                 try {
-                    data.writeClose(fileInfo);
+                    igfsCtx.data().writeClose(fileInfo);
 
                     writeCompletionFut.get();
                 }
@@ -499,10 +491,10 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 long modificationTime = System.currentTimeMillis();
 
                 try {
-                    meta.unlock(fileInfo, modificationTime);
+                    igfsCtx.meta().unlock(fileInfo, modificationTime);
                 }
                 catch (IgfsPathNotFoundException ignore) {
-                    data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+                    igfsCtx.data().delete(fileInfo); // Safety to ensure that all data blocks are deleted.
 
                     throw new IOException("File was concurrently deleted: " + path);
                 }
@@ -526,7 +518,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                         ", fileInfo=" + fileInfo + ']', e);
                 }
                 finally {
-                    data.delete(fileInfo);
+                    igfsCtx.data().delete(fileInfo);
                 }
             }
         }
@@ -601,7 +593,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
 
         IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
 
-        IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
+        IgniteUuid affKey = igfsCtx.data().nextAffinityKey(prevAffKey);
 
         return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
     }


[07/14] ignite git commit: Simplified ctor.

Posted by vo...@apache.org.
Simplified ctor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04e311b8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04e311b8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04e311b8

Branch: refs/heads/ignite-3264
Commit: 04e311b8cef3dd8e3dd1c27f8f4d5816bcc916db
Parents: a76b349
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:07:05 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:07:05 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 25 +++++++-------------
 1 file changed, 9 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04e311b8/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 8c93aad..13808ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -119,33 +119,26 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
         @Nullable IgfsFileWorkerBatch batch) {
+        assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+        assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
+
+        // File hasn't been locked.
+        if (fileInfo.lockId() == null)
+            throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+
         synchronized (mux) {
             this.path = path;
             this.bufSize = optimizeBufferSize(bufSize, fileInfo);
-
-            assert fileInfo != null;
-            assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
-            assert mode != null && mode != PROXY;
-            assert mode == PRIMARY && batch == null || batch != null;
-
-            // File hasn't been locked.
-            if (fileInfo.lockId() == null)
-                throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
-
-            assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
-
             this.igfsCtx = igfsCtx;
-
             this.fileInfo = fileInfo;
             this.mode = mode;
             this.batch = batch;
 
             streamRange = initialStreamRange(fileInfo);
-
             writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
-
-            igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
         }
+
+        igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
     }
 
     /** {@inheritDoc} */


[10/14] ignite git commit: IGNITE-3260: IGFS: Delete messages are no longer passed.

Posted by vo...@apache.org.
IGNITE-3260: IGFS: Delete messages are no longer passed.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065d2e70
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065d2e70
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065d2e70

Branch: refs/heads/ignite-3264
Commit: 065d2e70c21418437eba5e725eaa8b1ebc3af6da
Parents: 0176af1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 18:12:42 2016 +0300

----------------------------------------------------------------------
 .../internal/processors/igfs/IgfsAsyncImpl.java |   6 -
 .../processors/igfs/IgfsDataManager.java        |  61 ++---
 .../processors/igfs/IgfsDeleteWorker.java       |  42 ----
 .../ignite/internal/processors/igfs/IgfsEx.java |   9 -
 .../internal/processors/igfs/IgfsImpl.java      | 249 +++++--------------
 .../internal/processors/igfs/IgfsUtils.java     |   2 +-
 .../ignite/igfs/IgfsFragmentizerSelfTest.java   |   2 -
 .../processors/igfs/IgfsSizeSelfTest.java       | 133 ----------
 .../HadoopDefaultMapReducePlannerSelfTest.java  |   6 -
 9 files changed, 83 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
 import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
-        return igfs.awaitDeletesAsync();
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public String clientLogDirectory() {
         return igfs.clientLogDirectory();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 16fbeb8..57a8c6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
 import org.apache.ignite.igfs.IgfsOutOfSpaceException;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager {
     private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
         byte[] data) throws IgniteCheckedException {
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-            try {
-                igfs.awaitDeletesAsync().get(trashPurgeTimeout);
-            }
-            catch (IgniteFutureTimeoutCheckedException ignore) {
-                // Ignore.
-            }
+            final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
 
-            // Additional size check.
-            if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-                final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
-
-                if (completionFut == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Missing completion future for file write request (most likely exception occurred " +
-                            "which will be thrown upon stream close) [fileId=" + fileId + ']');
+            if (completionFut == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Missing completion future for file write request (most likely exception occurred " +
+                        "which will be thrown upon stream close) [fileId=" + fileId + ']');
 
-                    return;
-                }
+                return;
+            }
 
-                IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
-                    "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
-                    ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
+            IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
+                "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+                ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
 
-                completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
-                    igfsCtx.kernalContext().localNodeId(), e));
+            completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
+                igfsCtx.kernalContext().localNodeId(), e));
 
-                return;
-            }
+            return;
         }
 
         // No affinity key present, just concat and return.
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager {
         assert !blocks.isEmpty();
 
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
-            try {
-                try {
-                    igfs.awaitDeletesAsync().get(trashPurgeTimeout);
-                }
-                catch (IgniteFutureTimeoutCheckedException ignore) {
-                    // Ignore.
-                }
-
-                // Additional size check.
-                if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
-                    return new GridFinishedFuture<Object>(
-                        new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
-                            "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
-                            ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
-
-            }
-            catch (IgniteCheckedException e) {
-                return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " +
-                    "block due to unexpected exception.", e));
-            }
+            return new GridFinishedFuture<Object>(
+                new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
+                    "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+                    ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
         }
 
         return dataCachePrj.putAllAsync(blocks);

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bae9354..310090d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,13 +19,10 @@ package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 
@@ -37,8 +34,6 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-
 /**
  * IGFS worker for removal from the trash directory.
  */
@@ -49,9 +44,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** How many files/folders to delete at once (i.e in a single transaction). */
     private static final int MAX_DELETE_BATCH = 100;
 
-    /** IGFS context. */
-    private final IgfsContext igfsCtx;
-
     /** Metadata manager. */
     private final IgfsMetaManager meta;
 
@@ -73,9 +65,6 @@ public class IgfsDeleteWorker extends IgfsThread {
     /** Cancellation flag. */
     private volatile boolean cancelled;
 
-    /** Message topic. */
-    private Object topic;
-
     /**
      * Constructor.
      *
@@ -84,15 +73,9 @@ public class IgfsDeleteWorker extends IgfsThread {
     IgfsDeleteWorker(IgfsContext igfsCtx) {
         super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
 
-        this.igfsCtx = igfsCtx;
-
         meta = igfsCtx.meta();
         data = igfsCtx.data();
 
-        String igfsName = igfsCtx.igfs().name();
-
-        topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
         assert meta != null;
         assert data != null;
 
@@ -189,8 +172,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                             if (log.isDebugEnabled())
                                 log.debug("Sending delete confirmation message [name=" + entry.getKey() +
                                     ", fileId=" + fileId + ']');
-
-                            sendDeleteMessage(new IgfsDeleteMessage(fileId));
                         }
                     }
                     else
@@ -201,8 +182,6 @@ public class IgfsDeleteWorker extends IgfsThread {
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
-                    sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
                 }
             }
         }
@@ -346,25 +325,4 @@ public class IgfsDeleteWorker extends IgfsThread {
                 return true; // Directory entry was deleted concurrently.
         }
     }
-
-    /**
-     * Send delete message to all meta cache nodes in the grid.
-     *
-     * @param msg Message to send.
-     */
-    private void sendDeleteMessage(IgfsDeleteMessage msg) {
-        assert msg != null;
-
-        Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
-        for (ClusterNode node : nodes) {
-            try {
-                igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
-            }
-            catch (IgniteCheckedException e) {
-                U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
-                    ", msg=" + msg + ", err=" + e.getMessage() + ']');
-            }
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index fb67e20..4c64bc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem {
     public long groupBlockSize();
 
     /**
-     * Asynchronously await for all entries existing in trash to be removed.
-     *
-     * @return Future which will be completed when all entries existed in trash by the time of invocation are removed.
-     * @throws IgniteCheckedException If failed.
-     */
-    public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
-
-    /**
      * Gets client file system log directory.
      *
      * @return Client file system log directory or {@code null} in case no client connections have been created yet.

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 9087ff0..262dfef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskSplitAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.events.IgfsEvent;
 import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
 import org.apache.ignite.igfs.IgfsFile;
 import org.apache.ignite.igfs.IgfsInvalidPathException;
 import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
 import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
@@ -100,11 +95,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -114,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
 import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
 import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
 
 /**
  * Cache-based IGFS implementation.
@@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
     /** Default permissions for file system entry. */
     private static final String PERMISSION_DFLT_VAL = "0777";
 
+    /** Index generator for async format threads. */
+    private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
     /** Default directory metadata. */
     static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
 
@@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
     /** Writers map. */
     private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
 
-    /** Delete futures. */
-    private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
-    /** Delete message listener. */
-    private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
-    /** Format discovery listener. */
-    private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
     /** Local metrics holder. */
     private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
 
     /** Client log directory. */
     private volatile String logDir;
 
-    /** Message topic. */
-    private Object topic;
-
     /** Eviction policy (if set). */
     private IgfsPerBlockLruEvictionPolicy evictPlc;
 
@@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
-        igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
         dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
             new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
     }
@@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
             }
         }
 
-        igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
-        igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
         // Restore interrupted flag.
         if (interrupted)
             Thread.currentThread().interrupt();
@@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
     /** {@inheritDoc} */
     @Override public void format() {
         try {
-            formatAsync().get();
+            IgniteUuid id = meta.format();
+
+            // If ID is null, then file system is already empty.
+            if (id == null)
+                return;
+
+            while (true) {
+                if (enterBusy()) {
+                    try {
+                        if (!meta.exists(id))
+                            return;
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+
+                U.sleep(10);
+            }
         }
         catch (Exception e) {
             throw IgfsUtils.toIgfsException(e);
@@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
      * @return Future.
      */
     IgniteInternalFuture<?> formatAsync() {
-        try {
-            IgniteUuid id = meta.format();
-
-            if (id == null)
-                return new GridFinishedFuture<Object>();
-            else {
-                GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
-                GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
-                if (oldFut != null)
-                    return oldFut;
-                else {
-                    if (!meta.exists(id)) {
-                        // Safety in case response message was received before we put future into collection.
-                        fut.onDone();
-
-                        delFuts.remove(id, fut);
-                    }
+        GridFutureAdapter<?> fut = new GridFutureAdapter<>();
 
-                    return fut;
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<Object>(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
-        Collection<IgniteUuid> ids = meta.pendingDeletes();
+        Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+            FORMAT_THREAD_IDX_GEN.incrementAndGet());
 
-        if (!ids.isEmpty()) {
-            if (log.isDebugEnabled())
-                log.debug("Constructing delete future for trash entries: " + ids);
+        t.setDaemon(true);
 
-            GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>();
+        t.start();
 
-            for (IgniteUuid id : ids) {
-                GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
-                IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
-                if (oldFut != null)
-                    resFut.add(oldFut);
-                else {
-                    if (meta.exists(id))
-                        resFut.add(fut);
-                    else {
-                        fut.onDone();
-
-                        delFuts.remove(id, fut);
-                    }
-                }
-            }
-
-            resFut.markInitialized();
-
-            return resFut;
-        }
-        else
-            return new GridFinishedFuture<>();
+        return fut;
     }
 
     /**
@@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
         return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
     }
 
-    /**
-     * Check whether IGFS with the same name exists among provided attributes.
-     *
-     * @param attrs Attributes.
-     * @return {@code True} in case IGFS with the same name exists among provided attributes
-     */
-    private boolean sameIgfs(IgfsAttributes[] attrs) {
-        if (attrs != null) {
-            String igfsName = name();
-
-            for (IgfsAttributes attr : attrs) {
-                if (F.eq(igfsName, attr.igfsName()))
-                    return true;
-            }
-        }
-        return false;
-    }
-
     /** {@inheritDoc} */
     @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
         Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
         }
     }
 
-    /**
-     * Format message listener required for format action completion.
-     */
-    private class FormatMessageListener implements GridMessageListener {
-        /** {@inheritDoc} */
-        @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-        @Override public void onMessage(UUID nodeId, Object msg) {
-            if (msg instanceof IgfsDeleteMessage) {
-                ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
-                if (node != null) {
-                    if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
-                        IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
-                        try {
-                            msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
-                            return;
-                        }
-
-                        assert msg0.id() != null;
-
-                        GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
-                        if (fut != null) {
-                            if (msg0.error() == null)
-                                fut.onDone();
-                            else
-                                fut.onDone(msg0.error());
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Discovery listener required for format actions completion.
-     */
-    private class FormatDiscoveryListener implements GridLocalEventListener {
-        /** {@inheritDoc} */
-        @Override public void onEvent(Event evt) {
-            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
-            DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
-            if (evt0.eventNode() != null) {
-                if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
-                    Collection<IgniteUuid> rmv = new HashSet<>();
-
-                    for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
-                        IgniteUuid id = fut.getKey();
-
-                        try {
-                            if (!meta.exists(id)) {
-                                fut.getValue().onDone();
-
-                                rmv.add(id);
-                            }
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to check file existence: " + id, e);
-                        }
-                    }
-
-                    for (IgniteUuid id : rmv)
-                        delFuts.remove(id);
-                }
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public IgniteUuid nextAffinityKey() {
         return safeOp(new Callable<IgniteUuid>() {
@@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
             return t;
         }
     }
+
+    /**
+     * Format runnable.
+     */
+    private class FormatRunnable implements Runnable {
+        /** Target future. */
+        private final GridFutureAdapter<?> fut;
+
+        /**
+         * Constructor.
+         *
+         * @param fut Future.
+         */
+        public FormatRunnable(GridFutureAdapter<?> fut) {
+            this.fut = fut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            IgfsException err = null;
+
+            try {
+                format();
+            }
+            catch (Throwable err0) {
+                err = IgfsUtils.toIgfsException(err0);
+            }
+            finally {
+                if (err == null)
+                    fut.onDone();
+                else
+                    fut.onDone(err);
+            }
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6fa9877..cfe549f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -180,7 +180,7 @@ public class IgfsUtils {
      * @return Converted IGFS exception.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-    public static IgfsException toIgfsException(Exception err) {
+    public static IgfsException toIgfsException(Throwable err) {
         IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
 
         IgfsException igfsErr = X.cause(err, IgfsException.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index fd4ec17..4e0f12b 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest {
 
         igfs.format();
 
-        igfs.awaitDeletesAsync().get();
-
         GridTestUtils.retryAssert(log, 50, 100, new CA() {
             @Override public void apply() {
                 for (int i = 0; i < NODE_CNT; i++) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index 3933e86..266945f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
@@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.Transaction;
 import org.jsr166.ThreadLocalRandom8;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
  * {@link IgfsAttributes} test case.
@@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
-     * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPartitionedOversizeDelay() throws Exception {
-        cacheMode = PARTITIONED;
-        nearEnabled = true;
-
-        checkOversizeDelay();
-    }
-
-    /**
-     * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testColocatedOversizeDelay() throws Exception {
-        cacheMode = PARTITIONED;
-        nearEnabled = false;
-
-        checkOversizeDelay();
-    }
-
-    /**
-     * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently.
-     *
-     * @throws Exception If failed.
-     */
-    public void testReplicatedOversizeDelay() throws Exception {
-        cacheMode = REPLICATED;
-
-        checkOversizeDelay();
-    }
-
-    /**
      * Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache.
      *
      * @throws Exception If failed.
@@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
     }
 
     /**
-     * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory.
-     *
-     * @throws Exception If failed.
-     */
-    private void checkOversizeDelay() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-
-        igfsMaxData = 256;
-        trashPurgeTimeout = 2000;
-
-        startUp();
-
-        IgfsImpl igfs = igfs(0);
-
-        final IgfsPath path = new IgfsPath("/file");
-        final IgfsPath otherPath = new IgfsPath("/fileOther");
-
-        // Fill cache with data up to it's limit.
-        IgfsOutputStream os = igfs.create(path, false);
-        os.write(chunk((int)igfsMaxData));
-        os.close();
-
-        final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache(
-            igfs.configuration().getMetaCacheName());
-
-        // Start a transaction in a separate thread which will lock file ID.
-        final IgniteUuid id = igfs.context().meta().fileId(path);
-        final IgfsEntryInfo info = igfs.context().meta().info(id);
-
-        final AtomicReference<Throwable> err = new AtomicReference<>();
-
-        try {
-            new Thread(new Runnable() {
-                @Override public void run() {
-                    try {
-
-                        try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                            metaCache.get(id);
-
-                            latch.await();
-
-                            U.sleep(1000); // Sleep here so that data manager could "see" oversize.
-
-                            tx.commit();
-                        }
-                    }
-                    catch (Throwable e) {
-                        err.set(e);
-                    }
-                }
-            }).start();
-
-            // Now add file ID to trash listing so that delete worker could "see" it.
-            IgniteUuid trashId = IgfsUtils.randomTrashId();
-
-            try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
-                    new IgfsListingEntry(info));
-
-                // Clear root listing.
-                metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID));
-
-                // Add file to trash listing.
-                IgfsEntryInfo trashInfo = metaCache.get(trashId);
-
-                if (trashInfo == null)
-                    metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing));
-                else
-                    metaCache.put(trashId, trashInfo.listing(listing));
-
-                tx.commit();
-            }
-
-            assert metaCache.get(trashId) != null;
-
-            // Now the file is locked and is located in trash, try adding some more data.
-            os = igfs.create(otherPath, false);
-            os.write(new byte[1]);
-
-            latch.countDown();
-
-            os.close();
-
-            assert err.get() == null;
-        }
-        finally {
-            latch.countDown(); // Safety.
-        }
-    }
-
-    /**
      * Ensure that IGFS size is correctly updated in case of preloading.
      *
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/065d2e70/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index b38f3a2..ffa6f7d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
 import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.IgniteClusterEx;
 import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
         }
 
         /** {@inheritDoc} */
-        @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
         @Nullable @Override public String clientLogDirectory() {
             return null;
         }


[11/14] ignite git commit: Merge branch 'gridgain-7.5.25' into gridgain-7.5.25-out-refactor

Posted by vo...@apache.org.
Merge branch 'gridgain-7.5.25' into gridgain-7.5.25-out-refactor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da1ff65a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da1ff65a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da1ff65a

Branch: refs/heads/ignite-3264
Commit: da1ff65afc39c7b2dab4246551d2db25c21d7baa
Parents: f6fd3b8 065d2e7
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 09:55:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 09:55:11 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |   2 +-
 .../ignite/internal/binary/BinaryUtils.java     |  16 ++
 .../processors/cache/CacheObjectContext.java    |   3 +
 .../internal/processors/igfs/IgfsAsyncImpl.java |   6 -
 .../processors/igfs/IgfsDataManager.java        |  61 ++---
 .../processors/igfs/IgfsDeleteWorker.java       |  42 ----
 .../ignite/internal/processors/igfs/IgfsEx.java |   9 -
 .../internal/processors/igfs/IgfsImpl.java      | 249 +++++--------------
 .../processors/igfs/IgfsInputStreamImpl.java    |   6 +-
 ...zySecondaryFileSystemPositionedReadable.java |  77 ++++++
 .../processors/igfs/IgfsMetaManager.java        |  43 +++-
 .../internal/processors/igfs/IgfsUtils.java     |   2 +-
 .../ignite/igfs/IgfsFragmentizerSelfTest.java   |   2 -
 .../GridCacheBinaryObjectsAbstractSelfTest.java |  78 +++++-
 .../processors/igfs/IgfsAbstractSelfTest.java   |   3 +
 .../processors/igfs/IgfsModesSelfTest.java      |   1 +
 .../processors/igfs/IgfsSizeSelfTest.java       | 133 ----------
 .../unsafe/GridOffheapSnapTreeSelfTest.java     |   2 +-
 .../igfs/HadoopFIleSystemFactorySelfTest.java   |   1 +
 .../HadoopDefaultMapReducePlannerSelfTest.java  |   6 -
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  23 +-
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  17 +-
 .../cache/IgniteCacheOffheapIndexScanTest.java  | 195 +++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 24 files changed, 522 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da1ff65a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------


[09/14] ignite git commit: IGNITE-3259: Delete worker is not started on client nodes any more.

Posted by vo...@apache.org.
IGNITE-3259: Delete worker is not started on client nodes any more.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0176af13
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0176af13
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0176af13

Branch: refs/heads/ignite-3264
Commit: 0176af13646a09541d65a10cf7ec0641c71e2ca7
Parents: 5254957
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:10:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 18:10:36 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 25 ++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0176af13/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 1dd4c53..465116b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -207,19 +207,20 @@ public class IgfsMetaManager extends IgfsManager {
         locNode = igfsCtx.kernalContext().discovery().localNode();
 
         // Start background delete worker.
-        delWorker = new IgfsDeleteWorker(igfsCtx);
+        if (!client) {
+            delWorker = new IgfsDeleteWorker(igfsCtx);
 
-        delWorker.start();
+            delWorker.start();
+        }
     }
 
     /** {@inheritDoc} */
     @Override protected void onKernalStop0(boolean cancel) {
         IgfsDeleteWorker delWorker0 = delWorker;
 
-        if (delWorker0 != null)
+        if (delWorker0 != null) {
             delWorker0.cancel();
 
-        if (delWorker0 != null) {
             try {
                 U.join(delWorker0);
             }
@@ -1136,7 +1137,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                     tx.commit();
 
-                    delWorker.signal();
+                    signalDeleteWorker();
 
                     return newInfo.id();
                 }
@@ -1212,7 +1213,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                     tx.commit();
 
-                    delWorker.signal();
+                    signalDeleteWorker();
 
                     return victimId;
                 }
@@ -2476,7 +2477,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                 Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path);
 
-                delWorker.signal();
+                signalDeleteWorker();
 
                 return res;
             }
@@ -3341,4 +3342,14 @@ public class IgfsMetaManager extends IgfsManager {
         else
             IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
     }
+
+    /**
+     * Signal delete worker thread.
+     */
+    private void signalDeleteWorker() {
+        IgfsDeleteWorker delWorker0 = delWorker;
+
+        if (delWorker0 != null)
+            delWorker0.signal();
+    }
 }
\ No newline at end of file


[14/14] ignite git commit: WIP.

Posted by vo...@apache.org.
WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99d244a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99d244a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99d244a3

Branch: refs/heads/ignite-3264
Commit: 99d244a3009a5bcf347ce09da145a8f6cc3dc19f
Parents: cd92c9e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:55:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:55:21 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 341 +++++++++++--------
 1 file changed, 203 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/99d244a3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 465116b..404d837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1882,121 +1882,8 @@ public class IgfsMetaManager extends IgfsManager {
                 // Events to fire (can be done outside of a transaction).
                 final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
 
-                SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
-                    new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
-                        /** Output stream to the secondary file system. */
-                        private OutputStream out;
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
-                            IgfsEntryInfo> infos) throws Exception {
-                            validTxState(true);
-
-                            assert !infos.isEmpty();
-
-                            // Determine the first existing parent.
-                            IgfsPath parentPath = null;
-
-                            for (IgfsPath curPath : infos.keySet()) {
-                                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
-                                    parentPath = curPath;
-                            }
-
-                            assert parentPath != null;
-
-                            IgfsEntryInfo parentInfo = infos.get(parentPath);
-
-                            // Delegate to the secondary file system.
-                            out = simpleCreate ? fs.create(path, overwrite) :
-                                fs.create(path, bufSize, overwrite, replication, blockSize, props);
-
-                            IgfsPath parent0 = path.parent();
-
-                            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
-
-                            // If some of the parent directories were missing, synchronize again.
-                            if (!parentPath.equals(parent0)) {
-                                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
-
-                                // Fire notification about missing directories creation.
-                                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
-                                    IgfsPath evtPath = parent0;
-
-                                    while (!parentPath.equals(evtPath)) {
-                                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
-                                            EventType.EVT_IGFS_DIR_CREATED));
-
-                                        evtPath = evtPath.parent();
-
-                                        assert evtPath != null; // If this fails, then ROOT does not exist.
-                                    }
-                                }
-                            }
-
-                            // Get created file info.
-                            IgfsFile status = fs.info(path);
-
-                            if (status == null)
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because it no longer exists: " + path);
-                            else if (status.isDirectory())
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because the path points to a directory: " + path);
-
-                            IgfsEntryInfo newInfo = IgfsUtils.createFile(
-                                IgniteUuid.randomUuid(),
-                                status.blockSize(),
-                                status.length(),
-                                affKey,
-                                createFileLockId(false),
-                                igfsCtx.igfs().evictExclude(path, false),
-                                status.properties(),
-                                status.accessTime(),
-                                status.modificationTime()
-                            );
-
-                            // Add new file info to the listing optionally removing the previous one.
-                            assert parentInfo != null;
-
-                            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
-
-                            if (oldId != null) {
-                                IgfsEntryInfo oldInfo = info(oldId);
-
-                                assert oldInfo != null; // Otherwise cache is in inconsistent state.
-
-                                // The contact is that we cannot overwrite a file locked for writing:
-                                if (oldInfo.lockId() != null)
-                                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
-                                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
-
-                                id2InfoPrj.remove(oldId); // Remove the old one.
-                                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
-                                    path.name(), parentInfo.listing().get(path.name()).fileId()));
-
-                                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
-
-                                igfsCtx.data().delete(oldInfo);
-                            }
-
-                            // Record CREATE event if needed.
-                            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
-                                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
-
-                            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
-                        }
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
-                            throws IgniteCheckedException {
-                            U.closeQuiet(out);
-
-                            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
-                                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
-                                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
-                            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
-                                "exception: " + path, err);
-                        }
-                    };
+                CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+                    overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
 
                 try {
                     return synchronizeAndExecute(task, fs, false, path.parent());
@@ -2956,29 +2843,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Synchronization task interface.
-     */
-    private static interface SynchronizationTask<T> {
-        /**
-         * Callback handler in case synchronization was successful.
-         *
-         * @param infos Map from paths to corresponding infos.
-         * @return Task result.
-         * @throws Exception If failed.
-         */
-        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
-        /**
-         * Callback handler in case synchronization failed.
-         *
-         * @param err Optional exception.
-         * @return Task result.
-         * @throws IgniteCheckedException In case exception is to be thrown in that case.
-         */
-        public T onFailure(Exception err) throws IgniteCheckedException;
-    }
-
-    /**
      * Append routine.
      *
      * @param path Path.
@@ -3352,4 +3216,205 @@ public class IgfsMetaManager extends IgfsManager {
         if (delWorker0 != null)
             delWorker0.signal();
     }
+
+    /**
+     * Synchronization task interface.
+     */
+    private static interface SynchronizationTask<T> {
+        /**
+         * Callback handler in case synchronization was successful.
+         *
+         * @param infos Map from paths to corresponding infos.
+         * @return Task result.
+         * @throws Exception If failed.
+         */
+        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+        /**
+         * Callback handler in case synchronization failed.
+         *
+         * @param err Optional exception.
+         * @return Task result.
+         * @throws IgniteCheckedException In case exception is to be thrown in that case.
+         */
+        public T onFailure(Exception err) throws IgniteCheckedException;
+    }
+
+    /**
+     * Synchronization task to create a file.
+     */
+    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+        /** Secondary file system. */
+        private IgfsSecondaryFileSystem fs;
+
+        /** Path. */
+        private IgfsPath path;
+
+        /** Simple create flag. */
+        private boolean simpleCreate;
+
+        /** Properties. */
+        private Map<String, String> props;
+
+        /** Overwrite flag. */
+        private boolean overwrite;
+
+        /** Buffer size. */
+        private int bufSize;
+
+        /** Replication factor. */
+        private short replication;
+
+        /** Block size. */
+        private long blockSize;
+
+        /** Affinity key. */
+        private IgniteUuid affKey;
+
+        /** Pending events. */
+        private Deque<IgfsEvent> pendingEvts;
+
+        /** Output stream to the secondary file system. */
+        private OutputStream out;
+
+        /**
+         * Constructor.
+         *
+         * @param fs Secondary file system.
+         * @param path Path.
+         * @param simpleCreate Simple create flag.
+         * @param props Properties.
+         * @param overwrite Overwrite flag.
+         * @param bufSize Buffer size.
+         * @param replication Replication factor.
+         * @param blockSize Block size.
+         * @param affKey Affinity key.
+         * @param pendingEvts Pending events.
+         */
+        public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+            @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+            IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+            this.fs = fs;
+            this.path = path;
+            this.simpleCreate = simpleCreate;
+            this.props = props;
+            this.overwrite = overwrite;
+            this.bufSize = bufSize;
+            this.replication = replication;
+            this.blockSize = blockSize;
+            this.affKey = affKey;
+            this.pendingEvts = pendingEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+            throws Exception {
+            validTxState(true);
+
+            assert !infos.isEmpty();
+
+            // Determine the first existing parent.
+            IgfsPath parentPath = null;
+
+            for (IgfsPath curPath : infos.keySet()) {
+                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+                    parentPath = curPath;
+            }
+
+            assert parentPath != null;
+
+            IgfsEntryInfo parentInfo = infos.get(parentPath);
+
+            // Delegate to the secondary file system.
+            out = simpleCreate ? fs.create(path, overwrite) :
+                fs.create(path, bufSize, overwrite, replication, blockSize, props);
+
+            IgfsPath parent0 = path.parent();
+
+            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+
+            // If some of the parent directories were missing, synchronize again.
+            if (!parentPath.equals(parent0)) {
+                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+
+                // Fire notification about missing directories creation.
+                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+                    IgfsPath evtPath = parent0;
+
+                    while (!parentPath.equals(evtPath)) {
+                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+                            EventType.EVT_IGFS_DIR_CREATED));
+
+                        evtPath = evtPath.parent();
+
+                        assert evtPath != null; // If this fails, then ROOT does not exist.
+                    }
+                }
+            }
+
+            // Get created file info.
+            IgfsFile status = fs.info(path);
+
+            if (status == null)
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because it no longer exists: " + path);
+            else if (status.isDirectory())
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because the path points to a directory: " + path);
+
+            IgfsEntryInfo newInfo = IgfsUtils.createFile(
+                IgniteUuid.randomUuid(),
+                status.blockSize(),
+                status.length(),
+                affKey,
+                createFileLockId(false),
+                igfsCtx.igfs().evictExclude(path, false),
+                status.properties(),
+                status.accessTime(),
+                status.modificationTime()
+            );
+
+            // Add new file info to the listing optionally removing the previous one.
+            assert parentInfo != null;
+
+            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+
+            if (oldId != null) {
+                IgfsEntryInfo oldInfo = info(oldId);
+
+                assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+                // The contract is that we cannot overwrite a file locked for writing:
+                if (oldInfo.lockId() != null)
+                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
+                id2InfoPrj.remove(oldId); // Remove the old one.
+                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+                    path.name(), parentInfo.listing().get(path.name()).fileId()));
+
+                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+
+                igfsCtx.data().delete(oldInfo);
+            }
+
+            // Record CREATE event if needed.
+            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+
+            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+            U.closeQuiet(out);
+
+            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+                "exception: " + path, err);
+        }
+    }
 }
\ No newline at end of file


[06/14] ignite git commit: Removed more unnecessary fields.

Posted by vo...@apache.org.
Removed more unnecessary fields.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a76b3492
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a76b3492
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a76b3492

Branch: refs/heads/ignite-3264
Commit: a76b3492b6c1312c7c3b7bac0b302dba788e4fba
Parents: 5949abe
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:05:28 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:05:28 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/igfs/IgfsImpl.java     |  9 ++++-----
 .../processors/igfs/IgfsOutputStreamImpl.java         | 14 ++++----------
 2 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a76b3492/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index bc2e087..5e2bca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1078,7 +1078,7 @@ public final class IgfsImpl implements IgfsEx {
                     batch = newBatch(path, desc.out());
 
                     IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
-                        bufferSize(bufSize), mode, batch, metrics);
+                        bufferSize(bufSize), mode, batch);
 
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
 
@@ -1107,7 +1107,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
             }
         });
     }
@@ -1142,8 +1142,7 @@ public final class IgfsImpl implements IgfsEx {
 
                     batch = newBatch(path, desc.out());
 
-                    return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch,
-                        metrics);
+                    return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
                 }
 
                 final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1184,7 +1183,7 @@ public final class IgfsImpl implements IgfsEx {
 
                 assert res != null;
 
-                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null, metrics);
+                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a76b3492/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index bc32e81..8c93aad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -98,9 +98,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Ensures that onClose)_ routine is called no more than once. */
     private final AtomicBoolean onCloseGuard = new AtomicBoolean();
 
-    /** Local IGFS metrics. */
-    private final IgfsLocalMetrics metrics;
-
     /** Affinity written by this output stream. */
     private IgfsFileAffinityRange streamRange;
 
@@ -119,10 +116,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @param bufSize The size of the buffer to be used.
      * @param mode Grid IGFS mode.
      * @param batch Optional secondary file system batch.
-     * @param metrics Local IGFS metrics.
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
-        @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
+        @Nullable IgfsFileWorkerBatch batch) {
         synchronized (mux) {
             this.path = path;
             this.bufSize = optimizeBufferSize(bufSize, fileInfo);
@@ -131,7 +127,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
             assert mode != null && mode != PROXY;
             assert mode == PRIMARY && batch == null || batch != null;
-            assert metrics != null;
 
             // File hasn't been locked.
             if (fileInfo.lockId() == null)
@@ -144,13 +139,12 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             this.fileInfo = fileInfo;
             this.mode = mode;
             this.batch = batch;
-            this.metrics = metrics;
 
             streamRange = initialStreamRange(fileInfo);
 
             writeCompletionFut = igfsCtx.data().writeStart(fileInfo);
 
-            metrics.incrementFilesOpenedForWrite();
+            igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
         }
     }
 
@@ -259,7 +253,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                     if (closeGuard.compareAndSet(false, true)) {
                         onClose(false);
 
-                        metrics.decrementFilesOpenedForWrite();
+                        igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
 
                         GridEventStorageManager evts = igfsCtx.kernalContext().event();
 
@@ -472,7 +466,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                     err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
                 }
 
-                metrics.addWrittenBytesTime(bytes, time);
+                igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
 
                 // Await secondary file system processing to finish.
                 if (mode == DUAL_SYNC) {


[08/14] ignite git commit: WIP.

Posted by vo...@apache.org.
WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f6fd3b84
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f6fd3b84
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f6fd3b84

Branch: refs/heads/ignite-3264
Commit: f6fd3b84f17cff6dd4d335ad18e2a8a322a1942f
Parents: 04e311b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 12:59:43 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 12:59:43 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 96 ++++++--------------
 1 file changed, 26 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f6fd3b84/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 13808ea..16a20a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -221,8 +221,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 sendData(true);
 
             try {
-                storeDataBlocks(in, len);
-            } catch (IgniteCheckedException e) {
+                storeData(in, len);
+            }
+            catch (IgniteCheckedException e) {
                 throw new IOException(e.getMessage(), e);
             }
 
@@ -323,16 +324,23 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /**
      * Store data block.
      *
-     * @param block Block.
+     * @param data Block.
+     * @param writeLen Write length.
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    private void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+    private void storeData(Object data, int writeLen) throws IgniteCheckedException, IOException {
         assert Thread.holdsLock(mux);
+        assert data instanceof ByteBuffer || data instanceof DataInput;
 
-        int writeLen = block.remaining();
+        if (writeCompletionFut.isDone()) {
+            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+
+            writeCompletionFut.get();
+        }
 
-        preStoreDataBlocks(null, writeLen);
+        bytes += writeLen;
+        space += writeLen;
 
         int blockSize = fileInfo.blockSize();
 
@@ -350,80 +358,28 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 remainder = allocated;
             }
 
-            block.get(remainder, remainderDataLen, writeLen);
+            if (data instanceof ByteBuffer)
+                ((ByteBuffer)data).get(remainder, remainderDataLen, writeLen);
+            else
+                ((DataInput)data).readFully(remainder, remainderDataLen, writeLen);
 
             remainderDataLen += writeLen;
         }
         else {
-            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
-                remainderDataLen, block, false, streamRange, batch);
-
-            remainderDataLen = remainder == null ? 0 : remainder.length;
-        }
-    }
-
-    /**
-     * Store data blocks.
-     *
-     * @param in Input.
-     * @param len Length.
-     * @throws IgniteCheckedException If failed.
-     * @throws IOException If failed.
-     */
-    private void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
-        assert Thread.holdsLock(mux);
-
-        preStoreDataBlocks(in, len);
-
-        int blockSize = fileInfo.blockSize();
-
-        // If data length is not enough to fill full block, fill the remainder and return.
-        if (remainderDataLen + len < blockSize) {
-            if (remainder == null)
-                remainder = new byte[blockSize];
-            else if (remainder.length != blockSize) {
-                assert remainderDataLen == remainder.length;
-
-                byte[] allocated = new byte[blockSize];
-
-                U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
-
-                remainder = allocated;
+            if (data instanceof ByteBuffer) {
+                remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                    remainderDataLen, (ByteBuffer)data, false, streamRange, batch);
+            }
+            else {
+                remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+                    remainderDataLen, (DataInput)data, writeLen, false, streamRange, batch);
             }
-
-            in.readFully(remainder, remainderDataLen, len);
-
-            remainderDataLen += len;
-        }
-        else {
-            remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
-                remainderDataLen, in, len, false, streamRange, batch);
 
             remainderDataLen = remainder == null ? 0 : remainder.length;
         }
     }
 
     /**
-     * Initializes data loader if it was not initialized yet and updates written space.
-     *
-     * @param len Data length to be written.
-     */
-    private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
-        // Check if any exception happened while writing data.
-        if (writeCompletionFut.isDone()) {
-            assert ((GridFutureAdapter)writeCompletionFut).isFailed();
-
-            if (in != null)
-                in.skipBytes(len);
-
-            writeCompletionFut.get();
-        }
-
-        bytes += len;
-        space += len;
-    }
-
-    /**
      * Close callback. It will be called only once in synchronized section.
      *
      * @param deleted Whether we already know that the file was deleted.
@@ -542,7 +498,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             if (flip)
                 buf.flip();
 
-            storeDataBlock(buf);
+            storeData(buf, buf.remaining());
 
             buf = null;
         }


[02/14] ignite git commit: Better encapsulated monitor.

Posted by vo...@apache.org.
Better encapsulated monitor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3a432c0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3a432c0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3a432c0

Branch: refs/heads/ignite-3264
Commit: d3a432c02e300988e39516641fb17d5b5a9af698
Parents: 3cd3373
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:10 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:10 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 271 ++++++++++---------
 1 file changed, 144 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d3a432c0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7a40ba3..7363ffe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -113,6 +113,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Close guard. */
     private final AtomicBoolean closeGuard = new AtomicBoolean(false);
 
+    /** Mutex for synchronization. */
+    private final Object mux = new Object();
+
     /**
      * Constructs file output stream.
      *
@@ -126,59 +129,63 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      */
     IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
         @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
-        this.path = path;
-        this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+        synchronized (mux) {
+            this.path = path;
+            this.bufSize = optimizeBufferSize(bufSize, fileInfo);
 
-        assert fileInfo != null;
-        assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
-        assert mode != null && mode != PROXY;
-        assert mode == PRIMARY && batch == null || batch != null;
-        assert metrics != null;
+            assert fileInfo != null;
+            assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+            assert mode != null && mode != PROXY;
+            assert mode == PRIMARY && batch == null || batch != null;
+            assert metrics != null;
 
-        // File hasn't been locked.
-        if (fileInfo.lockId() == null)
-            throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+            // File hasn't been locked.
+            if (fileInfo.lockId() == null)
+                throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
 
-        assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+            assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
 
-        this.igfsCtx = igfsCtx;
-        meta = igfsCtx.meta();
-        data = igfsCtx.data();
+            this.igfsCtx = igfsCtx;
+            meta = igfsCtx.meta();
+            data = igfsCtx.data();
 
-        this.fileInfo = fileInfo;
-        this.mode = mode;
-        this.batch = batch;
-        this.metrics = metrics;
+            this.fileInfo = fileInfo;
+            this.mode = mode;
+            this.batch = batch;
+            this.metrics = metrics;
 
-        streamRange = initialStreamRange(fileInfo);
+            streamRange = initialStreamRange(fileInfo);
 
-        writeCompletionFut = data.writeStart(fileInfo);
+            writeCompletionFut = data.writeStart(fileInfo);
 
-        metrics.incrementFilesOpenedForWrite();
+            metrics.incrementFilesOpenedForWrite();
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void write(int b) throws IOException {
-        checkClosed(null, 0);
+    @Override public void write(int b) throws IOException {
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        b &= 0xFF;
+            b &= 0xFF;
 
-        if (buf == null)
-            buf = ByteBuffer.allocate(bufSize);
+            if (buf == null)
+                buf = ByteBuffer.allocate(bufSize);
 
-        buf.put((byte)b);
+            buf.put((byte)b);
 
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
+            if (buf.position() >= bufSize)
+                sendData(true); // Send data to server.
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("NullableProblems")
-    @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
+    @Override public void write(byte[] b, int off, int len) throws IOException {
         A.notNull(b, "b");
 
         if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
@@ -186,89 +193,94 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 ", length=" + len + ']');
         }
 
-        checkClosed(null, 0);
+        synchronized (mux) {
+            checkClosed(null, 0);
 
-        if (len == 0)
-            return; // Done.
+            if (len == 0)
+                return; // Done.
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        if (buf == null) {
-            // Do not allocate and copy byte buffer if will send data immediately.
-            if (len >= bufSize) {
-                buf = ByteBuffer.wrap(b, off, len);
+            if (buf == null) {
+                // Do not allocate and copy byte buffer if will send data immediately.
+                if (len >= bufSize) {
+                    buf = ByteBuffer.wrap(b, off, len);
 
-                sendData(false);
+                    sendData(false);
 
-                return;
-            }
+                    return;
+                }
 
-            buf = ByteBuffer.allocate(Math.max(bufSize, len));
-        }
+                buf = ByteBuffer.allocate(Math.max(bufSize, len));
+            }
 
-        if (buf.remaining() < len)
-            // Expand buffer capacity, if remaining size is less then data size.
-            buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
+            if (buf.remaining() < len)
+                // Expand buffer capacity if the remaining size is less than the data size.
+                buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
 
-        assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
-            "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
+            assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
+                "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
 
-        buf.put(b, off, len);
+            buf.put(b, off, len);
 
-        if (buf.position() >= bufSize)
-            sendData(true); // Send data to server.
+            if (buf.position() >= bufSize)
+                sendData(true); // Send data to server.
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
-        checkClosed(in, len);
+    @Override public void transferFrom(DataInput in, int len) throws IOException {
+        synchronized (mux) {
+            checkClosed(in, len);
 
-        long startTime = System.nanoTime();
+            long startTime = System.nanoTime();
 
-        // Send all IPC data from the local buffer before streaming.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
+            // Send all IPC data from the local buffer before streaming.
+            if (buf != null && buf.position() > 0)
+                sendData(true);
 
-        try {
-            storeDataBlocks(in, len);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException(e.getMessage(), e);
-        }
+            try {
+                storeDataBlocks(in, len);
+            } catch (IgniteCheckedException e) {
+                throw new IOException(e.getMessage(), e);
+            }
 
-        time += System.nanoTime() - startTime;
+            time += System.nanoTime() - startTime;
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public final synchronized void close() throws IOException {
-        // Do nothing if stream is already closed.
-        if (closed)
-            return;
+    @Override public final void close() throws IOException {
+        synchronized (mux) {
+            // Do nothing if stream is already closed.
+            if (closed)
+                return;
 
-        try {
-            // Send all IPC data from the local buffer.
             try {
-                flush();
-            }
-            finally {
-                if (closeGuard.compareAndSet(false, true)) {
-                    onClose(false);
+                // Send all IPC data from the local buffer.
+                try {
+                    flush();
+                }
+                finally {
+                    if (closeGuard.compareAndSet(false, true)) {
+                        onClose(false);
 
-                    metrics.decrementFilesOpenedForWrite();
+                        metrics.decrementFilesOpenedForWrite();
 
-                    GridEventStorageManager evts = igfsCtx.kernalContext().event();
+                        GridEventStorageManager evts = igfsCtx.kernalContext().event();
 
-                    if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
-                        evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
-                            EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+                        if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+                            evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+                                EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+                    }
                 }
             }
-        }
-        finally {
-            // Mark this stream closed AFTER flush.
-            closed = true;
+            finally {
+                // Mark this stream closed AFTER flush.
+                closed = true;
+            }
         }
     }
 
@@ -277,55 +289,56 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      *
      * @exception IOException  if an I/O error occurs.
      */
-    @Override public synchronized void flush() throws IOException {
-        boolean exists;
+    @Override public void flush() throws IOException {
+        synchronized (mux) {
 
-        try {
-            exists = meta.exists(fileInfo.id());
-        }
-        catch (IgniteCheckedException e) {
-            throw new IOException("File to read file metadata: " + path, e);
-        }
+            boolean exists;
 
-        if (!exists) {
-            onClose(true);
+            try {
+                exists = meta.exists(fileInfo.id());
+            } catch (IgniteCheckedException e) {
+                throw new IOException("File to read file metadata: " + path, e);
+            }
 
-            throw new IOException("File was concurrently deleted: " + path);
-        }
+            if (!exists) {
+                onClose(true);
 
-        checkClosed(null, 0);
+                throw new IOException("File was concurrently deleted: " + path);
+            }
 
-        // Send all IPC data from the local buffer.
-        if (buf != null && buf.position() > 0)
-            sendData(true);
+            checkClosed(null, 0);
 
-        try {
-            if (remainder != null) {
-                data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
-                    ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
+            // Send all IPC data from the local buffer.
+            if (buf != null && buf.position() > 0)
+                sendData(true);
 
-                remainder = null;
-                remainderDataLen = 0;
-            }
+            try {
+                if (remainder != null) {
+                    data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+                        ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
 
-            if (space > 0) {
-                data.awaitAllAcksReceived(fileInfo.id());
+                    remainder = null;
+                    remainderDataLen = 0;
+                }
 
-                IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
+                if (space > 0) {
+                    data.awaitAllAcksReceived(fileInfo.id());
 
-                if (fileInfo0 == null)
-                    throw new IOException("File was concurrently deleted: " + path);
-                else
-                    fileInfo = fileInfo0;
+                    IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
 
-                streamRange = initialStreamRange(fileInfo);
+                    if (fileInfo0 == null)
+                        throw new IOException("File was concurrently deleted: " + path);
+                    else
+                        fileInfo = fileInfo0;
 
-                space = 0;
+                    streamRange = initialStreamRange(fileInfo);
+
+                    space = 0;
+                }
+            } catch (IgniteCheckedException e) {
+                throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
             }
         }
-        catch (IgniteCheckedException e) {
-            throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
-        }
     }
 
     /**
@@ -355,7 +368,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+    protected void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
+        assert Thread.holdsLock(mux);
+
         int writeLen = block.remaining();
 
         preStoreDataBlocks(null, writeLen);
@@ -396,7 +411,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IgniteCheckedException If failed.
      * @throws IOException If failed.
      */
-    protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+    protected void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
+        assert Thread.holdsLock(mux);
+
         preStoreDataBlocks(in, len);
 
         int blockSize = fileInfo.blockSize();
@@ -434,7 +451,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException If failed.
      */
     private void onClose(boolean deleted) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         if (onCloseGuard.compareAndSet(false, true)) {
             // Notify backing secondary file system batch to finish.
@@ -516,7 +533,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException If this stream is closed.
      */
     protected void checkClosed(@Nullable DataInput in, int len) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         if (closed) {
             // Must read data from stream before throwing exception.
@@ -535,7 +552,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
      * @throws IOException In case of IO exception.
      */
     protected void sendData(boolean flip) throws IOException {
-        assert Thread.holdsLock(this);
+        assert Thread.holdsLock(mux);
 
         try {
             if (flip)


[13/14] ignite git commit: Re-arranged fields.

Posted by vo...@apache.org.
Re-arranged fields.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd92c9ec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd92c9ec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd92c9ec

Branch: refs/heads/ignite-3264
Commit: cd92c9ecaf2b2c9e1e7b9e2e7ed7c5aadf9be3d5
Parents: 93f8eca
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:01:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:01:21 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsOutputStreamImpl.java   | 42 ++++++++++----------
 1 file changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cd92c9ec/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index f51e9b5..b90e34d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -50,12 +50,33 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Maximum number of blocks in buffer. */
     private static final int MAX_BLOCKS_CNT = 16;
 
+    /** IGFS context. */
+    private final IgfsContext igfsCtx;
+
     /** Path to file. */
     private final IgfsPath path;
 
     /** Buffer size. */
     private final int bufSize;
 
+    /** IGFS mode. */
+    private final IgfsMode mode;
+
+    /** File worker batch. */
+    private final IgfsFileWorkerBatch batch;
+
+    /** Write completion future. */
+    private final IgniteInternalFuture<Boolean> writeCompletionFut;
+
+    /** Ensures that onClose() routine is called no more than once. */
+    private final AtomicBoolean onCloseGuard = new AtomicBoolean();
+
+    /** Close guard. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean(false);
+
+    /** Mutex for synchronization. */
+    private final Object mux = new Object();
+
     /** Flag for this stream open/closed state. */
     private boolean closed;
 
@@ -69,9 +90,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Time consumed by write operations. */
     private long time;
 
-    /** IGFS context. */
-    private IgfsContext igfsCtx;
-
     /** File descriptor. */
     @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
     private IgfsEntryInfo fileInfo;
@@ -86,27 +104,9 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
     /** Data length in remainder. */
     private int remainderDataLen;
 
-    /** Write completion future. */
-    private final IgniteInternalFuture<Boolean> writeCompletionFut;
-
-    /** IGFS mode. */
-    private final IgfsMode mode;
-
-    /** File worker batch. */
-    private final IgfsFileWorkerBatch batch;
-
-    /** Ensures that onClose)_ routine is called no more than once. */
-    private final AtomicBoolean onCloseGuard = new AtomicBoolean();
-
     /** Affinity written by this output stream. */
     private IgfsFileAffinityRange streamRange;
 
-    /** Close guard. */
-    private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
-    /** Mutex for synchronization. */
-    private final Object mux = new Object();
-
     /**
      * Constructs file output stream.
      *


[03/14] ignite git commit: Got rid of warns.

Posted by vo...@apache.org.
Got rid of warns.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/75b60800
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/75b60800
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/75b60800

Branch: refs/heads/ignite-3264
Commit: 75b608003baa74f76a5c2537b5f1cf01bab0ed38
Parents: d3a432c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 11:57:31 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Jun 6 11:57:31 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/processors/igfs/IgfsOutputStreamImpl.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/75b60800/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 7363ffe..c50c431 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -485,6 +485,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
                 // Await secondary file system processing to finish.
                 if (mode == DUAL_SYNC) {
                     try {
+                        assert batch != null;
+
                         batch.await();
                     }
                     catch (IgniteCheckedException e) {
@@ -513,8 +515,11 @@ class IgfsOutputStreamImpl extends IgfsOutputStream {
             }
             else {
                 try {
-                    if (mode == DUAL_SYNC)
+                    if (mode == DUAL_SYNC) {
+                        assert batch != null;
+
                         batch.await();
+                    }
                 }
                 catch (IgniteCheckedException e) {
                     throw new IOException("Failed to close secondary file system stream [path=" + path +