You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/07 07:56:17 UTC

[14/14] ignite git commit: WIP.

WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/99d244a3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/99d244a3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/99d244a3

Branch: refs/heads/ignite-3264
Commit: 99d244a3009a5bcf347ce09da145a8f6cc3dc19f
Parents: cd92c9e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 7 10:55:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 7 10:55:21 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 341 +++++++++++--------
 1 file changed, 203 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/99d244a3/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 465116b..404d837 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1882,121 +1882,8 @@ public class IgfsMetaManager extends IgfsManager {
                 // Events to fire (can be done outside of a transaction).
                 final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
 
-                SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
-                    new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
-                        /** Output stream to the secondary file system. */
-                        private OutputStream out;
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
-                            IgfsEntryInfo> infos) throws Exception {
-                            validTxState(true);
-
-                            assert !infos.isEmpty();
-
-                            // Determine the first existing parent.
-                            IgfsPath parentPath = null;
-
-                            for (IgfsPath curPath : infos.keySet()) {
-                                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
-                                    parentPath = curPath;
-                            }
-
-                            assert parentPath != null;
-
-                            IgfsEntryInfo parentInfo = infos.get(parentPath);
-
-                            // Delegate to the secondary file system.
-                            out = simpleCreate ? fs.create(path, overwrite) :
-                                fs.create(path, bufSize, overwrite, replication, blockSize, props);
-
-                            IgfsPath parent0 = path.parent();
-
-                            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
-
-                            // If some of the parent directories were missing, synchronize again.
-                            if (!parentPath.equals(parent0)) {
-                                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
-
-                                // Fire notification about missing directories creation.
-                                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
-                                    IgfsPath evtPath = parent0;
-
-                                    while (!parentPath.equals(evtPath)) {
-                                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
-                                            EventType.EVT_IGFS_DIR_CREATED));
-
-                                        evtPath = evtPath.parent();
-
-                                        assert evtPath != null; // If this fails, then ROOT does not exist.
-                                    }
-                                }
-                            }
-
-                            // Get created file info.
-                            IgfsFile status = fs.info(path);
-
-                            if (status == null)
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because it no longer exists: " + path);
-                            else if (status.isDirectory())
-                                throw fsException("Failed to open output stream to the file created in " +
-                                    "the secondary file system because the path points to a directory: " + path);
-
-                            IgfsEntryInfo newInfo = IgfsUtils.createFile(
-                                IgniteUuid.randomUuid(),
-                                status.blockSize(),
-                                status.length(),
-                                affKey,
-                                createFileLockId(false),
-                                igfsCtx.igfs().evictExclude(path, false),
-                                status.properties(),
-                                status.accessTime(),
-                                status.modificationTime()
-                            );
-
-                            // Add new file info to the listing optionally removing the previous one.
-                            assert parentInfo != null;
-
-                            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
-
-                            if (oldId != null) {
-                                IgfsEntryInfo oldInfo = info(oldId);
-
-                                assert oldInfo != null; // Otherwise cache is in inconsistent state.
-
-                                // The contact is that we cannot overwrite a file locked for writing:
-                                if (oldInfo.lockId() != null)
-                                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
-                                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
-
-                                id2InfoPrj.remove(oldId); // Remove the old one.
-                                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
-                                    path.name(), parentInfo.listing().get(path.name()).fileId()));
-
-                                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
-
-                                igfsCtx.data().delete(oldInfo);
-                            }
-
-                            // Record CREATE event if needed.
-                            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
-                                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
-
-                            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
-                        }
-
-                        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
-                            throws IgniteCheckedException {
-                            U.closeQuiet(out);
-
-                            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
-                                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
-                                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
-                            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
-                                "exception: " + path, err);
-                        }
-                    };
+                CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+                    overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
 
                 try {
                     return synchronizeAndExecute(task, fs, false, path.parent());
@@ -2956,29 +2843,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Synchronization task interface.
-     */
-    private static interface SynchronizationTask<T> {
-        /**
-         * Callback handler in case synchronization was successful.
-         *
-         * @param infos Map from paths to corresponding infos.
-         * @return Task result.
-         * @throws Exception If failed.
-         */
-        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
-        /**
-         * Callback handler in case synchronization failed.
-         *
-         * @param err Optional exception.
-         * @return Task result.
-         * @throws IgniteCheckedException In case exception is to be thrown in that case.
-         */
-        public T onFailure(Exception err) throws IgniteCheckedException;
-    }
-
-    /**
      * Append routine.
      *
      * @param path Path.
@@ -3352,4 +3216,205 @@ public class IgfsMetaManager extends IgfsManager {
         if (delWorker0 != null)
             delWorker0.signal();
     }
+
+    /**
+     * Synchronization task interface.
+     */
+    private static interface SynchronizationTask<T> {
+        /**
+         * Callback handler in case synchronization was successful.
+         *
+         * @param infos Map from paths to corresponding infos.
+         * @return Task result.
+         * @throws Exception If failed.
+         */
+        public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+        /**
+         * Callback handler in case synchronization failed.
+         *
+         * @param err Optional exception.
+         * @return Task result.
+         * @throws IgniteCheckedException If the failure is to be reported to the caller as an exception.
+         */
+        public T onFailure(Exception err) throws IgniteCheckedException;
+    }
+
+    /**
+     * Synchronization task to create a file.
+     */
+    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+        /** Secondary file system. */
+        private IgfsSecondaryFileSystem fs;
+
+        /** Path. */
+        private IgfsPath path;
+
+        /** Simple create flag. */
+        private boolean simpleCreate;
+
+        /** Properties. */
+        private Map<String, String> props;
+
+        /** Overwrite flag. */
+        private boolean overwrite;
+
+        /** Buffer size. */
+        private int bufSize;
+
+        /** Replication factor. */
+        private short replication;
+
+        /** Block size. */
+        private long blockSize;
+
+        /** Affinity key. */
+        private IgniteUuid affKey;
+
+        /** Pending events. */
+        private Deque<IgfsEvent> pendingEvts;
+
+        /** Output stream to the secondary file system. */
+        private OutputStream out;
+
+        /**
+         * Constructor.
+         *
+         * @param fs Secondary file system.
+         * @param path Path.
+         * @param simpleCreate Simple create flag.
+         * @param props Properties.
+         * @param overwrite Overwrite flag.
+         * @param bufSize Buffer size.
+         * @param replication Replication factor.
+         * @param blockSize Block size.
+         * @param affKey Affinity key.
+         * @param pendingEvts Pending events.
+         */
+        public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+            @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+            IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+            this.fs = fs;
+            this.path = path;
+            this.simpleCreate = simpleCreate;
+            this.props = props;
+            this.overwrite = overwrite;
+            this.bufSize = bufSize;
+            this.replication = replication;
+            this.blockSize = blockSize;
+            this.affKey = affKey;
+            this.pendingEvts = pendingEvts;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+            throws Exception {
+            validTxState(true);
+
+            assert !infos.isEmpty();
+
+            // Determine the first existing parent.
+            IgfsPath parentPath = null;
+
+            for (IgfsPath curPath : infos.keySet()) {
+                if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+                    parentPath = curPath;
+            }
+
+            assert parentPath != null;
+
+            IgfsEntryInfo parentInfo = infos.get(parentPath);
+
+            // Delegate to the secondary file system.
+            out = simpleCreate ? fs.create(path, overwrite) :
+                fs.create(path, bufSize, overwrite, replication, blockSize, props);
+
+            IgfsPath parent0 = path.parent();
+
+            assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+
+            // If some of the parent directories were missing, synchronize again.
+            if (!parentPath.equals(parent0)) {
+                parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+
+                // Fire notification about missing directories creation.
+                if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+                    IgfsPath evtPath = parent0;
+
+                    while (!parentPath.equals(evtPath)) {
+                        pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+                            EventType.EVT_IGFS_DIR_CREATED));
+
+                        evtPath = evtPath.parent();
+
+                        assert evtPath != null; // If this fails, then ROOT does not exist.
+                    }
+                }
+            }
+
+            // Get created file info.
+            IgfsFile status = fs.info(path);
+
+            if (status == null)
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because it no longer exists: " + path);
+            else if (status.isDirectory())
+                throw fsException("Failed to open output stream to the file created in " +
+                    "the secondary file system because the path points to a directory: " + path);
+
+            IgfsEntryInfo newInfo = IgfsUtils.createFile(
+                IgniteUuid.randomUuid(),
+                status.blockSize(),
+                status.length(),
+                affKey,
+                createFileLockId(false),
+                igfsCtx.igfs().evictExclude(path, false),
+                status.properties(),
+                status.accessTime(),
+                status.modificationTime()
+            );
+
+            // Add new file info to the listing optionally removing the previous one.
+            assert parentInfo != null;
+
+            IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+
+            if (oldId != null) {
+                IgfsEntryInfo oldInfo = info(oldId);
+
+                assert oldInfo != null; // Otherwise cache is in inconsistent state.
+
+                // The contract is that we cannot overwrite a file locked for writing:
+                if (oldInfo.lockId() != null)
+                    throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+                        path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+
+                id2InfoPrj.remove(oldId); // Remove the old one.
+                id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+                    path.name(), parentInfo.listing().get(path.name()).fileId()));
+
+                createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+
+                igfsCtx.data().delete(oldInfo);
+            }
+
+            // Record CREATE event if needed.
+            if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+                pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+
+            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+            U.closeQuiet(out);
+
+            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+                "exception: " + path, err);
+        }
+    }
 }
\ No newline at end of file