You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/14 10:39:26 UTC

[3/3] ignite git commit: IGNITE-2814: File length update.

IGNITE-2814: File length update.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/660a51f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/660a51f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/660a51f7

Branch: refs/heads/ignite-2814
Commit: 660a51f72655c6efeeb3c1da24e1e730835c521d
Parents: 5ea6d67
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 14 12:39:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 14 12:39:11 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 297 +++++++++++++------
 .../processors/igfs/IgfsOutputStreamImpl.java   |  76 +----
 2 files changed, 210 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/660a51f7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 50d553f..8bb9e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -588,94 +588,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * File lock entry processor.
-     */
-    private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable {
-        /** Lock Id. */
-        private IgniteUuid lockId;
-
-        /**
-         * Default constructor.
-         */
-        public FileLockProcessor() {
-            // No-op.
-        }
-
-        /**
-         * Constructor.
-         *
-         * @param lockId Lock ID.
-         */
-        public FileLockProcessor(IgniteUuid lockId) {
-            this.lockId = lockId;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
-            throws EntryProcessorException {
-            IgfsFileInfo old = entry.getValue();
-
-            entry.setValue(new IgfsFileInfo(old, lockId, old.modificationTime()));
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeGridUuid(out, lockId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            lockId = U.readGridUuid(in);
-        }
-    }
-
-    /**
-     * File unlock entry processor.
-     */
-    private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable {
-        /** Modification time. */
-        private long modificationTime;
-
-        /**
-         * Default constructor.
-         */
-        public FileUnlockProcessor() {
-            // No-op.
-        }
-
-        /**
-         * Constructor.
-         *
-         * @param modificationTime Modification time.
-         */
-        public FileUnlockProcessor(long modificationTime) {
-            this.modificationTime = modificationTime;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
-            throws EntryProcessorException {
-            IgfsFileInfo old = entry.getValue();
-
-            entry.setValue(new IgfsFileInfo(old, null, modificationTime));
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(modificationTime);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            modificationTime = in.readLong();
-        }
-    }
-
-    /**
      * Lock file IDs participating in the transaction.<br/>
      *
      * @param fileIds file IDs to lock.
@@ -1745,6 +1657,57 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
+     * Reserve space for file.
+     *
+     * @param path File path.
+     * @param fileId File ID.
+     * @param space Space.
+     * @param affRange Affinity range.
+     * @return New file info.
+     */
+    public IgfsFileInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
+        throws IgniteCheckedException {
+        assert validTxState(false);
+
+        if (busyLock.enterBusy()) {
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
+
+                IgniteInternalTx tx = startTx();
+
+                try {
+                    // Lock file ID for this transaction.
+                    IgfsFileInfo oldInfo = info(fileId);
+
+                    if (oldInfo == null)
+                        throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
+
+                    id2InfoPrj.invoke(fileId, new FileReserveSpaceProcessor(space, affRange));
+
+                    IgfsFileInfo newInfo = id2InfoPrj.get(fileId);
+
+                    tx.commit();
+
+                    return newInfo;
+                }
+                catch (GridClosureException e) {
+                    throw U.cast(e);
+                }
+                finally {
+                    tx.close();
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+        else
+            throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path +
+                ", id=" + fileId + ']');
+    }
+
+    /**
      * Update file info in cache.
      *
      * @param fileId File ID to update information for.
@@ -3937,4 +3900,162 @@ public class IgfsMetaManager extends IgfsManager {
             return S.toString(LockFileProcessor.class, this);
         }
     }
+
+    /**
+     * File lock entry processor.
+     */
+    private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Lock Id. */
+        private IgniteUuid lockId;
+
+        /**
+         * Default constructor.
+         */
+        public FileLockProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param lockId Lock ID.
+         */
+        public FileLockProcessor(IgniteUuid lockId) {
+            this.lockId = lockId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo old = entry.getValue();
+
+            entry.setValue(new IgfsFileInfo(old, lockId, old.modificationTime()));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            U.writeGridUuid(out, lockId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            lockId = U.readGridUuid(in);
+        }
+    }
+
+    /**
+     * File unlock entry processor.
+     */
+    private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Modification time. */
+        private long modificationTime;
+
+        /**
+         * Default constructor.
+         */
+        public FileUnlockProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param modificationTime Modification time.
+         */
+        public FileUnlockProcessor(long modificationTime) {
+            this.modificationTime = modificationTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo old = entry.getValue();
+
+            entry.setValue(new IgfsFileInfo(old, null, modificationTime));
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(modificationTime);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            modificationTime = in.readLong();
+        }
+    }
+
+    /**
+     * File reserve space entry processor.
+     */
+    private static class FileReserveSpaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+        Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Space. */
+        private long space;
+
+        /** Affinity range. */
+        private IgfsFileAffinityRange affRange;
+
+        /**
+         * Default constructor.
+         */
+        public FileReserveSpaceProcessor() {
+            // No-op.
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param space Space.
+         * @param affRange
+         */
+        public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) {
+            this.space = space;
+            this.affRange = affRange;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+            throws EntryProcessorException {
+            IgfsFileInfo oldInfo = entry.getValue();
+
+            IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+            newMap.addRange(affRange);
+
+            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
+
+            newInfo.fileMap(newMap);
+
+            entry.setValue(newInfo);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(space);
+            out.writeObject(affRange);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            space = in.readLong();
+            affRange = (IgfsFileAffinityRange)in.readObject();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/660a51f7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 83056af..8c11073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,10 +18,7 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import java.io.DataInput;
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteCheckedException;
@@ -30,11 +27,9 @@ import org.apache.ignite.igfs.IgfsMode;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.igfs.IgfsPathNotFoundException;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
@@ -297,8 +292,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
             if (space > 0) {
                 data.awaitAllAcksReceived(fileInfo.id());
 
-                IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
-                    new ReserveSpaceClosure(space, streamRange));
+                IgfsFileInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
 
                 if (fileInfo0 == null)
                     throw new IOException("File was concurrently deleted: " + path);
@@ -446,72 +440,4 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
     @Override public String toString() {
         return S.toString(IgfsOutputStreamImpl.class, this);
     }
-
-    /**
-     * Helper closure to reserve specified space and update file's length
-     */
-    @GridInternal
-    private static final class ReserveSpaceClosure implements IgniteClosure<IgfsFileInfo, IgfsFileInfo>,
-        Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Space amount (bytes number) to increase file's length. */
-        private long space;
-
-        /** Affinity range for this particular update. */
-        private IgfsFileAffinityRange range;
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         *
-         */
-        public ReserveSpaceClosure() {
-            // No-op.
-        }
-
-        /**
-         * Constructs the closure to reserve specified space and update file's length.
-         *
-         * @param space Space amount (bytes number) to increase file's length.
-         * @param range Affinity range specifying which part of file was colocated.
-         */
-        private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) {
-            this.space = space;
-            this.range = range;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) {
-            IgfsFileMap oldMap = oldInfo.fileMap();
-
-            IgfsFileMap newMap = new IgfsFileMap(oldMap);
-
-            newMap.addRange(range);
-
-            // Update file length.
-            IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
-
-            updated.fileMap(newMap);
-
-            return updated;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(space);
-            out.writeObject(range);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            space = in.readLong();
-            range = (IgfsFileAffinityRange)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ReserveSpaceClosure.class, this);
-        }
-    }
 }
\ No newline at end of file