You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/10 08:47:52 UTC
[3/3] ignite git commit: IGNITE-3294: Moved all logic to meta manager
"create" method.
IGNITE-3294: Moved all logic to meta manager "create" method.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd0e5bf6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd0e5bf6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd0e5bf6
Branch: refs/heads/ignite-3294
Commit: fd0e5bf63181972aa8b64f6b404aae4d9a11cc8c
Parents: bab3b1b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Jun 10 11:47:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Jun 10 11:47:42 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsCreateResult.java | 66 ++++++++++++++
.../internal/processors/igfs/IgfsImpl.java | 53 ++++++-----
.../processors/igfs/IgfsMetaManager.java | 38 ++++----
.../IgfsSecondaryFileSystemCreateContext.java | 92 ++++++++++++++++++++
.../IgfsSecondaryOutputStreamDescriptor.java | 59 -------------
.../igfs/IgfsMetaManagerSelfTest.java | 4 +-
6 files changed, 208 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
new file mode 100644
index 0000000..0b09e02
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+
+/**
+ * IGFS file create result: file info in the primary file system plus an optional secondary file system stream.
+ */
+public class IgfsCreateResult {
+ /** File info in the primary file system. */
+ private final IgfsEntryInfo info;
+
+ /** Output stream to the secondary file system; may be {@code null}. */
+ private final OutputStream secondaryOut;
+
+ /**
+ * Constructor.
+ *
+ * @param info File info in the primary file system; must not be {@code null} (checked by assertion).
+ * @param secondaryOut Output stream to the secondary file system, or {@code null} if there is none.
+ */
+ public IgfsCreateResult(IgfsEntryInfo info, @Nullable OutputStream secondaryOut) {
+ assert info != null;
+
+ this.info = info;
+ this.secondaryOut = secondaryOut;
+ }
+
+ /**
+ * @return File info in the primary file system.
+ */
+ public IgfsEntryInfo info() {
+ return info;
+ }
+
+ /**
+ * @return Output stream to the secondary file system, or {@code null} if none was passed to the constructor.
+ */
+ @Nullable public OutputStream secondaryOutputStream() {
+ return secondaryOut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsCreateResult.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 0808619..9f83f36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -106,7 +106,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
@@ -1019,28 +1018,10 @@ public final class IgfsImpl implements IgfsEx {
log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" +
overwrite + ", props=" + props + ']');
+ // Resolve mode.
final IgfsMode mode = resolveMode(path);
- IgfsFileWorkerBatch batch;
-
- if (mode != PRIMARY) {
- assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
-
- await(path);
-
- IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate,
- props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey);
-
- batch = newBatch(path, desc.out());
-
- IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
- bufferSize(bufSize), mode, batch);
-
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
-
- return os;
- }
-
+ // Prepare properties.
final Map<String, String> dirProps, fileProps;
if (props == null) {
@@ -1051,19 +1032,37 @@ public final class IgfsImpl implements IgfsEx {
else
dirProps = fileProps = new HashMap<>(props);
- IgfsEntryInfo res = meta.create(
+ // Prepare context for DUAL mode.
+ IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
+
+ if (mode != PRIMARY)
+ secondaryCtx = new IgfsSecondaryFileSystemCreateContext(secondaryFs, simpleCreate,
+ (short)replication, groupBlockSize(), bufSize);
+
+ // Await for async ops completion if in DUAL mode.
+ if (mode != PRIMARY)
+ await(path);
+
+ // Perform create.
+ IgfsCreateResult res = meta.create(
path,
dirProps,
overwrite,
cfg.getBlockSize(),
affKey,
- evictExclude(path, true),
- fileProps
+ evictExclude(path, mode == PRIMARY),
+ fileProps,
+ secondaryCtx
);
assert res != null;
- return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
+ // Create secondary file system batch.
+ OutputStream secondaryStream = res.secondaryOutputStream();
+
+ IgfsFileWorkerBatch batch = secondaryStream != null ? newBatch(path, secondaryStream) : null;
+
+ return new IgfsOutputStreamImpl(igfsCtx, path, res.info(), bufferSize(bufSize), mode, batch);
}
});
}
@@ -1094,9 +1093,9 @@ public final class IgfsImpl implements IgfsEx {
await(path);
- IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize);
+ IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize);
- batch = newBatch(path, desc.out());
+ batch = newBatch(path, desc.secondaryOutputStream());
return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 3d82ebd..bff274f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1792,7 +1792,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Output stream descriptor.
* @throws IgniteCheckedException If file creation failed.
*/
- public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
+ public IgfsCreateResult createDual(final IgfsSecondaryFileSystem fs,
final IgfsPath path,
final boolean simpleCreate,
@Nullable final Map<String, String> props,
@@ -1839,19 +1839,19 @@ public class IgfsMetaManager extends IgfsManager {
* @return Output stream descriptor.
* @throws IgniteCheckedException If output stream open for append has failed.
*/
- public IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+ public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
final int bufSize) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert path != null;
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
+ SynchronizationTask<IgfsCreateResult> task =
+ new SynchronizationTask<IgfsCreateResult>() {
/** Output stream to the secondary file system. */
private OutputStream out;
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
+ @Override public IgfsCreateResult onSuccess(Map<IgfsPath,
IgfsEntryInfo> infos) throws Exception {
validTxState(true);
@@ -1890,10 +1890,10 @@ public class IgfsMetaManager extends IgfsManager {
// Set lock and return.
IgfsEntryInfo lockedInfo = invokeLock(info.id(), false);
- return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out);
+ return new IgfsCreateResult(lockedInfo, out);
}
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
+ @Override public IgfsCreateResult onFailure(@Nullable Exception err)
throws IgniteCheckedException {
U.closeQuiet(out);
@@ -2811,19 +2811,25 @@ public class IgfsMetaManager extends IgfsManager {
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param fileProps File properties.
- * @return @return Resulting info.
+ * @param secondaryCtx Secondary file system create context.
+ * @return Operation result.
* @throws IgniteCheckedException If failed.
*/
- IgfsEntryInfo create(
+ IgfsCreateResult create(
final IgfsPath path,
Map<String, String> dirProps,
final boolean overwrite,
final int blockSize,
final @Nullable IgniteUuid affKey,
final boolean evictExclude,
- @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+ @Nullable Map<String, String> fileProps,
+ @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
validTxState(false);
+ if (secondaryCtx != null)
+ return createDual(secondaryCtx.fileSystem(), path, secondaryCtx.simpleCreate(), fileProps, overwrite,
+ secondaryCtx.bufferSize(), secondaryCtx.replication(), secondaryCtx.blockSize(), affKey);
+
while (true) {
if (busyLock.enterBusy()) {
try {
@@ -2897,7 +2903,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
- return newInfo;
+ return new IgfsCreateResult(newInfo, null);
}
else {
// Create file and parent folders.
@@ -2913,7 +2919,7 @@ public class IgfsMetaManager extends IgfsManager {
// Generate events.
generateCreateEvents(res.createdPaths(), true);
- return res.info();
+ return new IgfsCreateResult(res.info(), null);
}
}
}
@@ -3107,7 +3113,7 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Synchronization task to create a file.
*/
- private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+ private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsCreateResult> {
/** Secondary file system. */
private IgfsSecondaryFileSystem fs;
@@ -3171,7 +3177,7 @@ public class IgfsMetaManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+ @Override public IgfsCreateResult onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
throws Exception {
validTxState(true);
@@ -3266,11 +3272,11 @@ public class IgfsMetaManager extends IgfsManager {
if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
- return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+ return new IgfsCreateResult(newInfo, out);
}
/** {@inheritDoc} */
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+ @Override public IgfsCreateResult onFailure(Exception err) throws IgniteCheckedException {
U.closeQuiet(out);
U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
new file mode 100644
index 0000000..ef71c6c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+
+/**
+ * Context for secondary file system create request; an immutable holder of the create parameters.
+ */
+public class IgfsSecondaryFileSystemCreateContext {
+ /** Secondary file system. */
+ private final IgfsSecondaryFileSystem fs;
+
+ /** Simple create flag. */
+ private final boolean simpleCreate;
+
+ /** Replication. */
+ private final short replication;
+
+ /** Block size. */
+ private final long blockSize;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /**
+ * Constructor.
+ * @param fs Secondary file system.
+ * @param simpleCreate Simple create flag.
+ * @param replication Replication.
+ * @param blockSize Block size.
+ * @param bufSize Buffer size.
+ */
+ public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, boolean simpleCreate, short replication,
+ long blockSize, int bufSize) {
+ this.fs = fs;
+ this.simpleCreate = simpleCreate;
+ this.replication = replication;
+ this.blockSize = blockSize;
+ this.bufSize = bufSize;
+ }
+
+ /**
+ * @return Secondary file system.
+ */
+ public IgfsSecondaryFileSystem fileSystem() {
+ return fs;
+ }
+
+ /**
+ * @return Simple create flag.
+ */
+ public boolean simpleCreate() {
+ return simpleCreate;
+ }
+
+ /**
+ * @return Replication.
+ */
+ public short replication() {
+ return replication;
+ }
+
+ /**
+ * @return Block size.
+ */
+ public long blockSize() {
+ return blockSize;
+ }
+
+ /**
+ * @return Buffer size.
+ */
+ public int bufferSize() {
+ return bufSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
deleted file mode 100644
index 6bbc2c0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.OutputStream;
-
-/**
- * Descriptor of an output stream opened to the secondary file system.
- */
-public class IgfsSecondaryOutputStreamDescriptor {
- /** File info in the primary file system. */
- private final IgfsEntryInfo info;
-
- /** Output stream to the secondary file system. */
- private final OutputStream out;
-
- /**
- * Constructor.
- *
- * @param info File info in the primary file system.
- * @param out Output stream to the secondary file system.
- */
- IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) {
- assert info != null;
- assert out != null;
-
- this.info = info;
- this.out = out;
- }
-
- /**
- * @return File info in the primary file system.
- */
- IgfsEntryInfo info() {
- return info;
- }
-
- /**
- * @return Output stream to the secondary file system.
- */
- OutputStream out() {
- return out;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/fd0e5bf6/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 8b88157..6053d3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(ROOT_ID));
assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META));
- assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null));
+ assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null, null));
IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir");
assertNotNull(dirEntry);
@@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
IgfsPath p = path(path);
- IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null);
+ IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null, null).info();
assert res != null;
assert !res.isDirectory();