You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/22 17:49:49 UTC
[13/50] [abbrv] ignite git commit: IGNITE-2814: IGFS: File
lock/unlock/reserve operations no longer require put/replace on cache.
Thin entry processors are used instead.
IGNITE-2814: IGFS: File lock/unlock/reserve operations no longer require put/replace on cache. Thin entry processors are used instead.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1d9e8b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1d9e8b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1d9e8b6
Branch: refs/heads/ignite-2004
Commit: b1d9e8b6c7553133c4c4ca3820f6f30d202b7ea2
Parents: d83fa11
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Mar 14 13:17:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Mar 14 13:17:58 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 270 ++++++++++++++++---
.../processors/igfs/IgfsOutputStreamImpl.java | 76 +-----
2 files changed, 228 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 89ddd02..8bb9e92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -466,10 +466,11 @@ public class IgfsMetaManager extends IgfsManager {
* Lock the file explicitly outside of transaction.
*
* @param fileId File ID to lock.
+ * @param delete If file is being locked for delete.
* @return Locked file info or {@code null} if file cannot be locked or doesn't exist.
* @throws IgniteCheckedException If the file with such id does not exist, or on another failure.
*/
- public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean isDeleteLock) throws IgniteCheckedException {
+ public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean delete) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert validTxState(false);
@@ -487,13 +488,11 @@ public class IgfsMetaManager extends IgfsManager {
if (oldInfo.lockId() != null)
return null; // The file is already locked, we cannot lock it.
- IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock);
+ IgniteUuid lockId = createFileLockId(delete);
- boolean put = id2InfoPrj.replace(fileId, oldInfo, newInfo);
+ id2InfoPrj.invoke(fileId, new FileLockProcessor(lockId));
- assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
-
- assert newInfo.id().equals(oldInfo.id()); // Same id.
+ IgfsFileInfo newInfo = id2InfoPrj.get(fileId);
tx.commit();
@@ -515,30 +514,13 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Set lock on file info.
+ * Create file lock ID.
*
- * @param info File info.
- * @return New file info with lock set, or null if the info passed in is already locked.
- * @throws IgniteCheckedException In case lock is already set on that file.
+ * @param delete If lock ID is required for file deletion.
+ * @return Lock ID.
*/
- private @Nullable IgfsFileInfo lockInfo(IgfsFileInfo info, boolean isDeleteLock) {
- assert info != null;
-
- if (info.lockId() != null)
- return null; // Null return value indicates that the file is already locked.
-
- return new IgfsFileInfo(info, composeLockId(isDeleteLock), info.modificationTime());
- }
-
- /**
- * Gets a new lock id.
- * The returned Id #globalId() method will return the Id of the node which locked the file.
- *
- * @param isDeleteLock if this is special delete lock.
- * @return The new lock id.
- */
- private IgniteUuid composeLockId(boolean isDeleteLock) {
- if (isDeleteLock)
+ private IgniteUuid createFileLockId(boolean delete) {
+ if (delete)
return IgfsUtils.DELETE_LOCK_ID;
return IgniteUuid.fromUuid(locNode.id());
@@ -584,12 +566,7 @@ public class IgfsMetaManager extends IgfsManager {
"[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
oldInfo.lockId() + ']');
- IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
-
- boolean put = id2InfoPrj.put(fileId, newInfo);
-
- assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo
- + ']';
+ id2InfoPrj.invoke(fileId, new FileUnlockProcessor(modificationTime));
return null;
}
@@ -1680,6 +1657,57 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Reserve space for file.
+ *
+ * @param path File path.
+ * @param fileId File ID.
+ * @param space Space.
+ * @param affRange Affinity range.
+ * @return New file info.
+ */
+ public IgfsFileInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
+ throws IgniteCheckedException {
+ assert validTxState(false);
+
+ if (busyLock.enterBusy()) {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
+
+ IgniteInternalTx tx = startTx();
+
+ try {
+ // Lock file ID for this transaction.
+ IgfsFileInfo oldInfo = info(fileId);
+
+ if (oldInfo == null)
+ throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
+
+ id2InfoPrj.invoke(fileId, new FileReserveSpaceProcessor(space, affRange));
+
+ IgfsFileInfo newInfo = id2InfoPrj.get(fileId);
+
+ tx.commit();
+
+ return newInfo;
+ }
+ catch (GridClosureException e) {
+ throw U.cast(e);
+ }
+ finally {
+ tx.close();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path +
+ ", id=" + fileId + ']');
+ }
+
+ /**
* Update file info in cache.
*
* @param fileId File ID to update information for.
@@ -1992,7 +2020,7 @@ public class IgfsMetaManager extends IgfsManager {
"the secondary file system because the path points to a directory: " + path);
IgfsFileInfo newInfo = new IgfsFileInfo(status.blockSize(), status.length(), affKey,
- composeLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(),
+ createFileLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties(),
status.accessTime(), status.modificationTime());
// Add new file info to the listing optionally removing the previous one.
@@ -2113,13 +2141,11 @@ public class IgfsMetaManager extends IgfsManager {
}
// Set lock and return.
- IgfsFileInfo lockedInfo = lockInfo(info, false);
-
- assert lockedInfo != null; // We checked the lock above.
+ IgniteUuid lockId = createFileLockId(false);
- boolean put = id2InfoPrj.put(info.id(), lockedInfo);
+ id2InfoPrj.invoke(info.id(), new FileLockProcessor(lockId));
- assert put;
+ IgfsFileInfo lockedInfo = id2InfoPrj.get(info.id());
return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(),
lockedInfo, out);
@@ -3455,7 +3481,7 @@ public class IgfsMetaManager extends IgfsManager {
@Override protected IgfsFileInfo buildLeaf() {
long t = System.currentTimeMillis();
- return new IgfsFileInfo(blockSize, 0L, affKey, composeLockId(false),
+ return new IgfsFileInfo(blockSize, 0L, affKey, createFileLockId(false),
evictExclude, leafProps, t, t);
}
};
@@ -3505,7 +3531,7 @@ public class IgfsMetaManager extends IgfsManager {
+ "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
+ ", lockId=" + lockId + ']');
- IgniteUuid newLockId = composeLockId(false);
+ IgniteUuid newLockId = createFileLockId(false);
EntryProcessorResult<IgfsFileInfo> result
= id2InfoPrj.invoke(lowermostExistingInfo.id(),
@@ -3553,7 +3579,7 @@ public class IgfsMetaManager extends IgfsManager {
long t = System.currentTimeMillis();
final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L,
- affKey, composeLockId(false), evictExclude, fileProps, t, t);
+ affKey, createFileLockId(false), evictExclude, fileProps, t, t);
assert newFileInfo.lockId() != null; // locked info should be created.
@@ -3874,4 +3900,162 @@ public class IgfsMetaManager extends IgfsManager {
return S.toString(LockFileProcessor.class, this);
}
}
+
+ /**
+ * File lock entry processor.
+ */
+ private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Lock Id. */
+ private IgniteUuid lockId;
+
+ /**
+ * Default constructor.
+ */
+ public FileLockProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param lockId Lock ID.
+ */
+ public FileLockProcessor(IgniteUuid lockId) {
+ this.lockId = lockId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo old = entry.getValue();
+
+ entry.setValue(new IgfsFileInfo(old, lockId, old.modificationTime()));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeGridUuid(out, lockId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ lockId = U.readGridUuid(in);
+ }
+ }
+
+ /**
+ * File unlock entry processor.
+ */
+ private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Modification time. */
+ private long modificationTime;
+
+ /**
+ * Default constructor.
+ */
+ public FileUnlockProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param modificationTime Modification time.
+ */
+ public FileUnlockProcessor(long modificationTime) {
+ this.modificationTime = modificationTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo old = entry.getValue();
+
+ entry.setValue(new IgfsFileInfo(old, null, modificationTime));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(modificationTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ modificationTime = in.readLong();
+ }
+ }
+
+ /**
+ * File reserve space entry processor.
+ */
+ private static class FileReserveSpaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Space. */
+ private long space;
+
+ /** Affinity range. */
+ private IgfsFileAffinityRange affRange;
+
+ /**
+ * Default constructor.
+ */
+ public FileReserveSpaceProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param space Space.
+ * @param affRange Affinity range.
+ */
+ public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) {
+ this.space = space;
+ this.affRange = affRange;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo oldInfo = entry.getValue();
+
+ IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
+
+ newMap.addRange(affRange);
+
+ IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
+
+ newInfo.fileMap(newMap);
+
+ entry.setValue(newInfo);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(space);
+ out.writeObject(affRange);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ space = in.readLong();
+ affRange = (IgfsFileAffinityRange)in.readObject();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d9e8b6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 83056af..8c11073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,10 +18,7 @@
package org.apache.ignite.internal.processors.igfs;
import java.io.DataInput;
-import java.io.Externalizable;
import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -30,11 +27,9 @@ import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -297,8 +292,7 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
if (space > 0) {
data.awaitAllAcksReceived(fileInfo.id());
- IgfsFileInfo fileInfo0 = meta.updateInfo(fileInfo.id(),
- new ReserveSpaceClosure(space, streamRange));
+ IgfsFileInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
if (fileInfo0 == null)
throw new IOException("File was concurrently deleted: " + path);
@@ -446,72 +440,4 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
@Override public String toString() {
return S.toString(IgfsOutputStreamImpl.class, this);
}
-
- /**
- * Helper closure to reserve specified space and update file's length
- */
- @GridInternal
- private static final class ReserveSpaceClosure implements IgniteClosure<IgfsFileInfo, IgfsFileInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Space amount (bytes number) to increase file's length. */
- private long space;
-
- /** Affinity range for this particular update. */
- private IgfsFileAffinityRange range;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- *
- */
- public ReserveSpaceClosure() {
- // No-op.
- }
-
- /**
- * Constructs the closure to reserve specified space and update file's length.
- *
- * @param space Space amount (bytes number) to increase file's length.
- * @param range Affinity range specifying which part of file was colocated.
- */
- private ReserveSpaceClosure(long space, IgfsFileAffinityRange range) {
- this.space = space;
- this.range = range;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsFileInfo apply(IgfsFileInfo oldInfo) {
- IgfsFileMap oldMap = oldInfo.fileMap();
-
- IgfsFileMap newMap = new IgfsFileMap(oldMap);
-
- newMap.addRange(range);
-
- // Update file length.
- IgfsFileInfo updated = new IgfsFileInfo(oldInfo, oldInfo.length() + space);
-
- updated.fileMap(newMap);
-
- return updated;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(space);
- out.writeObject(range);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- space = in.readLong();
- range = (IgfsFileAffinityRange)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ReserveSpaceClosure.class, this);
- }
- }
}
\ No newline at end of file