You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/10 08:47:52 UTC

[3/3] ignite git commit: IGNITE-3294: Moved all logic to meta manager "create" method.

IGNITE-3294: Moved all logic to meta manager "create" method.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd0e5bf6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd0e5bf6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd0e5bf6

Branch: refs/heads/ignite-3294
Commit: fd0e5bf63181972aa8b64f6b404aae4d9a11cc8c
Parents: bab3b1b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Jun 10 11:47:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Jun 10 11:47:42 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsCreateResult.java       | 66 ++++++++++++++
 .../internal/processors/igfs/IgfsImpl.java      | 53 ++++++-----
 .../processors/igfs/IgfsMetaManager.java        | 38 ++++----
 .../IgfsSecondaryFileSystemCreateContext.java   | 92 ++++++++++++++++++++
 .../IgfsSecondaryOutputStreamDescriptor.java    | 59 -------------
 .../igfs/IgfsMetaManagerSelfTest.java           |  4 +-
 6 files changed, 208 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
new file mode 100644
index 0000000..0b09e02
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+
+/**
+ * IGFS file create result.
+ */
+public class IgfsCreateResult {
+    /** File info in the primary file system. */
+    private final IgfsEntryInfo info;
+
+    /** Output stream to the secondary file system. */
+    private final OutputStream secondaryOut;
+
+    /**
+     * Constructor.
+     *
+     * @param info File info in the primary file system.
+     * @param secondaryOut Output stream to the secondary file system.
+     */
+    public IgfsCreateResult(IgfsEntryInfo info, @Nullable OutputStream secondaryOut) {
+        assert info != null;
+
+        this.info = info;
+        this.secondaryOut = secondaryOut;
+    }
+
+    /**
+     * @return File info in the primary file system.
+     */
+    public IgfsEntryInfo info() {
+        return info;
+    }
+
+    /**
+     * @return Output stream to the secondary file system.
+     */
+    @Nullable public OutputStream secondaryOutputStream() {
+        return secondaryOut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsCreateResult.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 0808619..9f83f36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -106,7 +106,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
@@ -1019,28 +1018,10 @@ public final class IgfsImpl implements IgfsEx {
                     log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" +
                         overwrite + ", props=" + props + ']');
 
+                // Resolve mode.
                 final IgfsMode mode = resolveMode(path);
 
-                IgfsFileWorkerBatch batch;
-
-                if (mode != PRIMARY) {
-                    assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
-
-                    await(path);
-
-                    IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate,
-                        props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey);
-
-                    batch = newBatch(path, desc.out());
-
-                    IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
-                        bufferSize(bufSize), mode, batch);
-
-                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
-
-                    return os;
-                }
-
+                // Prepare properties.
                 final Map<String, String> dirProps, fileProps;
 
                 if (props == null) {
@@ -1051,19 +1032,37 @@ public final class IgfsImpl implements IgfsEx {
                 else
                     dirProps = fileProps = new HashMap<>(props);
 
-                IgfsEntryInfo res = meta.create(
+                // Prepare context for DUAL mode.
+                IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
+
+                if (mode != PRIMARY)
+                    secondaryCtx = new IgfsSecondaryFileSystemCreateContext(secondaryFs, simpleCreate,
+                        (short)replication, groupBlockSize(), bufSize);
+
+                // Await for async ops completion if in DUAL mode.
+                if (mode != PRIMARY)
+                    await(path);
+
+                // Perform create.
+                IgfsCreateResult res = meta.create(
                     path,
                     dirProps,
                     overwrite,
                     cfg.getBlockSize(),
                     affKey,
-                    evictExclude(path, true),
-                    fileProps
+                    evictExclude(path, mode == PRIMARY),
+                    fileProps,
+                    secondaryCtx
                 );
 
                 assert res != null;
 
-                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
+                // Create secondary file system batch.
+                OutputStream secondaryStream = res.secondaryOutputStream();
+
+                IgfsFileWorkerBatch batch = secondaryStream != null ? newBatch(path, secondaryStream) : null;
+
+                return new IgfsOutputStreamImpl(igfsCtx, path, res.info(), bufferSize(bufSize), mode, batch);
             }
         });
     }
@@ -1094,9 +1093,9 @@ public final class IgfsImpl implements IgfsEx {
 
                     await(path);
 
-                    IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize);
+                    IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize);
 
