You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/10/22 11:09:27 UTC
[15/50] [abbrv] ignite git commit: IGNITE-1590: Reworked create and
append operations to match overall design.
IGNITE-1590: Reworked create and append operations to match overall design.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/962fcce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/962fcce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/962fcce3
Branch: refs/heads/ignite-1486
Commit: 962fcce3acbecd028c4787a6255fedcdcbdf9db1
Parents: 6844370
Author: iveselovskiy <iv...@gridgain.com>
Authored: Wed Oct 14 15:59:57 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Oct 14 15:59:57 2015 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsDataManager.java | 2 -
.../processors/igfs/IgfsDeleteWorker.java | 102 ++-
.../internal/processors/igfs/IgfsImpl.java | 164 +---
.../processors/igfs/IgfsMetaManager.java | 897 ++++++++++++-------
.../processors/igfs/IgfsOutputStreamImpl.java | 2 +
.../internal/processors/igfs/IgfsUtils.java | 23 +
.../ignite/igfs/IgfsEventsAbstractSelfTest.java | 6 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 639 ++++++++++---
.../igfs/IgfsDataManagerSelfTest.java | 13 +-
.../igfs/IgfsMetaManagerSelfTest.java | 170 ++--
.../processors/igfs/IgfsProcessorSelfTest.java | 12 +-
.../ignite/testsuites/IgniteIgfsTestSuite.java | 6 +
.../ignite/igfs/Hadoop1DualAbstractTest.java | 5 -
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 5 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 2 +-
15 files changed, 1289 insertions(+), 759 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index b1b51f9..125d728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -580,8 +580,6 @@ public class IgfsDataManager extends IgfsManager {
* @return Delete future that will be completed when file is actually erased.
*/
public IgniteInternalFuture<Object> delete(IgfsFileInfo fileInfo) {
- //assert validTxState(any); // Allow this method call for any transaction state.
-
if (!fileInfo.isFile()) {
if (log.isDebugEnabled())
log.debug("Cannot delete content of not-data file: " + fileInfo);
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index 98672e8..95a6a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -27,12 +27,10 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
-import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -62,9 +60,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** Data manager. */
private final IgfsDataManager data;
- /** Event manager. */
- private final GridEventStorageManager evts;
-
/** Logger. */
private final IgniteLogger log;
@@ -96,8 +91,6 @@ public class IgfsDeleteWorker extends IgfsThread {
meta = igfsCtx.meta();
data = igfsCtx.data();
- evts = igfsCtx.kernalContext().event();
-
String igfsName = igfsCtx.igfs().name();
topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
@@ -147,6 +140,9 @@ public class IgfsDeleteWorker extends IgfsThread {
}
}
+ /**
+ * Cancels the worker.
+ */
void cancel() {
cancelled = true;
@@ -218,7 +214,8 @@ public class IgfsDeleteWorker extends IgfsThread {
if (info != null) {
if (info.isDirectory()) {
- deleteDirectory(TRASH_ID, id);
+ if (!deleteDirectoryContents(TRASH_ID, id))
+ return false;
if (meta.delete(TRASH_ID, name, id))
return true;
@@ -226,19 +223,22 @@ public class IgfsDeleteWorker extends IgfsThread {
else {
assert info.isFile();
+ // Lock the file with special lock Id to prevent concurrent writing:
+ IgfsFileInfo lockedInfo = meta.lock(id, true);
+
+ if (lockedInfo == null)
+ return false; // File is locked, we cannot delete it.
+
+ assert id.equals(lockedInfo.id());
+
// Delete file content first.
// In case this node crashes, other node will re-delete the file.
- data.delete(info).get();
+ data.delete(lockedInfo).get();
boolean ret = meta.delete(TRASH_ID, name, id);
- if (evts.isRecordable(EVT_IGFS_FILE_PURGED)) {
- if (info.path() != null)
- evts.record(new IgfsEvent(info.path(),
- igfsCtx.kernalContext().discovery().localNode(), EVT_IGFS_FILE_PURGED));
- else
- LT.warn(log, null, "Removing file without path info: " + info);
- }
+ if (info.path() != null)
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), info.path(), EVT_IGFS_FILE_PURGED);
return ret;
}
@@ -253,9 +253,10 @@ public class IgfsDeleteWorker extends IgfsThread {
*
* @param parentId Parent ID.
* @param id Entry id.
+ * @return true iff all the items in the directory were deleted (directory is seen to be empty).
* @throws IgniteCheckedException If delete failed for some reason.
*/
- private void deleteDirectory(IgniteUuid parentId, IgniteUuid id) throws IgniteCheckedException {
+ private boolean deleteDirectoryContents(IgniteUuid parentId, final IgniteUuid id) throws IgniteCheckedException {
assert parentId != null;
assert id != null;
@@ -265,47 +266,50 @@ public class IgfsDeleteWorker extends IgfsThread {
if (info != null) {
assert info.isDirectory();
- Map<String, IgfsListingEntry> listing = info.listing();
+ final Map<String, IgfsListingEntry> listing = info.listing();
if (listing.isEmpty())
- return; // Directory is empty.
+ return true; // Directory is empty.
- Map<String, IgfsListingEntry> delListing;
+ final Map<String, IgfsListingEntry> delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f);
- if (listing.size() <= MAX_DELETE_BATCH)
- delListing = listing;
- else {
- delListing = new HashMap<>(MAX_DELETE_BATCH, 1.0f);
+ final GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();
- int i = 0;
+ int failedFiles = 0;
- for (Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
- delListing.put(entry.getKey(), entry.getValue());
+ for (final Map.Entry<String, IgfsListingEntry> entry : listing.entrySet()) {
+ if (cancelled)
+ return false;
- if (++i == MAX_DELETE_BATCH)
- break;
+ if (entry.getValue().isDirectory()) {
+ if (deleteDirectoryContents(id, entry.getValue().fileId())) // *** Recursive call.
+ delListing.put(entry.getKey(), entry.getValue());
+ else
+ failedFiles++;
}
- }
+ else {
+ IgfsFileInfo fileInfo = meta.info(entry.getValue().fileId());
- GridCompoundFuture<Object, ?> fut = new GridCompoundFuture<>();
+ if (fileInfo != null) {
+ assert fileInfo.isFile();
- // Delegate to child folders.
- for (IgfsListingEntry entry : delListing.values()) {
- if (!cancelled) {
- if (entry.isDirectory())
- deleteDirectory(id, entry.fileId());
- else {
- IgfsFileInfo fileInfo = meta.info(entry.fileId());
+ IgfsFileInfo lockedInfo = meta.lock(fileInfo.id(), true);
+
+ if (lockedInfo == null)
+ // File is already locked:
+ failedFiles++;
+ else {
+ assert IgfsMetaManager.DELETE_LOCK_ID.equals(lockedInfo.lockId());
- if (fileInfo != null) {
- assert fileInfo.isFile();
+ fut.add(data.delete(lockedInfo));
- fut.add(data.delete(fileInfo));
+ delListing.put(entry.getKey(), entry.getValue());
}
}
}
- else
- return;
+
+ if (delListing.size() == MAX_DELETE_BATCH)
+ break;
}
fut.markInitialized();
@@ -318,17 +322,21 @@ public class IgfsDeleteWorker extends IgfsThread {
// This future can be cancelled only due to IGFS shutdown.
cancelled = true;
- return;
+ return false;
}
// Actual delete of folder content.
Collection<IgniteUuid> delIds = meta.delete(id, delListing);
- if (delListing == listing && delListing.size() == delIds.size())
- break; // All entries were deleted.
+ if (listing.size() == delIds.size())
+ return true; // All entries were deleted.
+
+ if (listing.size() == delListing.size() + failedFiles)
+ // All the files were tried, no reason to continue the loop:
+ return false;
}
else
- break; // Entry was deleted concurrently.
+ return true; // Directory entry was deleted concurrently.
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index fa3a955..0d5cda3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -60,7 +60,6 @@ import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathAlreadyExistsException;
import org.apache.ignite.igfs.IgfsPathIsDirectoryException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.IgfsPathSummary;
@@ -97,7 +96,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
@@ -112,7 +110,6 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID;
-import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID;
/**
* Cache-based IGFS implementation.
@@ -122,7 +119,7 @@ public final class IgfsImpl implements IgfsEx {
private static final String PERMISSION_DFLT_VAL = "0777";
/** Default directory metadata. */
- private static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL);
+ static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL);
/** Handshake message. */
private final IgfsPaths secondaryPaths;
@@ -740,14 +737,9 @@ public final class IgfsImpl implements IgfsEx {
}
// Record event if needed.
- if (res && desc != null) {
- if (desc.isFile) {
- if (evts.isRecordable(EVT_IGFS_FILE_DELETED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_DELETED));
- }
- else if (evts.isRecordable(EVT_IGFS_DIR_DELETED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_DIR_DELETED));
- }
+ if (res && desc != null)
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
+ desc.isFile ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED);
return res;
}
@@ -928,8 +920,7 @@ public final class IgfsImpl implements IgfsEx {
IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, desc.info(),
cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, desc.reader(), metrics);
- if (evts.isRecordable(EVT_IGFS_FILE_OPENED_READ))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_READ));
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
return os;
}
@@ -949,8 +940,7 @@ public final class IgfsImpl implements IgfsEx {
IgfsEventAwareInputStream os = new IgfsEventAwareInputStream(igfsCtx, path, info,
cfg.getPrefetchBlocks(), seqReadsBeforePrefetch, null, metrics);
- if (evts.isRecordable(EVT_IGFS_FILE_OPENED_READ))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_READ));
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_READ);
return os;
}
@@ -1004,7 +994,7 @@ public final class IgfsImpl implements IgfsEx {
log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" +
overwrite + ", props=" + props + ']');
- IgfsMode mode = resolveMode(path);
+ final IgfsMode mode = resolveMode(path);
IgfsFileWorkerBatch batch;
@@ -1021,71 +1011,28 @@ public final class IgfsImpl implements IgfsEx {
IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(), desc.parentId(),
bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch);
- if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE));
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
return os;
}
- // Re-create parents when working in PRIMARY mode. In DUAL mode this is done by MetaManager.
- IgfsPath parent = path.parent();
-
- // Create missing parent directories if necessary.
- if (parent != null)
- mkdirs(parent, props);
-
- List<IgniteUuid> ids = meta.fileIds(path);
-
- // Resolve parent ID for file.
- IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null;
-
- if (parentId == null)
- throw new IgfsPathNotFoundException("Failed to resolve parent directory: " + parent);
-
- String fileName = path.name();
-
- // Constructs new file info.
- IgfsFileInfo info = new IgfsFileInfo(cfg.getBlockSize(), affKey, evictExclude(path, true), props);
-
- // Add new file into tree structure.
- while (true) {
- IgniteUuid oldId = meta.putIfAbsent(parentId, fileName, info);
-
- if (oldId == null)
- break;
-
- if (!overwrite)
- throw new IgfsPathAlreadyExistsException("Failed to create file (file already exists): " +
- path);
-
- IgfsFileInfo oldInfo = meta.info(oldId);
-
- assert oldInfo != null;
-
- if (oldInfo.isDirectory())
- throw new IgfsPathAlreadyExistsException("Failed to create file (path points to a " +
- "directory): " + path);
+ final Map<String, String> dirProps, fileProps;
- // Remove old file from the tree.
- // Only one file is deleted, so we use internal data streamer.
- deleteFile(path, new FileDescriptor(parentId, fileName, oldId, oldInfo.isFile()), false);
+ if (props == null) {
+ dirProps = DFLT_DIR_META;
- if (evts.isRecordable(EVT_IGFS_FILE_DELETED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_DELETED));
+ fileProps = null;
}
+ else
+ dirProps = fileProps = new HashMap<>(props);
- if (evts.isRecordable(EVT_IGFS_FILE_CREATED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CREATED));
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, false/*append*/, overwrite, dirProps,
+ cfg.getBlockSize(), affKey, evictExclude(path, true), fileProps);
- info = meta.lock(info.id());
+ assert t2 != null;
- IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, info, parentId,
+ return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(),
bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null);
-
- if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE));
-
- return os;
}
});
}
@@ -1107,7 +1054,7 @@ public final class IgfsImpl implements IgfsEx {
log.debug("Open file for appending [path=" + path + ", bufSize=" + bufSize + ", create=" + create +
", props=" + props + ']');
- IgfsMode mode = resolveMode(path);
+ final IgfsMode mode = resolveMode(path);
IgfsFileWorkerBatch batch;
@@ -1124,46 +1071,39 @@ public final class IgfsImpl implements IgfsEx {
bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, batch);
}
- List<IgniteUuid> ids = meta.fileIds(path);
+ final List<IgniteUuid> ids = meta.fileIds(path);
- IgfsFileInfo info = meta.info(ids.get(ids.size() - 1));
+ final IgniteUuid id = ids.get(ids.size() - 1);
- // Resolve parent ID for the file.
- IgniteUuid parentId = ids.size() >= 2 ? ids.get(ids.size() - 2) : null;
-
- if (info == null) {
+ if (id == null) {
if (!create) {
checkConflictWithPrimary(path);
throw new IgfsPathNotFoundException("File not found: " + path);
}
+ }
- if (parentId == null)
- throw new IgfsPathNotFoundException("Failed to resolve parent directory: " + path.parent());
-
- info = new IgfsFileInfo(cfg.getBlockSize(), /**affinity key*/null, evictExclude(path, true), props);
+ // Prevent attempt to append to ROOT in early stage:
+ if (ids.size() == 1)
+ throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
- IgniteUuid oldId = meta.putIfAbsent(parentId, path.name(), info);
+ final Map<String, String> dirProps, fileProps;
- if (oldId != null)
- info = meta.info(oldId);
+ if (props == null) {
+ dirProps = DFLT_DIR_META;
- if (evts.isRecordable(EVT_IGFS_FILE_CREATED))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CREATED));
+ fileProps = null;
}
+ else
+ dirProps = fileProps = new HashMap<>(props);
- assert info != null;
-
- if (!info.isFile())
- throw new IgfsPathIsDirectoryException("Failed to open file (not a file): " + path);
-
- info = meta.lock(info.id());
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, true/*append*/, false/*overwrite*/,
+ dirProps, cfg.getBlockSize(), null/*affKey*/, evictExclude(path, true), fileProps);
- if (evts.isRecordable(EVT_IGFS_FILE_OPENED_WRITE))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_OPENED_WRITE));
+ assert t2 != null;
- return new IgfsEventAwareOutputStream(path, info, parentId, bufSize == 0 ?
- cfg.getStreamBufferSize() : bufSize, mode, null);
+ return new IgfsEventAwareOutputStream(path, t2.get1(), t2.get2(),
+ bufSize == 0 ? cfg.getStreamBufferSize() : bufSize, mode, null);
}
});
}
@@ -1451,30 +1391,6 @@ public final class IgfsImpl implements IgfsEx {
}
/**
- * Remove file from the file system (structure and data).
- *
- * @param path Path of the deleted file.
- * @param desc Detailed file descriptor to remove.
- * @param rmvLocked Whether to remove this entry in case it is has explicit lock.
- * @throws IgniteCheckedException If failed.
- */
- private void deleteFile(IgfsPath path, FileDescriptor desc, boolean rmvLocked) throws IgniteCheckedException {
- IgniteUuid parentId = desc.parentId;
- IgniteUuid fileId = desc.fileId;
-
- if (parentId == null || ROOT_ID.equals(fileId)) {
- assert parentId == null && ROOT_ID.equals(fileId) : "Invalid file descriptor: " + desc;
-
- return; // Never remove the root directory!
- }
-
- if (TRASH_ID.equals(fileId))
- return; // Never remove trash directory.
-
- meta.removeIfEmpty(parentId, desc.fileName, fileId, path, rmvLocked);
- }
-
- /**
* Check whether IGFS with the same name exists among provided attributes.
*
* @param attrs Attributes.
@@ -2005,13 +1921,13 @@ public final class IgfsImpl implements IgfsEx {
/**
* Perform IGFS operation in safe context.
*
- * @param action Action.
+ * @param act Action.
* @return Result.
*/
- private <T> T safeOp(Callable<T> action) {
+ private <T> T safeOp(Callable<T> act) {
if (enterBusy()) {
try {
- return action.call();
+ return act.call();
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 927067a..c016e46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -37,8 +37,11 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -46,6 +49,7 @@ import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsConcurrentModificationException;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
@@ -73,9 +77,10 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -83,9 +88,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_CREATED;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.ROOT_ID;
import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.TRASH_ID;
import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder;
@@ -97,6 +101,9 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*/
@SuppressWarnings("all")
public class IgfsMetaManager extends IgfsManager {
+ /** Lock Id used to lock files being deleted from TRASH. This is a global constant. */
+ static final IgniteUuid DELETE_LOCK_ID = new IgniteUuid(new UUID(0L, 0L), 0L);
+
/** Comparator for Id sorting. */
private static final Comparator<IgniteUuid> PATH_ID_SORTING_COMPARATOR
= new Comparator<IgniteUuid>() {
@@ -295,7 +302,8 @@ public class IgfsMetaManager extends IgfsManager {
* @return File ID.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx) throws IgniteCheckedException {
+ @Nullable private IgniteUuid fileId(IgniteUuid parentId, String fileName, boolean skipTx)
+ throws IgniteCheckedException {
IgfsListingEntry entry = directoryListing(parentId, skipTx).get(fileName);
if (entry == null) {
@@ -464,9 +472,9 @@ public class IgfsMetaManager extends IgfsManager {
*
* @param fileId File ID to lock.
* @return Locked file info or {@code null} if file cannot be locked or doesn't exist.
- * @throws IgniteCheckedException If failed.
+ * @throws IgniteCheckedException If the file with such id does not exist, or on another failure.
*/
- public IgfsFileInfo lock(IgniteUuid fileId) throws IgniteCheckedException {
+ public @Nullable IgfsFileInfo lock(IgniteUuid fileId, boolean isDeleteLock) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert validTxState(false);
@@ -479,14 +487,19 @@ public class IgfsMetaManager extends IgfsManager {
IgfsFileInfo oldInfo = info(fileId);
if (oldInfo == null)
- throw new IgniteCheckedException("Failed to lock file (file not found): " + fileId);
+ return null;
- IgfsFileInfo newInfo = lockInfo(oldInfo);
+ if (oldInfo.lockId() != null)
+ return null; // The file is already locked, we cannot lock it.
- boolean put = metaCache.put(fileId, newInfo);
+ IgfsFileInfo newInfo = lockInfo(oldInfo, isDeleteLock);
+
+ boolean put = metaCache.replace(fileId, oldInfo, newInfo);
assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+ assert newInfo.id().equals(oldInfo.id()); // Same id.
+
tx.commit();
return newInfo;
@@ -510,26 +523,26 @@ public class IgfsMetaManager extends IgfsManager {
* Set lock on file info.
*
* @param info File info.
- * @return New file info with lock set.
+ * @return New file info with lock set, or null if the info passed in is already locked.
* @throws IgniteCheckedException In case lock is already set on that file.
*/
- public IgfsFileInfo lockInfo(IgfsFileInfo info) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- assert info != null;
+ private static @Nullable IgfsFileInfo lockInfo(IgfsFileInfo info, boolean isDeleteLock) {
+ assert info != null;
- if (info.lockId() != null)
- throw new IgniteCheckedException("Failed to lock file (file is being concurrently written) [fileId=" +
- info.id() + ", lockId=" + info.lockId() + ']');
+ if (info.lockId() != null)
+ return null; // Null return value indicates that the file is already locked.
- return new IgfsFileInfo(info, IgniteUuid.randomUuid(), info.modificationTime());
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get lock info because Grid is stopping: " + info);
+ return new IgfsFileInfo(info, composeLockId(isDeleteLock), info.modificationTime());
+ }
+
+ /**
+ * Gets a new lock id.
+ *
+ * @param isDeleteLock if this is special delete lock.
+ * @return The new lock id.
+ */
+ private static IgniteUuid composeLockId(boolean isDeleteLock) {
+ return isDeleteLock ? DELETE_LOCK_ID : IgniteUuid.randomUuid();
}
/**
@@ -556,23 +569,28 @@ public class IgfsMetaManager extends IgfsManager {
try {
IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
@Override public Void applyx() throws IgniteCheckedException {
+ assert validTxState(true);
+
IgniteUuid fileId = info.id();
// Lock file ID for this transaction.
IgfsFileInfo oldInfo = info(fileId);
if (oldInfo == null)
- throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
+ throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " +
+ "found): " + fileId));
if (!info.lockId().equals(oldInfo.lockId()))
- throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
- ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
+ throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " +
+ "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
+ oldInfo.lockId() + ']');
IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
boolean put = metaCache.put(fileId, newInfo);
- assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+ assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo
+ + ']';
return null;
}
@@ -668,7 +686,8 @@ public class IgfsMetaManager extends IgfsManager {
* @param id The id to check.
* @throws IgniteCheckedException On error.
*/
- private void addInfoIfNeeded(Collection<IgniteUuid> fileIds, Map<IgniteUuid, IgfsFileInfo> map, IgniteUuid id) throws IgniteCheckedException {
+ private void addInfoIfNeeded(Collection<IgniteUuid> fileIds, Map<IgniteUuid, IgfsFileInfo> map, IgniteUuid id)
+ throws IgniteCheckedException {
assert validTxState(true);
if (fileIds.contains(id) && !map.containsKey(id)) {
@@ -773,7 +792,8 @@ public class IgfsMetaManager extends IgfsManager {
* @return Directory listing for the specified file.*
* @throws IgniteCheckedException If failed.
*/
- private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx) throws IgniteCheckedException {
+ private Map<String, IgfsListingEntry> directoryListing(IgniteUuid fileId, boolean skipTx)
+ throws IgniteCheckedException {
assert fileId != null;
IgfsFileInfo info = skipTx ? id2InfoPrj.getAllOutTx(Collections.singleton(fileId)).get(fileId) :
@@ -783,48 +803,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Add file into file system structure.
- *
- * @param parentId Parent file ID.
- * @param fileName File name in the parent's listing.
- * @param newFileInfo File info to store in the parent's listing.
- * @return File id already stored in meta cache or {@code null} if passed file info was stored.
- * @throws IgniteCheckedException If failed.
- */
- public IgniteUuid putIfAbsent(IgniteUuid parentId, String fileName, IgfsFileInfo newFileInfo)
- throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- assert validTxState(false);
- assert parentId != null;
- assert fileName != null;
- assert newFileInfo != null;
-
- IgniteUuid res = null;
-
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
-
- try {
- res = putIfAbsentNonTx(parentId, fileName, newFileInfo);
-
- tx.commit();
- }
- finally {
- tx.close();
- }
-
- return res;
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to put file because Grid is stopping [parentId=" + parentId +
- ", fileName=" + fileName + ", newFileInfo=" + newFileInfo + ']');
- }
-
- /**
* Add file into file system structure. Do not create new transaction expecting that the one already exists.
*
* @param parentId Parent file ID.
@@ -845,7 +823,8 @@ public class IgfsMetaManager extends IgfsManager {
IgfsFileInfo parentInfo = info(parentId);
if (parentInfo == null)
- throw fsException(new IgfsPathNotFoundException("Failed to lock parent directory (not found): " + parentId));
+ throw fsException(new IgfsPathNotFoundException("Failed to lock parent directory (not found): " +
+ parentId));
if (!parentInfo.isDirectory())
throw fsException(new IgfsPathIsNotDirectoryException("Parent file is not a directory: " + parentInfo));
@@ -1122,124 +1101,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Remove file from the file system structure.
- *
- * @param parentId Parent file ID.
- * @param fileName New file name in the parent's listing.
- * @param fileId File ID to remove.
- * @param path Path of the deleted file.
- * @param rmvLocked Whether to remove this entry in case it is has explicit lock.
- * @return The last actual file info or {@code null} if such file no more exist.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable public IgfsFileInfo removeIfEmpty(IgniteUuid parentId, String fileName, IgniteUuid fileId,
- IgfsPath path, boolean rmvLocked)
- throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- assert validTxState(false);
-
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
-
- try {
- if (parentId != null)
- lockIds(parentId, fileId, TRASH_ID);
- else
- lockIds(fileId, TRASH_ID);
-
- IgfsFileInfo fileInfo = removeIfEmptyNonTx(parentId, fileName, fileId, path, rmvLocked);
-
- tx.commit();
-
- delWorker.signal();
-
- return fileInfo;
- }
- finally {
- tx.close();
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to remove file system entry because Grid is stopping [parentId=" +
- parentId + ", fileName=" + fileName + ", fileId=" + fileId + ", path=" + path + ']');
- }
-
- /**
- * Remove file from the file system structure in existing transaction.
- *
- * @param parentId Parent file ID.
- * @param fileName New file name in the parent's listing.
- * @param fileId File ID to remove.
- * @param path Path of the deleted file.
- * @param rmvLocked Whether to remove this entry in case it has explicit lock.
- * @return The last actual file info or {@code null} if such file no more exist.
- * @throws IgniteCheckedException If failed.
- */
- @Nullable private IgfsFileInfo removeIfEmptyNonTx(@Nullable IgniteUuid parentId, String fileName, IgniteUuid fileId,
- IgfsPath path, boolean rmvLocked)
- throws IgniteCheckedException {
- assert validTxState(true);
- assert parentId != null;
- assert fileName != null;
- assert fileId != null;
- assert !ROOT_ID.equals(fileId);
-
- if (log.isDebugEnabled())
- log.debug("Remove file: [parentId=" + parentId + ", fileName= " + fileName + ", fileId=" + fileId + ']');
-
- // Safe gets because locks are obtained in removeIfEmpty.
- IgfsFileInfo fileInfo = id2InfoPrj.get(fileId);
- IgfsFileInfo parentInfo = id2InfoPrj.get(parentId);
-
- if (fileInfo == null || parentInfo == null) {
- if (parentInfo != null) { // fileInfo == null
- IgfsListingEntry entry = parentInfo.listing().get(fileName);
-
- // If file info does not exists but listing entry exists, throw inconsistent exception.
- if (entry != null && entry.fileId().equals(fileId))
- throw new IgniteCheckedException("Failed to remove file (file system is in inconsistent state) " +
- "[fileInfo=" + fileInfo + ", fileName=" + fileName + ", fileId=" + fileId + ']');
- }
-
- return null; // Parent directory or removed file cannot be locked (not found?).
- }
-
- assert parentInfo.isDirectory();
-
- if (!rmvLocked && fileInfo.lockId() != null)
- throw fsException("Failed to remove file (file is opened for writing) [fileName=" +
- fileName + ", fileId=" + fileId + ", lockId=" + fileInfo.lockId() + ']');
-
- // Validate own directory listing.
- if (fileInfo.isDirectory()) {
- Map<String, IgfsListingEntry> listing = fileInfo.listing();
-
- if (!F.isEmpty(listing))
- throw fsException(new IgfsDirectoryNotEmptyException("Failed to remove file (directory is not empty)" +
- " [fileId=" + fileId + ", listing=" + listing + ']'));
- }
-
- // Validate file in the parent listing.
- IgfsListingEntry listingEntry = parentInfo.listing().get(fileName);
-
- if (listingEntry == null || !listingEntry.fileId().equals(fileId))
- return null;
-
- // Actual remove.
- softDeleteNonTx(parentId, fileName, fileId);
-
- // Update a file info of the removed file with a file path,
- // which will be used by delete worker for event notifications.
- id2InfoPrj.invoke(fileId, new UpdatePath(path));
-
- return builder(fileInfo).path(path).build();
- }
-
- /**
* Deletes (moves to TRASH) all elements under the root folder.
*
* @return The new Id if the artificially created folder containing all former root
@@ -1528,6 +1389,9 @@ public class IgfsMetaManager extends IgfsManager {
IgfsFileInfo entryInfo = locks.get(entryId);
if (entryInfo != null) {
+ // File must be locked for deletion:
+ assert entryInfo.isDirectory() || DELETE_LOCK_ID.equals(entryInfo.lockId());
+
// Delete only files or empty folders.
if (entryInfo.isFile() || entryInfo.isDirectory() && entryInfo.listing().isEmpty()) {
id2InfoPrj.getAndRemove(entryId);
@@ -1588,6 +1452,14 @@ public class IgfsMetaManager extends IgfsManager {
Map<IgniteUuid, IgfsFileInfo> infos = lockIds(parentId, id);
+ IgfsFileInfo victim = infos.get(id);
+
+ if (victim == null)
+ return res;
+
+ assert victim.isDirectory() || DELETE_LOCK_ID.equals(victim.lockId()) :
+ " isDir: " + victim.isDirectory() + ", lockId: " + victim.lockId();
+
// Proceed only in case both parent and child exist.
if (infos.containsKey(parentId) && infos.containsKey(id)) {
IgfsFileInfo parentInfo = infos.get(parentId);
@@ -1599,7 +1471,9 @@ public class IgfsMetaManager extends IgfsManager {
if (listingEntry != null)
id2InfoPrj.invoke(parentId, new UpdateListing(name, listingEntry, true));
- id2InfoPrj.getAndRemove(id);
+ IgfsFileInfo deleted = id2InfoPrj.getAndRemove(id);
+
+ assert victim.id().equals(deleted.id());
res = true;
}
@@ -1885,66 +1759,34 @@ public class IgfsMetaManager extends IgfsManager {
assert props != null;
assert validTxState(false);
- List<String> components;
- SortedSet<IgniteUuid> idSet;
- IgfsPath existingPath;
+ DirectoryChainBuilder b = null;
while (true) {
if (busyLock.enterBusy()) {
try {
- // Take the ids in *path* order out of transaction:
- final List<IgniteUuid> idList = fileIds(path);
-
- idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR);
-
- idSet.add(ROOT_ID);
-
- components = path.components();
-
- // Store all the non-null ids in the set & construct existing path in one loop:
- existingPath = path.root();
-
- assert idList.size() == components.size() + 1;
-
- // Find the lowermost existing id:
- IgniteUuid parentId = ROOT_ID;
-
- for (int i = 1; i < idList.size(); i++) {
- IgniteUuid id = idList.get(i);
-
- if (id == null)
- break;
-
- parentId = id;
-
- boolean added = idSet.add(id);
-
- assert added;
-
- existingPath = new IgfsPath(existingPath, components.get(i - 1));
- }
+ b = new DirectoryChainBuilder(path, props, props);
// Start TX.
IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
try {
- final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(idSet);
+ final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
// If the path was changed, we close the current Tx and repeat the procedure again
// starting from taking the path ids.
- if (verifyPathIntegrity(existingPath, idList, lockedInfos)) {
+ if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) {
// Locked path okay, trying to proceed with the remainder creation.
- IgfsFileInfo parentInfo = lockedInfos.get(parentId);
+ IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId);
// Check only the lowermost directory in the existing directory chain
// because others are already checked in #verifyPathIntegrity() above.
- if (!parentInfo.isDirectory())
+ if (!lowermostExistingInfo.isDirectory())
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
- if (idSet.size() == components.size() + 1) {
- assert existingPath.equals(path);
- assert lockedInfos.size() == idSet.size();
+ if (b.existingIdCnt == b.components.size() + 1) {
+ assert b.existingPath.equals(path);
+ assert lockedInfos.size() == b.existingIdCnt;
// The target directory already exists, nothing to do.
// (The fact that all the path consisns of directories is already checked above).
@@ -1952,48 +1794,15 @@ public class IgfsMetaManager extends IgfsManager {
return false;
}
- Map<String, IgfsListingEntry> parentListing = parentInfo.listing();
+ Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing();
- String shortName = components.get(idSet.size() - 1);
+ String shortName = b.components.get(b.existingIdCnt - 1);
IgfsListingEntry entry = parentListing.get(shortName);
if (entry == null) {
- IgfsFileInfo childInfo = null;
-
- String childName = null;
-
- IgfsFileInfo newDirInfo;
-
- // This loop creates the missing directory chain from the bottom to the top:
- for (int i = components.size() - 1; i >= idSet.size() - 1; i--) {
- // Required entry does not exist.
- // Create new directory info:
- if (childName == null) {
- assert childInfo == null;
-
- newDirInfo = new IgfsFileInfo(true, props);
- }
- else {
- assert childInfo != null;
-
- newDirInfo = new IgfsFileInfo(Collections.singletonMap(childName,
- new IgfsListingEntry(childInfo)), props);
- }
-
- boolean put = id2InfoPrj.putIfAbsent(newDirInfo.id(), newDirInfo);
-
- assert put; // Because we used a new id that should be unique.
+ b.doBuild();
- childInfo = newDirInfo;
- childName = components.get(i);
- }
-
- // Now link the newly created directory chain to the lowermost existing parent:
- id2InfoPrj.invoke(parentId,
- new UpdateListing(childName, new IgfsListingEntry(childInfo), false));
-
- // We're close to finish:
tx.commit();
break;
@@ -2022,17 +1831,11 @@ public class IgfsMetaManager extends IgfsManager {
}
else
throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
- } // retry loop
-
- if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
- IgfsPath createdPath = existingPath;
+ }
- for (int i = idSet.size() - 1; i < components.size(); i++) {
- createdPath = new IgfsPath(createdPath, components.get(i));
+ assert b != null;
- evts.record(new IgfsEvent(createdPath, locNode, EVT_IGFS_DIR_CREATED));
- }
- }
+ b.sendEvents();
return true;
}
@@ -2135,6 +1938,8 @@ public class IgfsMetaManager extends IgfsManager {
@Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
IgfsFileInfo> infos) throws Exception {
+ assert validTxState(true);
+
assert !infos.isEmpty();
// Determine the first existing parent.
@@ -2186,7 +1991,7 @@ public class IgfsMetaManager extends IgfsManager {
"the secondary file system because the path points to a directory: " + path);
IgfsFileInfo newInfo = new IgfsFileInfo(status.blockSize(), status.length(), affKey,
- IgniteUuid.randomUuid(), igfsCtx.igfs().evictExclude(path, false), status.properties());
+ composeLockId(false), igfsCtx.igfs().evictExclude(path, false), status.properties());
// Add new file info to the listing optionally removing the previous one.
IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
@@ -2194,6 +1999,13 @@ public class IgfsMetaManager extends IgfsManager {
if (oldId != null) {
IgfsFileInfo oldInfo = info(oldId);
+ assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+ // The contact is that we cannot overwrite a file locked for writing:
+ if (oldInfo.lockId() != null)
+ throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+ path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
id2InfoPrj.remove(oldId); // Remove the old one.
id2InfoPrj.put(newInfo.id(), newInfo); // Put the new one.
@@ -2203,29 +2015,6 @@ public class IgfsMetaManager extends IgfsManager {
new UpdateListing(path.name(), new IgfsListingEntry(newInfo), false));
IgniteInternalFuture<?> delFut = igfsCtx.data().delete(oldInfo);
-
- // Record PURGE event if needed.
- if (evts.isRecordable(EVT_IGFS_FILE_PURGED)) {
- delFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- try {
- t.get(); // Ensure delete succeeded.
-
- evts.record(new IgfsEvent(path, locNode, EVT_IGFS_FILE_PURGED));
- }
- catch (IgniteCheckedException e) {
- LT.warn(log, e, "Old file deletion failed in DUAL mode [path=" + path +
- ", simpleCreate=" + simpleCreate + ", props=" + props +
- ", overwrite=" + overwrite + ", bufferSize=" + bufSize +
- ", replication=" + replication + ", blockSize=" + blockSize + ']');
- }
- }
- });
- }
-
- // Record DELETE event if needed.
- if (evts.isRecordable(EVT_IGFS_FILE_DELETED))
- pendingEvts.add(new IgfsEvent(path, locNode, EVT_IGFS_FILE_DELETED));
}
// Record CREATE event if needed.
@@ -2287,7 +2076,9 @@ public class IgfsMetaManager extends IgfsManager {
@Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
IgfsFileInfo> infos) throws Exception {
- IgfsFileInfo info = infos.get(path);
+ assert validTxState(true);
+
+ final IgfsFileInfo info = infos.get(path);
if (info.isDirectory())
throw fsException("Failed to open output stream to the file in the " +
@@ -2314,12 +2105,22 @@ public class IgfsMetaManager extends IgfsManager {
}
}
+ if (info.lockId() != null) {
+ throw fsException("Failed to open file (file is opened for writing) [path=" +
+ path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']');
+ }
+
// Set lock and return.
- info = lockInfo(info);
+ IgfsFileInfo lockedInfo = lockInfo(info, false);
+
+ assert lockedInfo != null; // We checked the lock above.
- metaCache.put(info.id(), info);
+ boolean put = metaCache.put(info.id(), lockedInfo);
- return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(), info, out);
+ assert put;
+
+ return new IgfsSecondaryOutputStreamDescriptor(infos.get(path.parent()).id(),
+ lockedInfo, out);
}
@Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
@@ -2329,8 +2130,8 @@ public class IgfsMetaManager extends IgfsManager {
U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
- throw new IgniteCheckedException("Failed to append to the file due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to append to the file due to secondary file " +
+ "system exception: " + path, err);
}
};
@@ -2438,8 +2239,8 @@ public class IgfsMetaManager extends IgfsManager {
}
@Override public IgfsFileInfo onFailure(@Nullable Exception err) throws IgniteCheckedException {
- throw new IgniteCheckedException("Failed to synchronize path due to secondary file system " +
- "exception: " + path, err);
+ throw new IgniteCheckedException("Failed to synchronize path due to secondary file " +
+ "system exception: " + path, err);
}
};
@@ -2517,8 +2318,8 @@ public class IgfsMetaManager extends IgfsManager {
U.error(log, "Directory creation in DUAL mode failed [path=" + path + ", properties=" + props +
']', err);
- throw new IgniteCheckedException("Failed to create the path due to secondary file system exception: " +
- path, err);
+ throw new IgniteCheckedException("Failed to create the path due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -2685,8 +2486,8 @@ public class IgfsMetaManager extends IgfsManager {
U.error(log, "Path delete in DUAL mode failed [path=" + path + ", recursive=" + recursive + ']',
err);
- throw new IgniteCheckedException("Failed to delete the path due to secondary file system exception: ",
- err);
+ throw new IgniteCheckedException("Failed to delete the path due to secondary file system " +
+ "exception: ", err);
}
};
@@ -2713,8 +2514,8 @@ public class IgfsMetaManager extends IgfsManager {
* @return Update file info.
* @throws IgniteCheckedException If update failed.
*/
- public IgfsFileInfo updateDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final Map<String, String> props)
- throws IgniteCheckedException {
+ public IgfsFileInfo updateDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+ final Map<String, String> props) throws IgniteCheckedException {
assert fs != null;
assert path != null;
assert props != null && !props.isEmpty();
@@ -2740,8 +2541,8 @@ public class IgfsMetaManager extends IgfsManager {
U.error(log, "Path update in DUAL mode failed [path=" + path + ", properties=" + props + ']',
err);
- throw new IgniteCheckedException("Failed to update the path due to secondary file system exception: " +
- path, err);
+ throw new IgniteCheckedException("Failed to update the path due to secondary file system " +
+ "exception: " + path, err);
}
};
@@ -2805,8 +2606,8 @@ public class IgfsMetaManager extends IgfsManager {
if (status != null) {
if (!status.isDirectory() && !curPath.equals(endPath))
- throw new IgniteCheckedException("Failed to create path the locally because secondary file system " +
- "directory structure was modified concurrently and the path is not a directory as " +
+ throw new IgniteCheckedException("Failed to create path the locally because secondary file " +
+ "system directory structure was modified concurrently and the path is not a directory as " +
"expected: " + curPath);
}
else {
@@ -3084,7 +2885,8 @@ public class IgfsMetaManager extends IgfsManager {
* @return {@code True} if value was stored in cache, {@code false} otherwise.
* @throws IgniteCheckedException If operation failed.
*/
- private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c) throws IgniteCheckedException {
+ private <K, V> boolean putx(IgniteInternalCache<K, V> cache, K key, IgniteClosure<V, V> c)
+ throws IgniteCheckedException {
assert validTxState(true);
V oldVal = cache.get(key);
@@ -3549,4 +3351,455 @@ public class IgfsMetaManager extends IgfsManager {
return S.toString(UpdatePath.class, this);
}
}
+
+ /**
+ * Create a new file.
+ *
+ * @param path Path.
+ * @param bufSize Buffer size.
+ * @param overwrite Overwrite flag.
+ * @param affKey Affinity key.
+ * @param replication Replication factor.
+ * @param props Properties.
+ * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method.
+ * @return Tuple containing the created file info and its parent id.
+ */
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> create(
+ final IgfsPath path,
+ final boolean append,
+ final boolean overwrite,
+ Map<String, String> dirProps,
+ final int blockSize,
+ final @Nullable IgniteUuid affKey,
+ final boolean evictExclude,
+ @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+ assert validTxState(false);
+ assert path != null;
+
+ final String name = path.name();
+
+ DirectoryChainBuilder b = null;
+
+ while (true) {
+ if (busyLock.enterBusy()) {
+ try {
+ b = new DirectoryChainBuilder(path, dirProps, fileProps) {
+ /** {@inheritDoc} */
+ @Override protected IgfsFileInfo buildLeaf() {
+ return new IgfsFileInfo(blockSize, 0L, affKey, composeLockId(false),
+ evictExclude, leafProps);
+ }
+ };
+
+ // Start Tx:
+ IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+
+ try {
+ if (overwrite)
+ // Lock also the TRASH directory because in case of overwrite we
+ // may need to delete the old file:
+ b.idSet.add(TRASH_ID);
+
+ final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
+
+ assert !overwrite || lockedInfos.get(TRASH_ID) != null; // TRASH must exist at this point.
+
+ // If the path was changed, we close the current Tx and repeat the procedure again
+ // starting from taking the path ids.
+ if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) {
+ // Locked path okay, trying to proceed with the remainder creation.
+ final IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId);
+
+ if (b.existingIdCnt == b.components.size() + 1) {
+ // Full requestd path exists.
+
+ assert b.existingPath.equals(path);
+ assert lockedInfos.size() ==
+ (overwrite ? b.existingIdCnt + 1/*TRASH*/ : b.existingIdCnt);
+
+ if (lowermostExistingInfo.isDirectory()) {
+ throw new IgfsPathAlreadyExistsException("Failed to "
+ + (append ? "open" : "create") + " file (path points to an " +
+ "existing directory): " + path);
+ }
+ else {
+ // This is a file.
+ assert lowermostExistingInfo.isFile();
+
+ final IgniteUuid parentId = b.idList.get(b.idList.size() - 2);
+
+ final IgniteUuid lockId = lowermostExistingInfo.lockId();
+
+ if (append) {
+ if (lockId != null)
+ throw fsException("Failed to open file (file is opened for writing) "
+ + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
+ + ", lockId=" + lockId + ']');
+
+ IgniteUuid newLockId = composeLockId(false);
+
+ EntryProcessorResult<IgfsFileInfo> result
+ = id2InfoPrj.invoke(lowermostExistingInfo.id(),
+ new LockFileProcessor(newLockId));
+
+ IgfsFileInfo lockedInfo = result.get();
+
+ assert lockedInfo != null; // we already checked lock above.
+ assert lockedInfo.lockId() != null;
+ assert lockedInfo.lockId().equals(newLockId);
+ assert lockedInfo.id().equals(lowermostExistingInfo.id());
+
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(lockedInfo, parentId);
+
+ tx.commit();
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
+ EventType.EVT_IGFS_FILE_OPENED_WRITE);
+
+ return t2;
+ }
+ else if (overwrite) {
+ // Delete existing file, but fail if it is locked:
+ if (lockId != null)
+ throw fsException("Failed to overwrite file (file is opened for writing) " +
+ "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
+ + ", lockId=" + lockId + ']');
+
+ final IgfsListingEntry deletedEntry = lockedInfos.get(parentId).listing()
+ .get(name);
+
+ assert deletedEntry != null;
+
+ id2InfoPrj.invoke(parentId, new UpdateListing(name, deletedEntry, true));
+
+ // Add listing entry into the destination parent listing.
+ id2InfoPrj.invoke(TRASH_ID, new UpdateListing(
+ lowermostExistingInfo.id().toString(), deletedEntry, false));
+
+ // Update a file info of the removed file with a file path,
+ // which will be used by delete worker for event notifications.
+ id2InfoPrj.invoke(lowermostExistingInfo.id(), new UpdatePath(path));
+
+ // Make a new locked info:
+ final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L,
+ affKey, composeLockId(false), evictExclude, fileProps);
+
+ assert newFileInfo.lockId() != null; // locked info should be created.
+
+ boolean put = id2InfoPrj.putIfAbsent(newFileInfo.id(), newFileInfo);
+
+ assert put;
+
+ id2InfoPrj.invoke(parentId,
+ new UpdateListing(name, new IgfsListingEntry(newFileInfo), false));
+
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId);
+
+ tx.commit();
+
+ delWorker.signal();
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
+
+ return t2;
+ }
+ else {
+ throw new IgfsPathAlreadyExistsException("Failed to create file (file " +
+ "already exists and overwrite flag is false): " + path);
+ }
+ }
+ }
+
+ // The full requested path does not exist.
+
+ // Check only the lowermost directory in the existing directory chain
+ // because others are already checked in #verifyPathIntegrity() above.
+ if (!lowermostExistingInfo.isDirectory())
+ throw new IgfsParentNotDirectoryException("Failed to " + (append ? "open" : "create" )
+ + " file (parent element is not a directory)");
+
+ Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing();
+
+ final String uppermostFileToBeCreatedName = b.components.get(b.existingIdCnt - 1);
+
+ final IgfsListingEntry entry = parentListing.get(uppermostFileToBeCreatedName);
+
+ if (entry == null) {
+ b.doBuild();
+
+ assert b.leafInfo != null;
+ assert b.leafParentId != null;
+
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(b.leafInfo, b.leafParentId);
+
+ tx.commit();
+
+ b.sendEvents();
+
+ return t2;
+ }
+
+ // Another thread concurrently created file or directory in the path with
+ // the name we need.
+ }
+ }
+ finally {
+ tx.close();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ } else
+ throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
+ }
+ }
+
+ /** File chain builder. */
+ private class DirectoryChainBuilder {
+ /** The requested path to be created. */
+ protected final IgfsPath path;
+
+ /** Full path components. */
+ protected final List<String> components;
+
+ /** The list of ids. */
+ protected final List<IgniteUuid> idList;
+
+ /** The set of ids. */
+ protected final SortedSet<IgniteUuid> idSet;
+
+ /** The middle node properties. */
+ protected final Map<String, String> middleProps;
+
+ /** The leaf node properties. */
+ protected final Map<String, String> leafProps;
+
+ /** The lowermost exsiting path id. */
+ protected final IgniteUuid lowermostExistingId;
+
+ /** The existing path. */
+ protected final IgfsPath existingPath;
+
+ /** The created leaf info. */
+ protected IgfsFileInfo leafInfo;
+
+ /** The leaf parent id. */
+ protected IgniteUuid leafParentId;
+
+ /** The number of existing ids. */
+ protected final int existingIdCnt;
+
+ /**
+ * Creates the builder and performa all the initial calculations.
+ */
+ protected DirectoryChainBuilder(IgfsPath path,
+ Map<String,String> middleProps, Map<String,String> leafProps) throws IgniteCheckedException {
+ this.path = path;
+
+ this.components = path.components();
+
+ this.idList = fileIds(path);
+
+ this.idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR);
+
+ this.middleProps = middleProps;
+
+ this.leafProps = leafProps;
+ // Store all the non-null ids in the set & construct existing path in one loop:
+ IgfsPath existingPath = path.root();
+
+ assert idList.size() == components.size() + 1;
+
+ // Find the lowermost existing id:
+ IgniteUuid lowermostExistingId = null;
+
+ int idIdx = 0;
+
+ for (IgniteUuid id: idList) {
+ if (id == null)
+ break;
+
+ lowermostExistingId = id;
+
+ boolean added = idSet.add(id);
+
+ assert added : "Not added id = " + id;
+
+ if (idIdx >= 1) // skip root.
+ existingPath = new IgfsPath(existingPath, components.get(idIdx - 1));
+
+ idIdx++;
+ }
+
+ assert idSet.contains(ROOT_ID);
+
+ this.lowermostExistingId = lowermostExistingId;
+
+ this.existingPath = existingPath;
+
+ this.existingIdCnt = idSet.size();
+ }
+
+ /**
+ * Builds middle nodes.
+ */
+ protected IgfsFileInfo buildMiddleNode(String childName, IgfsFileInfo childInfo) {
+ return new IgfsFileInfo(Collections.singletonMap(childName,
+ new IgfsListingEntry(childInfo)), middleProps);
+ }
+
+ /**
+ * Builds leaf.
+ */
+ protected IgfsFileInfo buildLeaf() {
+ return new IgfsFileInfo(true, leafProps);
+ }
+
+ /**
+ * Links newly created chain to existing parent.
+ */
+ final void linkBuiltChainToExistingParent(String childName, IgfsFileInfo childInfo)
+ throws IgniteCheckedException {
+ assert childInfo != null;
+
+ id2InfoPrj.invoke(lowermostExistingId,
+ new UpdateListing(childName, new IgfsListingEntry(childInfo), false));
+ }
+
+ /**
+ * Does the main portion of job building the renmaining path.
+ */
+ public final void doBuild() throws IgniteCheckedException {
+ IgfsFileInfo childInfo = null;
+
+ String childName = null;
+
+ IgfsFileInfo newLeafInfo;
+ IgniteUuid parentId = null;
+
+ // This loop creates the missing directory chain from the bottom to the top:
+ for (int i = components.size() - 1; i >= existingIdCnt - 1; i--) {
+ // Required entry does not exist.
+ // Create new directory info:
+ if (childName == null) {
+ assert childInfo == null;
+
+ newLeafInfo = buildLeaf();
+
+ assert newLeafInfo != null;
+
+ leafInfo = newLeafInfo;
+ }
+ else {
+ assert childInfo != null;
+
+ newLeafInfo = buildMiddleNode(childName, childInfo);
+
+ assert newLeafInfo != null;
+
+ if (parentId == null)
+ parentId = newLeafInfo.id();
+ }
+
+ boolean put = id2InfoPrj.putIfAbsent(newLeafInfo.id(), newLeafInfo);
+
+ assert put; // Because we used a new id that should be unique.
+
+ childInfo = newLeafInfo;
+
+ childName = components.get(i);
+ }
+
+ if (parentId == null)
+ parentId = lowermostExistingId;
+
+ leafParentId = parentId;
+
+ // Now link the newly created directory chain to the lowermost existing parent:
+ linkBuiltChainToExistingParent(childName, childInfo);
+ }
+
+ /**
+ * Sends events.
+ */
+ public final void sendEvents() {
+ if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
+ IgfsPath createdPath = existingPath;
+
+ for (int i = existingPath.components().size(); i < components.size() - 1; i++) {
+ createdPath = new IgfsPath(createdPath, components.get(i));
+
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath, EVT_IGFS_DIR_CREATED);
+ }
+ }
+
+ if (leafInfo.isDirectory())
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_DIR_CREATED);
+ else {
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED);
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
+ }
+ }
+ }
+
+ /**
+ * Processor closure to locks a file for writing.
+ */
+ private static class LockFileProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, IgfsFileInfo>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** New lock id to lock the entry. */
+ private IgniteUuid newLockId;
+
+ /**
+ * Constructor.
+ */
+ public LockFileProcessor(IgniteUuid newLockId) {
+ assert newLockId != null;
+
+ this.newLockId = newLockId;
+ }
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public LockFileProcessor() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public IgfsFileInfo process(MutableEntry<IgniteUuid, IgfsFileInfo> entry,
+ Object... arguments) throws EntryProcessorException {
+ final IgfsFileInfo info = entry.getValue();
+
+ assert info != null;
+
+ if (info.lockId() != null)
+ return null; // file is already locked.
+
+ IgfsFileInfo newInfo = new IgfsFileInfo(info, newLockId, info.modificationTime());
+
+ entry.setValue(newInfo);
+
+ return newInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeGridUuid(out, newLockId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ newLockId = U.readGridUuid(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(LockFileProcessor.class, this);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index c297eed..c9225ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -121,6 +121,8 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
if (fileInfo.lockId() == null)
throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
+ assert !IgfsMetaManager.DELETE_LOCK_ID.equals(fileInfo.lockId());
+
this.igfsCtx = igfsCtx;
meta = igfsCtx.meta();
data = igfsCtx.data();
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 50ebd56..07fdda4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -21,10 +21,15 @@ import java.lang.reflect.Constructor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsException;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
@@ -158,4 +163,22 @@ public class IgfsUtils {
throw new IgniteCheckedException("Failed to perform operation since max number of attempts " +
"exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']');
}
+
+
+ /**
+ * Records a single IGFS event of the given type, if that event type is recordable.
+ *
+ * @param path The file system path the event refers to.
+ * @param type The type of event to send.
+ */
+ public static void sendEvents(GridKernalContext kernalCtx, IgfsPath path, int type) {
+ assert kernalCtx != null;
+ assert path != null;
+
+ GridEventStorageManager evts = kernalCtx.event();
+ ClusterNode locNode = kernalCtx.discovery().localNode();
+
+ if (evts.isRecordable(type))
+ evts.record(new IgfsEvent(path, locNode, type));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/962fcce3/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
index f0f86ec..6ca75a1 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsEventsAbstractSelfTest.java
@@ -683,7 +683,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
public void testSingleFileOverwrite() throws Exception {
final List<Event> evtList = new ArrayList<>();
- final int evtsCnt = 3 + 4 + 1;
+ final int evtsCnt = 1 + 4 + 1;
final CountDownLatch latch = new CountDownLatch(evtsCnt);
@@ -703,7 +703,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
igfs.create(file, false).close(); // Will generate create, open and close events.
- igfs.create(file, true).close(); // Will generate same event set + delete and purge events.
+ igfs.create(file, true).close(); // Will generate only OPEN_WRITE & close events.
try {
igfs.create(file, false).close(); // Won't generate any event.
@@ -732,7 +732,7 @@ public abstract class IgfsEventsAbstractSelfTest extends GridCommonAbstractTest
assertEquals(0, evt.dataSize());
assertOneToOne(
- evtList.subList(3, 8),
+ evtList.subList(3, evtsCnt),
new P1<Event>() {
@Override public boolean apply(Event e) {
IgfsEvent e0 = (IgfsEvent)e;