You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:17 UTC
[14/14] ignite git commit: WIP.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99d244a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99d244a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99d244a3
Branch: refs/heads/ignite-3264
Commit: 99d244a3009a5bcf347ce09da145a8f6cc3dc19f
Parents: cd92c9e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:55:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:55:21 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 341 +++++++++++--------
1 file changed, 203 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/99d244a3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 465116b..404d837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1882,121 +1882,8 @@ public class IgfsMetaManager extends IgfsManager {
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
- /** Output stream to the secondary file system. */
- private OutputStream out;
-
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
- IgfsEntryInfo> infos) throws Exception {
- validTxState(true);
-
- assert !infos.isEmpty();
-
- // Determine the first existing parent.
- IgfsPath parentPath = null;
-
- for (IgfsPath curPath : infos.keySet()) {
- if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
- parentPath = curPath;
- }
-
- assert parentPath != null;
-
- IgfsEntryInfo parentInfo = infos.get(parentPath);
-
- // Delegate to the secondary file system.
- out = simpleCreate ? fs.create(path, overwrite) :
- fs.create(path, bufSize, overwrite, replication, blockSize, props);
-
- IgfsPath parent0 = path.parent();
-
- assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
-
- // If some of the parent directories were missing, synchronize again.
- if (!parentPath.equals(parent0)) {
- parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
-
- // Fire notification about missing directories creation.
- if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
- IgfsPath evtPath = parent0;
-
- while (!parentPath.equals(evtPath)) {
- pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
- EventType.EVT_IGFS_DIR_CREATED));
-
- evtPath = evtPath.parent();
-
- assert evtPath != null; // If this fails, then ROOT does not exist.
- }
- }
- }
-
- // Get created file info.
- IgfsFile status = fs.info(path);
-
- if (status == null)
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because it no longer exists: " + path);
- else if (status.isDirectory())
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because the path points to a directory: " + path);
-
- IgfsEntryInfo newInfo = IgfsUtils.createFile(
- IgniteUuid.randomUuid(),
- status.blockSize(),
- status.length(),
- affKey,
- createFileLockId(false),
- igfsCtx.igfs().evictExclude(path, false),
- status.properties(),
- status.accessTime(),
- status.modificationTime()
- );
-
- // Add new file info to the listing optionally removing the previous one.
- assert parentInfo != null;
-
- IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
-
- if (oldId != null) {
- IgfsEntryInfo oldInfo = info(oldId);
-
- assert oldInfo != null; // Otherwise cache is in inconsistent state.
-
- // The contact is that we cannot overwrite a file locked for writing:
- if (oldInfo.lockId() != null)
- throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
- path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
-
- id2InfoPrj.remove(oldId); // Remove the old one.
- id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
- path.name(), parentInfo.listing().get(path.name()).fileId()));
-
- createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
-
- igfsCtx.data().delete(oldInfo);
- }
-
- // Record CREATE event if needed.
- if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
- pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
-
- return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
- }
-
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
- throws IgniteCheckedException {
- U.closeQuiet(out);
-
- U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
- simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
- bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
- throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
- "exception: " + path, err);
- }
- };
+ CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+ overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
try {
return synchronizeAndExecute(task, fs, false, path.parent());
@@ -2956,29 +2843,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Synchronization task interface.
- */
- private static interface SynchronizationTask<T> {
- /**
- * Callback handler in case synchronization was successful.
- *
- * @param infos Map from paths to corresponding infos.
- * @return Task result.
- * @throws Exception If failed.
- */
- public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
- /**
- * Callback handler in case synchronization failed.
- *
- * @param err Optional exception.
- * @return Task result.
- * @throws IgniteCheckedException In case exception is to be thrown in that case.
- */
- public T onFailure(Exception err) throws IgniteCheckedException;
- }
-
- /**
* Append routine.
*
* @param path Path.
@@ -3352,4 +3216,205 @@ public class IgfsMetaManager extends IgfsManager {
if (delWorker0 != null)
delWorker0.signal();
}
+
+    /**
+     * Task executed after synchronization; provides one callback per synchronization outcome.
+     */
+    private interface SynchronizationTask<T> {
+        /**
+         * Invoked when synchronization succeeded.
+         *
+         * @param infos Synchronized entries: path mapped to the corresponding entry info.
+         * @return Task result.
+         * @throws Exception If the task failed.
+         */
+        T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+        /**
+         * Invoked when synchronization failed.
+         *
+         * @param err Failure cause (optional).
+         * @return Task result, when the implementation does not rethrow.
+         * @throws IgniteCheckedException When the implementation decides to propagate the failure.
+         */
+        T onFailure(Exception err) throws IgniteCheckedException;
+    }
+
+ /**
+ * Synchronization task to create a file.
+ */
+ private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+ /** Secondary file system. */
+ private IgfsSecondaryFileSystem fs;
+
+ /** Path. */
+ private IgfsPath path;
+
+ /** Simple create flag. */
+ private boolean simpleCreate;
+
+ /** Properties. */
+ private Map<String, String> props;
+
+ /** Overwrite flag. */
+ private boolean overwrite;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Replication factor. */
+ private short replication;
+
+ /** Block size. */
+ private long blockSize;
+
+ /** Affinity key. */
+ private IgniteUuid affKey;
+
+ /** Pending events. */
+ private Deque<IgfsEvent> pendingEvts;
+
+ /** Output stream to the secondary file system. */
+ private OutputStream out;
+
+ /**
+ * Constructor.
+ *
+ * @param fs Secondary file system.
+ * @param path Path.
+ * @param simpleCreate Simple create flag.
+ * @param props Properties.
+ * @param overwrite Overwrite flag.
+ * @param bufSize Buffer size.
+ * @param replication Replication factor.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param pendingEvts Pending events.
+ */
+ public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+ @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+ IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+ this.fs = fs;
+ this.path = path;
+ this.simpleCreate = simpleCreate;
+ this.props = props;
+ this.overwrite = overwrite;
+ this.bufSize = bufSize;
+ this.replication = replication;
+ this.blockSize = blockSize;
+ this.affKey = affKey;
+ this.pendingEvts = pendingEvts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+ throws Exception {
+ validTxState(true);
+
+ assert !infos.isEmpty();
+
+ // Determine the first existing parent.
+ IgfsPath parentPath = null;
+
+ for (IgfsPath curPath : infos.keySet()) {
+ if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+ parentPath = curPath;
+ }
+
+ assert parentPath != null;
+
+ IgfsEntryInfo parentInfo = infos.get(parentPath);
+
+ // Delegate to the secondary file system.
+ out = simpleCreate ? fs.create(path, overwrite) :
+ fs.create(path, bufSize, overwrite, replication, blockSize, props);
+
+ IgfsPath parent0 = path.parent();
+
+ assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+
+ // If some of the parent directories were missing, synchronize again.
+ if (!parentPath.equals(parent0)) {
+ parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+
+ // Fire notification about missing directories creation.
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+ IgfsPath evtPath = parent0;
+
+ while (!parentPath.equals(evtPath)) {
+ pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+ EventType.EVT_IGFS_DIR_CREATED));
+
+ evtPath = evtPath.parent();
+
+ assert evtPath != null; // If this fails, then ROOT does not exist.
+ }
+ }
+ }
+
+ // Get created file info.
+ IgfsFile status = fs.info(path);
+
+ if (status == null)
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because it no longer exists: " + path);
+ else if (status.isDirectory())
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because the path points to a directory: " + path);
+
+ IgfsEntryInfo newInfo = IgfsUtils.createFile(
+ IgniteUuid.randomUuid(),
+ status.blockSize(),
+ status.length(),
+ affKey,
+ createFileLockId(false),
+ igfsCtx.igfs().evictExclude(path, false),
+ status.properties(),
+ status.accessTime(),
+ status.modificationTime()
+ );
+
+ // Add new file info to the listing optionally removing the previous one.
+ assert parentInfo != null;
+
+ IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+
+ if (oldId != null) {
+ IgfsEntryInfo oldInfo = info(oldId);
+
+ assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+ // The contact is that we cannot overwrite a file locked for writing:
+ if (oldInfo.lockId() != null)
+ throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+ path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
+ id2InfoPrj.remove(oldId); // Remove the old one.
+ id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+ path.name(), parentInfo.listing().get(path.name()).fileId()));
+
+ createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+
+ igfsCtx.data().delete(oldInfo);
+ }
+
+ // Record CREATE event if needed.
+ if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+ pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+
+ return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+ U.closeQuiet(out);
+
+ U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+ simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+ bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+ throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+ "exception: " + path, err);
+ }
+ }
}
\ No newline at end of file