You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/21 08:45:36 UTC
[12/15] ignite git commit: IGNITE-2861: IGFS: Moved metadata
processors into separate top-level classes to simplify code. Also cleaned up
IgfsMetaManager from unused code.
IGNITE-2861: IGFS: Moved metadata processors into separate top-level classes to simplify code. Also cleaned up IgfsMetaManager from unused code.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/865e376a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/865e376a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/865e376a
Branch: refs/heads/ignite-1786
Commit: 865e376ad469bf9929b9f6f98ff0882e44c951a8
Parents: c506c44
Author: thatcoach <pp...@list.ru>
Authored: Sat Mar 19 21:13:35 2016 +0300
Committer: thatcoach <pp...@list.ru>
Committed: Sat Mar 19 21:13:35 2016 +0300
----------------------------------------------------------------------
.../igfs/IgfsFragmentizerManager.java | 141 +--
.../processors/igfs/IgfsMetaManager.java | 1142 ++----------------
.../internal/processors/igfs/IgfsPathIds.java | 2 +-
.../meta/IgfsMetaDirectoryCreateProcessor.java | 117 ++
.../IgfsMetaDirectoryListingAddProcessor.java | 92 ++
...IgfsMetaDirectoryListingRemoveProcessor.java | 89 ++
...gfsMetaDirectoryListingReplaceProcessor.java | 84 ++
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 110 ++
.../igfs/meta/IgfsMetaFileLockProcessor.java | 63 +
.../meta/IgfsMetaFileRangeDeleteProcessor.java | 74 ++
.../meta/IgfsMetaFileRangeUpdateProcessor.java | 81 ++
.../meta/IgfsMetaFileReserveSpaceProcessor.java | 75 ++
.../igfs/meta/IgfsMetaFileUnlockProcessor.java | 60 +
.../igfs/meta/IgfsMetaUpdatePathProcessor.java | 66 +
.../meta/IgfsMetaUpdatePropertiesProcessor.java | 78 ++
.../igfs/meta/IgfsMetaUpdateTimesProcessor.java | 68 ++
16 files changed, 1155 insertions(+), 1187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
index 194a8ac..99e7cd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsFragmentizerManager.java
@@ -26,13 +26,14 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeDeleteProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileRangeUpdateProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -40,13 +41,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
@@ -294,7 +288,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
case RANGE_STATUS_INITIAL: {
// Mark range as moving.
updated = igfsCtx.meta().updateInfo(
- fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVING));
+ fileId, new IgfsMetaFileRangeUpdateProcessor(range, RANGE_STATUS_MOVING));
if (updated == null) {
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -311,7 +305,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
// Mark range as moved.
updated = igfsCtx.meta().updateInfo(
- fileId, new RangeUpdateProcessor(range, RANGE_STATUS_MOVED));
+ fileId, new IgfsMetaFileRangeUpdateProcessor(range, RANGE_STATUS_MOVED));
if (updated == null) {
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -327,7 +321,7 @@ public class IgfsFragmentizerManager extends IgfsManager {
igfsCtx.data().cleanBlocks(fileInfo, range, false);
// Remove range from map.
- updated = igfsCtx.meta().updateInfo(fileId, new RangeDeleteProcessor(range));
+ updated = igfsCtx.meta().updateInfo(fileId, new IgfsMetaFileRangeDeleteProcessor(range));
if (updated == null)
igfsCtx.data().cleanBlocks(fileInfo, range, true);
@@ -343,131 +337,6 @@ public class IgfsFragmentizerManager extends IgfsManager {
}
/**
- * Update range processor.
- */
- private static class RangeUpdateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Range. */
- private IgfsFileAffinityRange range;
-
- /** Status. */
- private int status;
-
- /**
- * Constructor.
- */
- public RangeUpdateProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param range Range.
- * @param status Status.
- */
- public RangeUpdateProcessor(IgfsFileAffinityRange range, int status) {
- this.range = range;
- this.status = status;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo oldInfo = entry.getValue();
-
- IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
-
- newMap.updateRangeStatus(range, status);
-
- IgfsEntryInfo newInfo = oldInfo.fileMap(newMap);
-
- entry.setValue(newInfo);
-
- return newInfo;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(range);
- out.writeInt(status);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- range = (IgfsFileAffinityRange)in.readObject();
- status = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(RangeUpdateProcessor.class, this);
- }
- }
-
- /**
- * Delete range processor.
- */
- private static class RangeDeleteProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Range. */
- private IgfsFileAffinityRange range;
-
- /**
- * Constructor.
- */
- public RangeDeleteProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param range Range.
- */
- public RangeDeleteProcessor(IgfsFileAffinityRange range) {
- this.range = range;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo oldInfo = entry.getValue();
-
- IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
-
- newMap.deleteRange(range);
-
- IgfsEntryInfo newInfo = oldInfo.fileMap(newMap);
-
- entry.setValue(newInfo);
-
- return newInfo;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(range);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- range = (IgfsFileAffinityRange)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(RangeDeleteProcessor.class, this);
- }
- }
-
- /**
* Fragmentizer coordinator thread.
*/
private class FragmentizerCoordinator extends GridWorker implements GridLocalEventListener, GridMessageListener {
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index d66d9be..1aa49ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -44,7 +44,17 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileUnlockProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingAddProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRemoveProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingReplaceProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePathProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdatePropertiesProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaUpdateTimesProcessor;
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridClosureException;
@@ -52,22 +62,15 @@ import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessorResult;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -89,7 +92,6 @@ import java.util.concurrent.CountDownLatch;
/**
* Cache based structure (meta data) manager.
*/
-@SuppressWarnings("all")
public class IgfsMetaManager extends IgfsManager {
/** Comparator for Id sorting. */
private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR
@@ -161,6 +163,7 @@ public class IgfsMetaManager extends IgfsManager {
}
/** {@inheritDoc} */
+ @SuppressWarnings("RedundantCast")
@Override protected void onKernalStart0() throws IgniteCheckedException {
metaCache = igfsCtx.kernalContext().cache().getOrStartCache(cfg.getMetaCacheName());
@@ -511,9 +514,7 @@ public class IgfsMetaManager extends IgfsManager {
assert fileId != null;
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -532,9 +533,6 @@ public class IgfsMetaManager extends IgfsManager {
catch (GridClosureException e) {
throw U.cast(e);
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -593,12 +591,12 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " +
"found): " + fileId));
- if (!info.lockId().equals(oldInfo.lockId()))
+ if (!F.eq(info.lockId(), oldInfo.lockId()))
throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " +
"[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
oldInfo.lockId() + ']');
- id2InfoPrj.invoke(fileId, new FileUnlockProcessor(modificationTime));
+ id2InfoPrj.invoke(fileId, new IgfsMetaFileUnlockProcessor(modificationTime));
return null;
}
@@ -701,8 +699,6 @@ public class IgfsMetaManager extends IgfsManager {
throws IgniteCheckedException {
assert IgfsUtils.isRootOrTrashId(id);
- long time = System.currentTimeMillis();
-
IgfsEntryInfo info = IgfsUtils.createDirectory(id);
IgfsEntryInfo oldInfo = id2InfoPrj.getAndPutIfAbsent(id, info);
@@ -898,9 +894,7 @@ public class IgfsMetaManager extends IgfsManager {
srcPathIds.addExistingIds(lockIds);
dstPathIds.addExistingIds(lockIds);
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Obtain the locks.
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
@@ -942,9 +936,6 @@ public class IgfsMetaManager extends IgfsManager {
// Set the new path to the info to simplify event creation:
return srcInfo.path(newPath);
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -956,41 +947,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Verify path integrity.
- *
- * @param path Path to verify.
- * @param expIds Expected IDs for this path. Might contain additional elements, e.g. because they were created
- * on a child path.
- * @param infos Locked infos.
- * @return verification result.
- */
- private static boolean verifyPathIntegrity(IgfsPath path, List<IgniteUuid> expIds,
- Map<IgniteUuid, IgfsEntryInfo> infos) {
- List<String> pathParts = path.components();
-
- assert pathParts.size() < expIds.size();
-
- for (int i = 0; i < pathParts.size(); i++) {
- IgniteUuid parentId = expIds.get(i);
-
- // If parent ID is null, it doesn't exist.
- if (parentId != null) {
- IgfsEntryInfo parentInfo = infos.get(parentId);
-
- // If parent info is null, it doesn't exist.
- if (parentInfo != null) {
- if (parentInfo.hasChild(pathParts.get(i), expIds.get(i + 1)))
- continue;
- }
- }
-
- return false;
- }
-
- return true;
- }
-
- /**
* Move or rename file in existing transaction.
*
* @param fileId File ID to move or rename.
@@ -1069,6 +1025,7 @@ public class IgfsMetaManager extends IgfsManager {
* elements moved to TRASH folder.
* @throws IgniteCheckedException On error.
*/
+ @SuppressWarnings("RedundantCast")
IgniteUuid format() throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
@@ -1076,9 +1033,7 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid trashId = IgfsUtils.randomTrashId();
- final IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// NB: We may lock root because its id is less than any other id:
final IgfsEntryInfo rootInfo = lockIds(IgfsUtils.ROOT_ID, trashId).get(IgfsUtils.ROOT_ID);
@@ -1097,7 +1052,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsEntryInfo newInfo = IgfsUtils.createDirectory(
IgniteUuid.randomUuid(),
transferListing,
- (Map<String,String>)null
+ (Map<String, String>) null
);
createNewEntry(newInfo, trashId, newInfo.id().toString());
@@ -1112,9 +1067,6 @@ public class IgfsMetaManager extends IgfsManager {
return newInfo.id();
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1127,9 +1079,8 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Move path to the trash directory.
*
- * @param parentId Parent ID.
- * @param pathName Path name.
- * @param pathId Path ID.
+ * @param path Path.
+ * @param recursive Recursive flag.
* @return ID of an entry located directly under the trash directory.
* @throws IgniteCheckedException If failed.
*/
@@ -1159,9 +1110,7 @@ public class IgfsMetaManager extends IgfsManager {
allIds.add(trashId);
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Lock participants.
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
@@ -1197,9 +1146,6 @@ public class IgfsMetaManager extends IgfsManager {
return victimId;
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1220,9 +1166,9 @@ public class IgfsMetaManager extends IgfsManager {
* @return ID of an entry located directly under the trash directory.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("RedundantCast")
@Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, @Nullable String name, IgniteUuid id,
- IgniteUuid trashId)
- throws IgniteCheckedException {
+ IgniteUuid trashId) throws IgniteCheckedException {
validTxState(true);
IgniteUuid resId;
@@ -1268,7 +1214,7 @@ public class IgfsMetaManager extends IgfsManager {
// Remove listing entries from root.
for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
id2InfoPrj.invoke(IgfsUtils.ROOT_ID,
- new ListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
+ new IgfsMetaDirectoryListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
resId = newInfo.id();
}
@@ -1304,9 +1250,7 @@ public class IgfsMetaManager extends IgfsManager {
assert listing != null;
validTxState(false);
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
Collection<IgniteUuid> res = new HashSet<>();
// Obtain all necessary locks in one hop.
@@ -1367,9 +1311,6 @@ public class IgfsMetaManager extends IgfsManager {
return res;
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1395,17 +1336,13 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- IgniteInternalTx tx = startTx();
-
- try {
- boolean res = false;
-
+ try (IgniteInternalTx tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> infos = lockIds(parentId, id);
IgfsEntryInfo victim = infos.get(id);
if (victim == null)
- return res;
+ return false;
assert victim.isDirectory() || IgfsUtils.DELETE_LOCK_ID.equals(victim.lockId()) :
" isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId();
@@ -1419,19 +1356,16 @@ public class IgfsMetaManager extends IgfsManager {
IgfsListingEntry childEntry = parentInfo.listing().get(name);
if (childEntry != null)
- id2InfoPrj.invoke(parentId, new ListingRemoveProcessor(name, id));
+ id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingRemoveProcessor(name, id));
id2InfoPrj.remove(id);
- res = true;
- }
+ tx.commit();
- tx.commit();
+ return true;
+ }
- return res;
- }
- finally {
- tx.close();
+ return false;
}
}
finally {
@@ -1499,7 +1433,7 @@ public class IgfsMetaManager extends IgfsManager {
if (oldInfo == null)
return null;
- return invokeAndGet(fileId, new UpdatePropertiesProcessor(props));
+ return invokeAndGet(fileId, new IgfsMetaUpdatePropertiesProcessor(props));
}
catch (GridClosureException e) {
throw U.cast(e);
@@ -1520,18 +1454,13 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
IgfsEntryInfo info = updatePropertiesNonTx(fileId, props);
tx.commit();
return info;
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1560,16 +1489,15 @@ public class IgfsMetaManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
- IgfsEntryInfo newInfo = invokeAndGet(fileId, new FileReserveSpaceProcessor(space, affRange));
+ IgfsEntryInfo newInfo =
+ invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
tx.commit();
@@ -1578,9 +1506,6 @@ public class IgfsMetaManager extends IgfsManager {
catch (GridClosureException e) {
throw U.cast(e);
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1610,9 +1535,7 @@ public class IgfsMetaManager extends IgfsManager {
if (log.isDebugEnabled())
log.debug("Update file info [fileId=" + fileId + ", proc=" + proc + ']');
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -1623,7 +1546,7 @@ public class IgfsMetaManager extends IgfsManager {
if (newInfo == null)
throw fsException("Failed to update file info with null value" +
- " [oldInfo=" + oldInfo + ", newInfo=" + newInfo + ", proc=" + proc + ']');
+ " [oldInfo=" + oldInfo + ", newInfo=null, proc=" + proc + ']');
if (!oldInfo.id().equals(newInfo.id()))
throw fsException("Failed to update file info (file IDs differ)" +
@@ -1640,9 +1563,6 @@ public class IgfsMetaManager extends IgfsManager {
catch (GridClosureException e) {
throw U.cast(e);
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1679,9 +1599,7 @@ public class IgfsMetaManager extends IgfsManager {
assert lockIds.size() == pathIds.count();
// Start TX.
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
@@ -1710,9 +1628,6 @@ public class IgfsMetaManager extends IgfsManager {
// We are done.
return true;
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1735,18 +1650,13 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
Object prev = val != null ? metaCache.getAndPut(sampling, val) : metaCache.getAndRemove(sampling);
tx.commit();
return !F.eq(prev, val);
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -1795,7 +1705,7 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException("Failed to create new metadata entry due to ID conflict: " + info.id());
if (parentId != null)
- id2InfoPrj.invoke(parentId, new ListingAddProcessor(name, new IgfsListingEntry(info)));
+ id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingAddProcessor(name, new IgfsListingEntry(info)));
}
/**
@@ -1812,8 +1722,8 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid destId, String destName) throws IgniteCheckedException {
validTxState(true);
- id2InfoPrj.invoke(srcId, new ListingRemoveProcessor(srcName, entry.fileId()));
- id2InfoPrj.invoke(destId, new ListingAddProcessor(destName, entry));
+ id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
+ id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
}
/**
@@ -1825,7 +1735,7 @@ public class IgfsMetaManager extends IgfsManager {
* @throws IgniteCheckedException If failed.
*/
private IgfsEntryInfo invokeLock(IgniteUuid id, boolean delete) throws IgniteCheckedException {
- return invokeAndGet(id, new FileLockProcessor(createFileLockId(delete)));
+ return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(delete)));
}
/**
@@ -1838,7 +1748,7 @@ public class IgfsMetaManager extends IgfsManager {
private void invokeUpdatePath(IgniteUuid id, IgfsPath path) throws IgniteCheckedException {
validTxState(true);
- id2InfoPrj.invoke(id, new UpdatePathProcessor(path));
+ id2InfoPrj.invoke(id, new IgfsMetaUpdatePathProcessor(path));
}
/**
@@ -1853,7 +1763,11 @@ public class IgfsMetaManager extends IgfsManager {
throws IgniteCheckedException {
validTxState(true);
- return id2InfoPrj.invoke(id, proc).get();
+ EntryProcessorResult<IgfsEntryInfo> res = id2InfoPrj.invoke(id, proc);
+
+ assert res != null;
+
+ return res.get();
}
/**
@@ -1985,6 +1899,8 @@ public class IgfsMetaManager extends IgfsManager {
);
// Add new file info to the listing optionally removing the previous one.
+ assert parentInfo != null;
+
IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
if (oldId != null) {
@@ -1998,12 +1914,12 @@ public class IgfsMetaManager extends IgfsManager {
path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
id2InfoPrj.remove(oldId); // Remove the old one.
- id2InfoPrj.invoke(parentInfo.id(),
- new ListingRemoveProcessor(path.name(), parentInfo.listing().get(path.name()).fileId()));
+ id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+ path.name(), parentInfo.listing().get(path.name()).fileId()));
createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
- IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo);
+ igfsCtx.data().delete(oldInfo);
}
// Record CREATE event if needed.
@@ -2084,13 +2000,13 @@ public class IgfsMetaManager extends IgfsManager {
if (remainder > 0) {
int blockIdx = (int)(len / blockSize);
- IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize);
+ try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) {
+ IgniteInternalFuture<byte[]> fut =
+ igfsCtx.data().dataBlock(info, path, blockIdx, reader);
- try {
- igfsCtx.data().dataBlock(info, path, blockIdx, reader).get();
- }
- finally {
- reader.close();
+ assert fut != null;
+
+ fut.get();
}
}
@@ -2366,12 +2282,8 @@ public class IgfsMetaManager extends IgfsManager {
fs.rename(src, dest);
// Rename was successful, perform compensation in the local file system.
- if (destInfo == null) {
- // Move and rename.
- assert destParentInfo != null;
-
+ if (destInfo == null)
moveNonTx(srcInfo.id(), src.name(), srcParentInfo.id(), dest.name(), destParentInfo.id());
- }
else {
// Move.
if (destInfo.isFile())
@@ -2622,6 +2534,8 @@ public class IgfsMetaManager extends IgfsManager {
status.modificationTime()
);
+ assert parentInfo != null;
+
IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), components.get(i), curInfo);
if (oldId != null)
@@ -2669,13 +2583,9 @@ public class IgfsMetaManager extends IgfsManager {
* @return Result of task execution.
* @throws IgniteCheckedException If failed.
*/
- private <T> T synchronizeAndExecute(SynchronizationTask<T> task,
- IgfsSecondaryFileSystem fs,
- boolean strict,
- @Nullable Collection<IgniteUuid> extraLockIds,
- IgfsPath... paths)
- throws IgniteCheckedException
- {
+ @SuppressWarnings({"Contract", "ConstantConditions"})
+ private <T> T synchronizeAndExecute(SynchronizationTask<T> task, IgfsSecondaryFileSystem fs, boolean strict,
+ @Nullable Collection<IgniteUuid> extraLockIds, IgfsPath... paths) throws IgniteCheckedException {
assert task != null;
assert fs != null;
assert paths != null && paths.length > 0;
@@ -2696,9 +2606,8 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.add(fileIds(path));
// Start pessimistic.
- IgniteInternalTx tx = startTx();
- try {
+ try (IgniteInternalTx tx = startTx()) {
// Lock the very first existing parents and possibly the leaf as well.
Map<IgfsPath, IgfsPath> pathToParent = new HashMap<>();
@@ -2864,34 +2773,12 @@ public class IgfsMetaManager extends IgfsManager {
else
throw e;
}
- finally {
- tx.close();
- }
}
return res;
}
/**
- * Update cached value with closure.
- *
- * @param cache Cache projection to work with.
- * @param key Key to retrieve/update the value for.
- * @param c Closure to apply to cached value.
- * @return {@code True} if value was stored in cache, {@code false} otherwise.
- * @throws IgniteCheckedException If operation failed.
- */
- private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c)
- throws IgniteCheckedException {
- validTxState(true);
-
- V oldVal = cache.get(key);
- V newVal = c.apply(oldVal);
-
- return newVal == null ? cache.remove(key) : cache.put(key, newVal);
- }
-
- /**
* Check transaction is (not) started.
*
* @param inTx Expected transaction state.
@@ -2927,16 +2814,14 @@ public class IgfsMetaManager extends IgfsManager {
validTxState(false);
// Start pessimistic transaction.
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> infoMap = lockIds(fileId, parentId);
IgfsEntryInfo fileInfo = infoMap.get(fileId);
if (fileInfo == null)
throw fsException(new IgfsPathNotFoundException("Failed to update times " +
- "(path was not found): " + fileName));
+ "(path was not found): " + fileName));
IgfsEntryInfo parentInfo = infoMap.get(parentId);
@@ -2947,20 +2832,17 @@ public class IgfsMetaManager extends IgfsManager {
// Validate listing.
if (!parentInfo.hasChild(fileName, fileId))
throw fsException(new IgfsConcurrentModificationException("Failed to update times " +
- "(file concurrently modified): " + fileName));
+ "(file concurrently modified): " + fileName));
assert parentInfo.isDirectory();
- id2InfoPrj.invoke(fileId, new UpdateTimesProcessor(
+ id2InfoPrj.invoke(fileId, new IgfsMetaUpdateTimesProcessor(
accessTime == -1 ? fileInfo.accessTime() : accessTime,
modificationTime == -1 ? fileInfo.modificationTime() : modificationTime)
);
tx.commit();
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -2980,7 +2862,7 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * @param msg Error message.
+ * @param err Unchecked exception.
* @return Checked exception.
*/
private static IgniteCheckedException fsException(IgfsException err) {
@@ -3011,365 +2893,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Path descriptor.
- */
- private static class PathDescriptor {
- /** Path. */
- private final IgfsPath path;
-
- /** Resolved IDs. */
- private final List<IgniteUuid> ids;
-
- /** Parent path. */
- private IgfsPath parentPath;
-
- /** Parent path info. */
- private IgfsEntryInfo parentInfo;
-
- /**
- * Constructor.
- *
- * @param path Path.
- * @param ids Resolved path IDs.
- * @param parentPath Parent path.
- * @param parentInfo Parent info.
- */
- PathDescriptor(IgfsPath path, List<IgniteUuid> ids, IgfsPath parentPath, IgfsEntryInfo parentInfo) {
- assert path != null;
- assert ids != null && !ids.isEmpty();
- assert parentPath == null && parentInfo == null || parentPath != null && parentInfo != null;
- assert parentPath == null || parentPath != null && path.isSubDirectoryOf(parentPath);
-
- this.path = path;
- this.ids = ids;
- this.parentPath = parentPath;
- this.parentInfo = parentInfo;
- }
-
- /**
- * Get resolved path ids.
- *
- * @return Path ids.
- */
- private Collection<IgniteUuid> ids() {
- return ids;
- }
-
- /**
- * Get path ID from the end. E.g. endId(1) will return the last element.
- * @param i Element index from the end.
- *
- * @return Path ID from the end.
- */
- private IgniteUuid endId(int i) {
- return ids.get(ids.size() - i);
- }
-
- /**
- * Update ID with the given index.
- *
- * @param newParentPath New parent path.
- * @param newParentInfo New parent info.
- */
- private void updateParent(IgfsPath newParentPath, IgfsEntryInfo newParentInfo) {
- assert newParentPath != null;
- assert newParentInfo != null;
- assert path.isSubDirectoryOf(newParentPath);
-
- parentPath = newParentPath;
- parentInfo = newParentInfo;
-
- ids.set(newParentPath.components().size(), newParentInfo.id());
- }
-
- /**
- * Get parent path.
- *
- * @return Parent path.
- */
- private IgfsPath parentPath() {
- return parentPath;
- }
-
- /**
- * Get parent path info.
- *
- * @return Parent path info.
- */
- private IgfsEntryInfo parentInfo() {
- return parentInfo;
- }
- }
-
- /**
- * Remove entry from directory listing.
- */
- @GridInternal
- private static final class ListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** File name. */
- private String fileName;
-
- /** Expected ID. */
- private IgniteUuid fileId;
-
- /**
- * Default constructor.
- */
- public ListingRemoveProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param fileName File name.
- * @param fileId File ID.
- */
- public ListingRemoveProcessor(String fileName, IgniteUuid fileId) {
- this.fileName = fileName;
- this.fileId = fileId;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo fileInfo = e.getValue();
-
- assert fileInfo != null;
- assert fileInfo.isDirectory();
-
- Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
-
- listing.putAll(fileInfo.listing());
-
- IgfsListingEntry oldEntry = listing.get(fileName);
-
- if (oldEntry == null || !oldEntry.fileId().equals(fileId))
- throw new IgniteException("Directory listing doesn't contain expected file" +
- " [listing=" + listing + ", fileName=" + fileName + "]");
-
- // Modify listing in-place.
- listing.remove(fileName);
-
- e.setValue(fileInfo.listing(listing));
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, fileName);
- U.writeGridUuid(out, fileId);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- fileName = U.readString(in);
- fileId = U.readGridUuid(in);
- }
- }
-
- /**
- * Update directory listing closure.
- */
- @GridInternal
- private static final class ListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** File name to add into parent listing. */
- private String fileName;
-
- /** File ID.*/
- private IgfsListingEntry entry;
-
- /**
- * Empty constructor required for {@link Externalizable}.
- *
- */
- public ListingAddProcessor() {
- // No-op.
- }
-
- /**
- * Constructs update directory listing closure.
- *
- * @param fileName File name to add into parent listing.
- * @param entry Listing entry to add or remove.
- */
- private ListingAddProcessor(String fileName, IgfsListingEntry entry) {
- assert fileName != null;
- assert entry != null;
-
- this.fileName = fileName;
- this.entry = entry;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) {
- IgfsEntryInfo fileInfo = e.getValue();
-
- assert fileInfo.isDirectory();
-
- Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
-
- // Modify listing in-place.
- IgfsListingEntry oldEntry = listing.put(fileName, entry);
-
- if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
- throw new IgniteException("Directory listing contains unexpected file" +
- " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
- ", oldEntry=" + oldEntry + ']');
-
- e.setValue(fileInfo.listing(listing));
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, fileName);
- out.writeObject(entry);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- fileName = U.readString(in);
- entry = (IgfsListingEntry)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(ListingAddProcessor.class, this);
- }
- }
-
- /**
- * Listing replace processor.
- */
- private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Name. */
- private String name;
-
- /** New ID. */
- private IgniteUuid id;
-
- /**
- * Constructor.
- */
- public ListingReplaceProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param name Name.
- * @param id ID.
- */
- public ListingReplaceProcessor(String name, IgniteUuid id) {
- this.name = name;
- this.id = id;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo fileInfo = e.getValue();
-
- assert fileInfo.isDirectory();
-
- Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
-
- // Modify listing in-place.
- IgfsListingEntry oldEntry = listing.get(name);
-
- if (oldEntry == null)
- throw new IgniteException("Directory listing doesn't contain expected entry: " + name);
-
- listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory()));
-
- e.setValue(fileInfo.listing(listing));
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, name);
- out.writeObject(id);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- name = U.readString(in);
- id = (IgniteUuid)in.readObject();
- }
- }
-
- /**
- * Update path closure.
- */
- @GridInternal
- private static final class UpdatePathProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** New path. */
- private IgfsPath path;
-
- /**
- * @param path Path.
- */
- private UpdatePathProcessor(IgfsPath path) {
- this.path = path;
- }
-
- /**
- * Default constructor (required by Externalizable).
- */
- public UpdatePathProcessor() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) {
- IgfsEntryInfo info = e.getValue();
-
- IgfsEntryInfo newInfo = info.path(path);
-
- e.setValue(newInfo);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(path);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- path = (IgfsPath)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(UpdatePathProcessor.class, this);
- }
- }
-
- /**
* Append routine.
*
* @param path Path.
@@ -3409,9 +2932,7 @@ public class IgfsMetaManager extends IgfsManager {
pathIds.addSurrogateIds(lockIds);
// Start TX.
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
@@ -3458,9 +2979,6 @@ public class IgfsMetaManager extends IgfsManager {
return new T2<>(res.info(), res.parentId());
}
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -3472,16 +2990,17 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Create a new file.
+ * Create a file.
*
* @param path Path.
- * @param bufSize Buffer size.
+ * @param dirProps Directory properties.
* @param overwrite Overwrite flag.
+ * @param blockSize Block size.
* @param affKey Affinity key.
- * @param replication Replication factor.
- * @param props Properties.
- * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method.
- * @return Tuple containing the created file info and its parent id.
+ * @param evictExclude Evict exclude flag.
+ * @param fileProps File properties.
+ * @return @return Tuple containing the created file info and its parent id.
+ * @throws IgniteCheckedException If failed.
*/
IgniteBiTuple<IgfsEntryInfo, IgniteUuid> create(
final IgfsPath path,
@@ -3518,9 +3037,7 @@ public class IgfsMetaManager extends IgfsManager {
}
// Start TX.
- IgniteInternalTx tx = startTx();
-
- try {
+ try (IgniteInternalTx tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
@@ -3548,19 +3065,19 @@ public class IgfsMetaManager extends IgfsManager {
// First step: add existing to trash listing.
IgniteUuid oldId = pathIds.lastId();
- id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(),
+ id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(oldId.toString(),
new IgfsListingEntry(oldInfo)));
// Second step: replace ID in parent directory.
String name = pathIds.lastPart();
IgniteUuid parentId = pathIds.lastParentId();
- id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name, overwriteId));
+ id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingReplaceProcessor(name, overwriteId));
// Third step: create the file.
long createTime = System.currentTimeMillis();
- IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
+ IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(createTime,
fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
// Fourth step: update path of remove file.
@@ -3592,9 +3109,6 @@ public class IgfsMetaManager extends IgfsManager {
return new T2<>(res.info(), res.parentId());
}
}
- finally {
- tx.close();
- }
}
finally {
busyLock.leaveBusy();
@@ -3682,7 +3196,7 @@ public class IgfsMetaManager extends IgfsManager {
return null;
// First step: add new entry to the last existing element.
- id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart,
+ id2InfoPrj.invoke(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
// Events support.
@@ -3699,7 +3213,7 @@ public class IgfsMetaManager extends IgfsManager {
String nextPart = pathIds.part(nextIdx);
IgniteUuid nextId = pathIds.surrogateId(nextIdx);
- id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime, dirProps,
+ id2InfoPrj.invoke(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps,
nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
// Save event.
@@ -3720,9 +3234,9 @@ public class IgfsMetaManager extends IgfsManager {
IgfsEntryInfo info;
if (dir)
- info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps));
+ info = invokeAndGet(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps));
else
- info = invokeAndGet(curId, new FileCreateProcessor(createTime, fileProps,
+ info = invokeAndGet(curId, new IgfsMetaFileCreateProcessor(createTime, fileProps,
blockSize, affKey, createFileLockId(false), evictExclude));
createdPaths.add(pathIds.path());
@@ -3752,476 +3266,4 @@ public class IgfsMetaManager extends IgfsManager {
else
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
-
- /**
- * File create processor.
- */
- private static class FileCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Create time. */
- private long createTime;
-
- /** Properties. */
- private Map<String, String> props;
-
- /** Block size. */
- private int blockSize;
-
- /** Affintiy key. */
- private IgniteUuid affKey;
-
- /** Lcok ID. */
- private IgniteUuid lockId;
-
- /** Evict exclude flag. */
- private boolean evictExclude;
-
- /**
- * Constructor.
- */
- public FileCreateProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param createTime Create time.
- * @param props Properties.
- * @param blockSize Block size.
- * @param affKey Affinity key.
- * @param lockId Lock ID.
- * @param evictExclude Evict exclude flag.
- */
- public FileCreateProcessor(long createTime, Map<String, String> props, int blockSize,
- @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) {
- this.createTime = createTime;
- this.props = props;
- this.blockSize = blockSize;
- this.affKey = affKey;
- this.lockId = lockId;
- this.evictExclude = evictExclude;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo info = IgfsUtils.createFile(
- entry.getKey(),
- blockSize,
- 0L,
- affKey,
- lockId,
- evictExclude,
- props,
- createTime,
- createTime
- );
-
- entry.setValue(info);
-
- return info;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(createTime);
- U.writeStringMap(out, props);
- out.writeInt(blockSize);
- out.writeObject(affKey);
- out.writeObject(lockId);
- out.writeBoolean(evictExclude);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- createTime = in.readLong();
- props = U.readStringMap(in);
- blockSize = in.readInt();
- affKey = (IgniteUuid)in.readObject();
- lockId = (IgniteUuid)in.readObject();
- evictExclude = in.readBoolean();
- }
- }
-
- /**
- * Directory create processor.
- */
- private static class DirectoryCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Create time. */
- private long createTime;
-
- /** Properties. */
- private Map<String, String> props;
-
- /** Child name (optional). */
- private String childName;
-
- /** Child entry (optional. */
- private IgfsListingEntry childEntry;
-
- /**
- * Constructor.
- */
- public DirectoryCreateProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param createTime Create time.
- * @param props Properties.
- */
- public DirectoryCreateProcessor(long createTime, Map<String, String> props) {
- this(createTime, props, null, null);
- }
-
- /**
- * Constructor.
- *
- * @param createTime Create time.
- * @param props Properties.
- * @param childName Child name.
- * @param childEntry Child entry.
- */
- public DirectoryCreateProcessor(long createTime, Map<String, String> props, String childName,
- IgfsListingEntry childEntry) {
- this.createTime = createTime;
- this.props = props;
- this.childName = childName;
- this.childEntry = childEntry;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
-
- IgfsEntryInfo info = IgfsUtils.createDirectory(
- entry.getKey(),
- null,
- props,
- createTime,
- createTime
- );
-
- if (childName != null)
- info = info.listing(Collections.singletonMap(childName, childEntry));
-
- entry.setValue(info);
-
- return info;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(createTime);
- U.writeStringMap(out, props);
-
- if (childName != null) {
- out.writeBoolean(true);
-
- U.writeString(out, childName);
- out.writeObject(childEntry);
- }
- else
- out.writeBoolean(false);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- createTime = in.readLong();
- props = U.readStringMap(in);
-
- if (in.readBoolean()) {
- childName = U.readString(in);
- childEntry = (IgfsListingEntry)in.readObject();
- }
- }
- }
-
- /**
- * File lock entry processor.
- */
- private static class FileLockProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Lock Id. */
- private IgniteUuid lockId;
-
- /**
- * Default constructor.
- */
- public FileLockProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param lockId Lock ID.
- */
- public FileLockProcessor(IgniteUuid lockId) {
- this.lockId = lockId;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo oldInfo = entry.getValue();
-
- IgfsEntryInfo newInfo = oldInfo.lock(lockId);
-
- entry.setValue(newInfo);
-
- return newInfo;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeGridUuid(out, lockId);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- lockId = U.readGridUuid(in);
- }
- }
-
- /**
- * File unlock entry processor.
- */
- private static class FileUnlockProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Modification time. */
- private long modificationTime;
-
- /**
- * Default constructor.
- */
- public FileUnlockProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param modificationTime Modification time.
- */
- public FileUnlockProcessor(long modificationTime) {
- this.modificationTime = modificationTime;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo old = entry.getValue();
-
- entry.setValue(old.unlock(modificationTime));
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(modificationTime);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- modificationTime = in.readLong();
- }
- }
-
- /**
- * File reserve space entry processor.
- */
- private static class FileReserveSpaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Space. */
- private long space;
-
- /** Affinity range. */
- private IgfsFileAffinityRange affRange;
-
- /**
- * Default constructor.
- */
- public FileReserveSpaceProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param space Space.
- * @param affRange
- */
- public FileReserveSpaceProcessor(long space, IgfsFileAffinityRange affRange) {
- this.space = space;
- this.affRange = affRange;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo oldInfo = entry.getValue();
-
- IgfsFileMap newMap = new IgfsFileMap(oldInfo.fileMap());
-
- newMap.addRange(affRange);
-
- IgfsEntryInfo newInfo = oldInfo.length(oldInfo.length() + space).fileMap(newMap);
-
- entry.setValue(newInfo);
-
- return newInfo;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(space);
- out.writeObject(affRange);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- space = in.readLong();
- affRange = (IgfsFileAffinityRange)in.readObject();
- }
- }
-
- /**
- * Update properties processor.
- */
- private static class UpdatePropertiesProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Properties to be updated. */
- private Map<String, String> props;
-
- /**
- * Constructor.
- */
- public UpdatePropertiesProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param props Properties.
- */
- public UpdatePropertiesProcessor(Map<String, String> props) {
- this.props = props;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
- IgfsEntryInfo oldInfo = entry.getValue();
-
- Map<String, String> tmp = oldInfo.properties();
-
- tmp = tmp == null ? new GridLeanMap<String, String>(props.size()) : new GridLeanMap<>(tmp);
-
- for (Map.Entry<String, String> e : props.entrySet()) {
- if (e.getValue() == null)
- // Remove properties with 'null' values.
- tmp.remove(e.getKey());
- else
- // Add/overwrite property.
- tmp.put(e.getKey(), e.getValue());
- }
-
- IgfsEntryInfo newInfo = oldInfo.properties(tmp);
-
- entry.setValue(newInfo);
-
- return newInfo;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeStringMap(out, props);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- props = U.readStringMap(in);
- }
- }
-
- /**
- * Update times entry processor.
- */
- private static class UpdateTimesProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
- Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Access time. */
- private long accessTime;
-
- /** Modification time. */
- private long modificationTime;
-
- /**
- * Default constructor.
- */
- public UpdateTimesProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param accessTime Access time.
- * @param modificationTime Modification time.
- */
- public UpdateTimesProcessor(long accessTime, long modificationTime) {
- this.accessTime = accessTime;
- this.modificationTime = modificationTime;
- }
-
- /** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
- throws EntryProcessorException {
-
- IgfsEntryInfo oldInfo = entry.getValue();
-
- entry.setValue(oldInfo.accessModificationTime(accessTime, modificationTime));
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(accessTime);
- out.writeLong(modificationTime);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- accessTime = in.readLong();
- modificationTime = in.readLong();
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
index 2903239..e2fe58d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -134,7 +134,7 @@ public class IgfsPathIds {
*
* @return Last ID.
*/
- @Nullable public IgniteUuid lastId() {
+ public IgniteUuid lastId() {
return ids[ids.length - 1];
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
new file mode 100644
index 0000000..ffba042
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
@@ -0,0 +1,117 @@
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Directory create processor.
+ */
+public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Create time. */
+ private long createTime;
+
+ /** Properties. */
+ private Map<String, String> props;
+
+ /** Child name (optional). */
+ private String childName;
+
+ /** Child entry (optional. */
+ private IgfsListingEntry childEntry;
+
+ /**
+ * Constructor.
+ */
+ public IgfsMetaDirectoryCreateProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param createTime Create time.
+ * @param props Properties.
+ */
+ public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props) {
+ this(createTime, props, null, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param createTime Create time.
+ * @param props Properties.
+ * @param childName Child name.
+ * @param childEntry Child entry.
+ */
+ public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props, String childName,
+ IgfsListingEntry childEntry) {
+ this.createTime = createTime;
+ this.props = props;
+ this.childName = childName;
+ this.childEntry = childEntry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
+ throws EntryProcessorException {
+
+ IgfsEntryInfo info = IgfsUtils.createDirectory(
+ entry.getKey(),
+ null,
+ props,
+ createTime,
+ createTime
+ );
+
+ if (childName != null)
+ info = info.listing(Collections.singletonMap(childName, childEntry));
+
+ entry.setValue(info);
+
+ return info;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(createTime);
+ U.writeStringMap(out, props);
+
+ if (childName != null) {
+ out.writeBoolean(true);
+
+ U.writeString(out, childName);
+ out.writeObject(childEntry);
+ }
+ else
+ out.writeBoolean(false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ createTime = in.readLong();
+ props = U.readStringMap(in);
+
+ if (in.readBoolean()) {
+ childName = U.readString(in);
+ childEntry = (IgfsListingEntry)in.readObject();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
new file mode 100644
index 0000000..ab5cd5d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingAddProcessor.java
@@ -0,0 +1,92 @@
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Update directory listing closure.
+ */
+public final class IgfsMetaDirectoryListingAddProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** File name to add into parent listing. */
+ private String fileName;
+
+ /** File ID.*/
+ private IgfsListingEntry entry;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ *
+ */
+ public IgfsMetaDirectoryListingAddProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructs update directory listing closure.
+ *
+ * @param fileName File name to add into parent listing.
+ * @param entry Listing entry to add or remove.
+ */
+ public IgfsMetaDirectoryListingAddProcessor(String fileName, IgfsListingEntry entry) {
+ assert fileName != null;
+ assert entry != null;
+
+ this.fileName = fileName;
+ this.entry = entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) {
+ IgfsEntryInfo fileInfo = e.getValue();
+
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ // Modify listing in-place.
+ IgfsListingEntry oldEntry = listing.put(fileName, entry);
+
+ if (oldEntry != null && !oldEntry.fileId().equals(entry.fileId()))
+ throw new IgniteException("Directory listing contains unexpected file" +
+ " [listing=" + listing + ", fileName=" + fileName + ", entry=" + entry +
+ ", oldEntry=" + oldEntry + ']');
+
+ e.setValue(fileInfo.listing(listing));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, fileName);
+ out.writeObject(entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fileName = U.readString(in);
+ entry = (IgfsListingEntry)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsMetaDirectoryListingAddProcessor.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java
new file mode 100644
index 0000000..181a73e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRemoveProcessor.java
@@ -0,0 +1,89 @@
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Remove entry from directory listing.
+ */
+public class IgfsMetaDirectoryListingRemoveProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** File name. */
+ private String fileName;
+
+ /** Expected ID. */
+ private IgniteUuid fileId;
+
+ /**
+ * Default constructor.
+ */
+ public IgfsMetaDirectoryListingRemoveProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param fileName File name.
+ * @param fileId File ID.
+ */
+ public IgfsMetaDirectoryListingRemoveProcessor(String fileName, IgniteUuid fileId) {
+ this.fileName = fileName;
+ this.fileId = fileId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
+ throws EntryProcessorException {
+ IgfsEntryInfo fileInfo = e.getValue();
+
+ assert fileInfo != null;
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ listing.putAll(fileInfo.listing());
+
+ IgfsListingEntry oldEntry = listing.get(fileName);
+
+ if (oldEntry == null || !oldEntry.fileId().equals(fileId))
+ throw new IgniteException("Directory listing doesn't contain expected file" +
+ " [listing=" + listing + ", fileName=" + fileName + "]");
+
+ // Modify listing in-place.
+ listing.remove(fileName);
+
+ e.setValue(fileInfo.listing(listing));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, fileName);
+ U.writeGridUuid(out, fileId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fileName = U.readString(in);
+ fileId = U.readGridUuid(in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java
new file mode 100644
index 0000000..4c4888c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingReplaceProcessor.java
@@ -0,0 +1,84 @@
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Listing replace processor.
+ */
+public final class IgfsMetaDirectoryListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Name. */
+ private String name;
+
+ /** New ID. */
+ private IgniteUuid id;
+
+ /**
+ * Constructor.
+ */
+ public IgfsMetaDirectoryListingReplaceProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param id ID.
+ */
+ public IgfsMetaDirectoryListingReplaceProcessor(String name, IgniteUuid id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
+ throws EntryProcessorException {
+ IgfsEntryInfo fileInfo = e.getValue();
+
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ // Modify listing in-place.
+ IgfsListingEntry oldEntry = listing.get(name);
+
+ if (oldEntry == null)
+ throw new IgniteException("Directory listing doesn't contain expected entry: " + name);
+
+ listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory()));
+
+ e.setValue(fileInfo.listing(listing));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, name);
+ out.writeObject(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ name = U.readString(in);
+ id = (IgniteUuid)in.readObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/865e376a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
new file mode 100644
index 0000000..a07d764
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
@@ -0,0 +1,110 @@
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+
+/**
+ * File create processor.
+ */
+public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, IgfsEntryInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Create time. */
+ private long createTime;
+
+ /** Properties. */
+ private Map<String, String> props;
+
+ /** Block size. */
+ private int blockSize;
+
+ /** Affintiy key. */
+ private IgniteUuid affKey;
+
+ /** Lcok ID. */
+ private IgniteUuid lockId;
+
+ /** Evict exclude flag. */
+ private boolean evictExclude;
+
+ /**
+ * Constructor.
+ */
+ public IgfsMetaFileCreateProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param createTime Create time.
+ * @param props Properties.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param lockId Lock ID.
+ * @param evictExclude Evict exclude flag.
+ */
+ public IgfsMetaFileCreateProcessor(long createTime, Map<String, String> props, int blockSize,
+ @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) {
+ this.createTime = createTime;
+ this.props = props;
+ this.blockSize = blockSize;
+ this.affKey = affKey;
+ this.lockId = lockId;
+ this.evictExclude = evictExclude;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsEntryInfo process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
+ throws EntryProcessorException {
+ IgfsEntryInfo info = IgfsUtils.createFile(
+ entry.getKey(),
+ blockSize,
+ 0L,
+ affKey,
+ lockId,
+ evictExclude,
+ props,
+ createTime,
+ createTime
+ );
+
+ entry.setValue(info);
+
+ return info;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeLong(createTime);
+ U.writeStringMap(out, props);
+ out.writeInt(blockSize);
+ out.writeObject(affKey);
+ out.writeObject(lockId);
+ out.writeBoolean(evictExclude);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ createTime = in.readLong();
+ props = U.readStringMap(in);
+ blockSize = in.readInt();
+ affKey = (IgniteUuid)in.readObject();
+ lockId = (IgniteUuid)in.readObject();
+ evictExclude = in.readBoolean();
+ }
+}