You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/14 11:18:05 UTC

ignite git commit: IGNITE-2814: IGFS: File lock/unlock/reserve operations no longer require put/replace on cache. Thin entry processors are used instead.

Repository: ignite
Updated Branches:
  refs/heads/master d83fa1166 -> b1d9e8b6c


IGNITE-2814: IGFS: File lock/unlock/reserve operations no longer require put/replace on cache. Thin entry processors are used instead.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1d9e8b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1d9e8b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1d9e8b6

Branch: refs/heads/master
Commit: b1d9e8b6c7553133c4c4ca3820f6f30d202b7ea2
Parents: d83fa11
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 14 13:17:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 14 13:17:58 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 270 ++++++++++++++++---
 .../processors/igfs/IgfsOutputStreamImpl.java   |  76 +-----
 2 files changed, 228 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 89ddd02..8bb9e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -466,10 +466,11 @@ public class IgfsMetaManager extends IgfsManager {
      * Lock the file explicitly outside of transaction.
      *
      * @param fileId File ID to lock.
+     * @param delete If file is being locked for delete.
      * @return Locked file info or {@code null} if file cannot be locked or doesn't exist.
      * @throws IgniteCheckedException If the file with such id does not exist, or on another failure.
      */
-    public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean isDeleteLock) throws IgniteCheckedException {
+    public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean delete) throws IgniteCheckedException {
         if (busyLock.enterBusy()) {
             try {
                 assert validTxState(false);
@@ -487,13 +488,11 @@ public class IgfsMetaManager extends IgfsManager {
                     if (oldInfo.lockId() != null)
                         return null; // The file is already locked, we cannot lock it.
 
-                    IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock);
+                    IgniteUuid lockId = createFileLockId(delete);
 
-                    boolean put = id2InfoPrj.replace(fileId, oldInfo, newInfo);
+                    id2InfoPrj.invoke(fileId, new FileLockProcessor(lockId));
 
-                    assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
-
-                    assert newInfo.id().equals(oldInfo.id()); // Same id.
+                    IgfsFileInfo newInfo = id2InfoPrj.get(fileId);
 
                     tx.commit();
 
@@ -515,30 +514,13 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Set lock on file info.
+     * Create file lock ID.
      *
-     * @param info File info.
-     * @return New file info with lock set, or null if the info passed in is already locked.
-     * @throws IgniteCheckedException In case lock is already set on that file.
+     * @param delete If lock ID is required for file deletion.
+     * @return Lock ID.
      */
-    private @Nullable IgfsFileInfo lockInfo(IgfsFileInfo info, boolean isDeleteLock) {
-         assert info != null;
-
-         if (info.lockId() != null)
-             return null; // Null return value indicates that the file is already locked.
-
-         return new IgfsFileInfo(info, composeLockId(isDeleteLock), info.modificationTime());
-    }
-
-    /**
-     * Gets a new lock id.
-     * The returned Id #globalId() method will return the Id of the node which locked the file.
-     *
-     * @param isDeleteLock if this is special delete lock.
-     * @return The new lock id.
-     */
-    private IgniteUuid composeLockId(boolean isDeleteLock) {
-        if (isDeleteLock)
+    private IgniteUuid createFileLockId(boolean delete) {
+        if (delete)
             return IgfsUtils.DELETE_LOCK_ID;
 
         return IgniteUuid.fromUuid(locNode.id());
@@ -584,12 +566,7 @@ public class IgfsMetaManager extends IgfsManager {
                                     "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
                                     oldInfo.lockId() + ']');
 
-                            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
-
-                            boolean put = id2InfoPrj.put(fileId, newInfo);
-
-                            assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo
-                                    + ']';
+                            id2InfoPrj.invoke(fileId, new FileUnlockProcessor(modificationTime));
 
                             return null;
                         }
@@ -1680,6 +1657,57 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
+     * Reserve space for file.
+     *
+     * @param path File path.
+     * @param fileId File ID.
+     * @param space Space.
+     * @param affRange Affinity range.
+     * @return New file info.
+     */
+    public IgfsFileInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
+        throws IgniteCheckedException {
+        assert validTxState(false);
+
+        if (busyLock.enterBusy()) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
+
+                IgniteInternalTx tx = startTx();
+
+                try {
+                    // Lock file ID for this transaction.
+                    IgfsFileInfo oldInfo = info(fileId);
+
+                    if (oldInfo == null)
+                        throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
+
+                    id2InfoPrj.invoke(fileId, new FileReserveSpaceProcessor(space, affRange));
+
+                    IgfsFileInfo newInfo = id2InfoPrj.get(fileId);
+
+                    tx.commit();
+
+                    return newInfo;
+                }
+                catch (GridClosureException e) {
+                    throw U.cast(e);
+                }
+                finally {
+                    tx.close();
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path +
+                ", id=" + fileId + ']');
+    }
+
+    /**
      * Update file info in cache.
      *
      * @param fileId File ID to update information for.
@@ -1992,7 +2020,7 @@ public class IgfsMetaManager extends IgfsManager {
                                     "the secondary file system because the path points to a directory: " + path);
 
                             IgfsFileInfo newInfo = new IgfsFileInfo(status.blockSize(), status.length(), affKey,
-                                composeLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(),
+                                createFileLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(),
                                 status.accessTime(), status.modificationTime());
 
                             // Add new file info to the listing optionally removing the previous one.
@@ -2113,13 +2141,11 @@ public class IgfsMetaManager extends IgfsManager {
                             }
 
                             // Set lock and return.
-                            IgfsFileInfo lockedInfo = lockInfo(info, false);
-
-                            assert lockedInfo != null; // We checked the lock above.
+                            IgniteUuid lockId = createFileLockId(false);
 
-                            boolean put = id2InfoPrj.put(info.id(), lockedInfo);
+                            id2InfoPrj.invoke(info.id(), new FileLockProcessor(lockId));
 
-                            assert put;
+                            IgfsFileInfo lockedInfo = id2InfoPrj.get(info.id());
 
                             return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(),
                                 lockedInfo, out);
@@ -3455,7 +3481,7 @@ public class IgfsMetaManager extends IgfsManager {
                         @Override protected IgfsFileInfo buildLeaf() {
                             long t = System.currentTimeMillis();
 
-                            return new IgfsFileInfo(blockSize, 0L, affKey, composeLockId(false),
+                            return new IgfsFileInfo(blockSize, 0L, affKey, createFileLockId(false),
                                  evictExclude, leafProps, t, t);
                         }
                     };
@@ -3505,7 +3531,7 @@ public class IgfsMetaManager extends IgfsManager {
                                                 + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
                                                 + ", lockId=" + lockId + ']');
 
-                                        IgniteUuid newLockId = composeLockId(false);
+                                        IgniteUuid newLockId = createFileLockId(false);
 
                                         EntryProcessorResult<IgfsFileInfo> result
                                             = id2InfoPrj.invoke(lowermostExistingInfo.id(),
@@ -3553,7 +3579,7 @@ public class IgfsMetaManager extends IgfsManager {
                                         long t = System.currentTimeMillis();
 
                                         final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L,
-                                            affKey, composeLockId(false), evictExclude, fileProps, t, t);
+                                            affKey, createFileLockId(false), evictExclude, fileProps, t, t);
 
                                         assert newFileInfo.lockId() != null; // locked info should be created.
 
@@ -3874,4 +3900,162 @@ public class IgfsMetaManager extends IgfsManager {
             return S.toString(LockFileProcessor.class, this);
         }
     }
+
+    /**
+     * File lock entry processor.
+     */
+    private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Lock Id. */
+        private IgniteUuid lockId;
+
+        /**
+         * Default constructor.
+         */
+        public FileLockProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param lockId Lock ID.
+         */
+        public FileLockProcessor(IgniteUuid lockId) {
+            this.lockId = lockId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo old = entry.getValue();
+
+            entry.setValue(new IgfsFileInfo(old, lockId, old.modificationTime()));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeGridUuid(out, lockId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            lockId = U.readGridUuid(in);
+        }
+    }
+
+    /**
+     * File unlock entry processor.
+     */
+    private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Modification time. */
+        private long modificationTime;
+
+        /**
+         * Default constructor.
+         */
+        public FileUnlockProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param modificationTime Modification time.
+         */
+        public FileUnlockProcessor(long modificationTime) {
+            this.modificationTime = modificationTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo old = entry.getValue();
+
+            entry.setValue(new IgfsFileInfo(old, null, modificationTime));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(modificationTime);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            modificationTime = in.readLong();
+        }
+    }
+
+    /**
+     * File reserve space entry processor.
+     */
+    private static class FileReserveSpaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Space. */
+        private long space;
+
+        /** Affinity range. */
+        private IgfsFileAffinityRange affRange;
+
+        /**
+         * Default constructor.
+         */
+        public FileReserveSpaceProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param space Space.
+         * @param affRange Affinity range.
+         */
+        public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) {
+            this.space = space;
+            this.affRange = affRange;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo oldInfo = entry.getValue();
+
+            IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+            newMap.addRange(affRange);
+
+            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
+
+            newInfo.fileMap(newMap);
+
+            entry.setValue(newInfo);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(space);
+            out.writeObject(affRange);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            space = in.readLong();
+            affRange = (IgfsFileAffinityRange)in.readObject();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 83056af..8c11073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,10 +18,7 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import java.io.DataInput;
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -30,11 +27,9 @@ import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -297,8 +292,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
             if (space > 0) {
                 data.awaitAllAcksReceived(fileInfo.id());
 
-                IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
-                    new ReserveSpaceClosure(space, streamRange));
+                IgfsFileInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
 
                 if (fileInfo0 == null)
                     throw new IOException("File was concurrently deleted: " + path);
@@ -446,72 +440,4 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     @Override public String toString() {
         return S.toString(IgfsOutputStreamImpl.class, this);
     }
-
-    /**
-     * Helper closure to reserve specified space and update file's length
-     */
-    @GridInternal
-    private static final class ReserveSpaceClosure implements IgniteClosure<IgfsFileInfo, IgfsFileInfo>,
-        Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Space amount (bytes number) to increase file's length. */
-        private long space;
-
-        /** Affinity range for this particular update. */
-        private IgfsFileAffinityRange range;
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         *
-         */
-        public ReserveSpaceClosure() {
-            // No-op.
-        }
-
-        /**
-         * Constructs the closure to reserve specified space and update file's length.
-         *
-         * @param space Space amount (bytes number) to increase file's length.
-         * @param range Affinity range specifying which part of file was colocated.
-         */
-        private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) {
-            this.space = space;
-            this.range = range;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) {
-            IgfsFileMap oldMap = oldInfo.fileMap();
-
-            IgfsFileMap newMap = new IgfsFileMap(oldMap);
-
-            newMap.addRange(range);
-
-            // Update file length.
-            IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
-
-            updated.fileMap(newMap);
-
-            return updated;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(space);
-            out.writeObject(range);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            space = in.readLong();
-            range = (IgfsFileAffinityRange)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ReserveSpaceClosure.class, this);
-        }
-    }
 }
\ No newline at end of file