-                    batch = newBatch(path, desc.out());
+                    batch = newBatch(path, desc.secondaryOutputStream());
 
                     return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 3d82ebd..bff274f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1792,7 +1792,7 @@ public class IgfsMetaManager extends IgfsManager {
      * @return Output stream descriptor.
      * @throws IgniteCheckedException If file creation failed.
      */
-    public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
+    public IgfsCreateResult createDual(final IgfsSecondaryFileSystem fs,
         final IgfsPath path,
         final boolean simpleCreate,
         @Nullable final Map<String, String> props,
@@ -1839,19 +1839,19 @@ public class IgfsMetaManager extends IgfsManager {
      * @return Output stream descriptor.
      * @throws IgniteCheckedException If output stream open for append has failed.
      */
-    public IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+    public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
         final int bufSize) throws IgniteCheckedException {
         if (busyLock.enterBusy()) {
             try {
                 assert fs != null;
                 assert path != null;
 
-                SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
-                    new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
+                SynchronizationTask<IgfsCreateResult> task =
+                    new SynchronizationTask<IgfsCreateResult>() {
                         /** Output stream to the secondary file system. */
                         private OutputStream out;
 
-                        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
+                        @Override public IgfsCreateResult onSuccess(Map<IgfsPath,
                             IgfsEntryInfo> infos) throws Exception {
                             validTxState(true);
 
@@ -1890,10 +1890,10 @@ public class IgfsMetaManager extends IgfsManager {
                             // Set lock and return.
                             IgfsEntryInfo lockedInfo = invokeLock(info.id(), false);
 
-                            return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out);
+                            return new IgfsCreateResult(lockedInfo, out);
                         }
 
-                        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
+                        @Override public IgfsCreateResult onFailure(@Nullable Exception err)
                             throws IgniteCheckedException {
                             U.closeQuiet(out);
 
@@ -2811,19 +2811,25 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
      * @param fileProps File properties.
-     * @return @return Resulting info.
+     * @param secondaryCtx Secondary file system create context.
+     * @return @return Operation result.
      * @throws IgniteCheckedException If failed.
      */
-    IgfsEntryInfo create(
+    IgfsCreateResult create(
         final IgfsPath path,
         Map<String, String> dirProps,
         final boolean overwrite,
         final int blockSize,
         final @Nullable IgniteUuid affKey,
         final boolean evictExclude,
-        @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+        @Nullable Map<String, String> fileProps,
+        @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
         validTxState(false);
 
+        if (secondaryCtx != null)
+            return createDual(secondaryCtx.fileSystem(), path, secondaryCtx.simpleCreate(), fileProps, overwrite,
+                secondaryCtx.bufferSize(), secondaryCtx.replication(), secondaryCtx.blockSize(), affKey);
+
         while (true) {
             if (busyLock.enterBusy()) {
                 try {
@@ -2897,7 +2903,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
-                            return newInfo;
+                            return new IgfsCreateResult(newInfo, null);
                         }
                         else {
                             // Create file and parent folders.
@@ -2913,7 +2919,7 @@ public class IgfsMetaManager extends IgfsManager {
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);
 
-                            return res.info();
+                            return new IgfsCreateResult(res.info(), null);
                         }
                     }
                 }
@@ -3107,7 +3113,7 @@ public class IgfsMetaManager extends IgfsManager {
     /**
      * Synchronization task to create a file.
      */
-    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsCreateResult> {
         /** Secondary file system. */
         private IgfsSecondaryFileSystem fs;
 
@@ -3171,7 +3177,7 @@ public class IgfsMetaManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
-        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+        @Override public IgfsCreateResult onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
             throws Exception {
             validTxState(true);
 
@@ -3266,11 +3272,11 @@ public class IgfsMetaManager extends IgfsManager {
             if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
                 pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
 
-            return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+            return new IgfsCreateResult(newInfo, out);
         }
 
         /** {@inheritDoc} */
-        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+        @Override public IgfsCreateResult onFailure(Exception err) throws IgniteCheckedException {
             U.closeQuiet(out);
 
             U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
new file mode 100644
index 0000000..ef71c6c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+
+/**
+ * Context for secondary file system create request.
+ */
+public class IgfsSecondaryFileSystemCreateContext {
+    /** File system. */
+    private final IgfsSecondaryFileSystem fs;
+
+    /** Simple create flag. */
+    private final boolean simpleCreate;
+
+    /** Replication. */
+    private final short replication;
+
+    /** Block size. */
+    private final long blockSize;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /**
+     * Constructor.
+     *
+     * @param fs File system.
+     * @param simpleCreate Simple create flag.
+     * @param blockSize Block size.
+     * @param replication Replication.
+     */
+    public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, boolean simpleCreate, short replication,
+        long blockSize, int bufSize) {
+        this.fs = fs;
+        this.simpleCreate = simpleCreate;
+        this.replication = replication;
+        this.blockSize = blockSize;
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * @return Secondary file system.
+     */
+    public IgfsSecondaryFileSystem fileSystem() {
+        return fs;
+    }
+
+    /**
+     * @return Simple crate flag.
+     */
+    public boolean simpleCreate() {
+        return simpleCreate;
+    }
+
+    /**
+     * @return Replication.
+     */
+    public short replication() {
+        return replication;
+    }
+
+    /**
+     * @return Block size.
+     */
+    public long blockSize() {
+        return blockSize;
+    }
+
+    /**
+     * @return Buffer size.
+     */
+    public int bufferSize() {
+        return bufSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
deleted file mode 100644
index 6bbc2c0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.OutputStream;
-
-/**
- * Descriptor of an output stream opened to the secondary file system.
- */
-public class IgfsSecondaryOutputStreamDescriptor {
-    /** File info in the primary file system. */
-    private final IgfsEntryInfo info;
-
-    /** Output stream to the secondary file system. */
-    private final OutputStream out;
-
-    /**
-     * Constructor.
-     *
-     * @param info File info in the primary file system.
-     * @param out Output stream to the secondary file system.
-     */
-    IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) {
-        assert info != null;
-        assert out != null;
-
-        this.info = info;
-        this.out = out;
-    }
-
-    /**
-     * @return File info in the primary file system.
-     */
-    IgfsEntryInfo info() {
-        return info;
-    }
-
-    /**
-     * @return Output stream to the secondary file system.
-     */
-    OutputStream out() {
-        return out;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 8b88157..6053d3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
         assertEmpty(mgr.directoryListing(ROOT_ID));
 
         assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META));
-        assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null));
+        assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null, null));
 
         IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir");
         assertNotNull(dirEntry);
@@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
     private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
         IgfsPath p = path(path);
 
-        IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null);
+        IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null, null).info();
 
         assert res != null;
         assert !res.isDirectory();