You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/31 12:59:21 UTC
ignite git commit: IGNITE-1631: IGFS: Fixed append operation with
[create=true] for DUAL modes. This closes #580.
Repository: ignite
Updated Branches:
refs/heads/master bbf87df68 -> f19955314
IGNITE-1631: IGFS: Fixed append operation with [create=true] for DUAL modes. This closes #580.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f1995531
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f1995531
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f1995531
Branch: refs/heads/master
Commit: f199553142ea5a459904f9e7bcef5e265b4e5d5c
Parents: bbf87df
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 31 13:59:11 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 31 13:59:11 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsImpl.java | 4 +-
.../processors/igfs/IgfsMetaManager.java | 315 +++++++++++--------
.../processors/igfs/IgfsAbstractSelfTest.java | 3 -
3 files changed, 183 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 358aaf0..ec3a45e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -96,13 +96,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -1072,7 +1070,7 @@ public final class IgfsImpl implements IgfsEx {
await(path);
- IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize);
+ IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize, create);
batch = newBatch(path, desc.out());
http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index a4212ba..7b1d68c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -514,11 +515,11 @@ public class IgfsMetaManager extends IgfsManager {
* Lock the file explicitly outside of transaction.
*
* @param fileId File ID to lock.
- * @param delete If file is being locked for delete.
+ * @param del If file is being locked for delete.
* @return Locked file info or {@code null} if file cannot be locked or doesn't exist.
* @throws IgniteCheckedException If the file with such id does not exist, or on another failure.
*/
- public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean delete) throws IgniteCheckedException {
+ public @Nullable IgfsEntryInfo lock(IgniteUuid fileId, boolean del) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
validTxState(false);
@@ -535,7 +536,7 @@ public class IgfsMetaManager extends IgfsManager {
if (oldInfo.lockId() != null)
return null; // The file is already locked, we cannot lock it.
- IgfsEntryInfo newInfo = invokeLock(fileId, delete);
+ IgfsEntryInfo newInfo = invokeLock(fileId, del);
tx.commit();
@@ -556,11 +557,11 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Create file lock ID.
*
- * @param delete If lock ID is required for file deletion.
+ * @param del If lock ID is required for file deletion.
* @return Lock ID.
*/
- private IgniteUuid createFileLockId(boolean delete) {
- if (delete)
+ private IgniteUuid createFileLockId(boolean del) {
+ if (del)
return IgfsUtils.DELETE_LOCK_ID;
return IgniteUuid.fromUuid(locNode.id());
@@ -1736,12 +1737,12 @@ public class IgfsMetaManager extends IgfsManager {
* Invoke lock processor.
*
* @param id File ID.
- * @param delete Whether lock is taken for delete.
+ * @param del Whether lock is taken for delete.
* @return Resulting file info.
* @throws IgniteCheckedException If failed.
*/
- private IgfsEntryInfo invokeLock(IgniteUuid id, boolean delete) throws IgniteCheckedException {
- return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(delete)));
+ private IgfsEntryInfo invokeLock(IgniteUuid id, boolean del) throws IgniteCheckedException {
+ return invokeAndGet(id, new IgfsMetaFileLockProcessor(createFileLockId(del)));
}
/**
@@ -1786,7 +1787,7 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Create the file in DUAL mode.
+ * A delegate method that performs file creation in the synchronization task.
*
* @param fs File system.
* @param path Path.
@@ -1797,134 +1798,162 @@ public class IgfsMetaManager extends IgfsManager {
* @param replication Replication factor.
* @param blockSize Block size.
* @param affKey Affinity key.
+ * @param infos Map from paths to corresponding infos.
+ * @param pendingEvts A non-null collection the events are to be accumulated in.
+ * @param t1 A signle-object tuple to hold the created output stream.
* @return Output stream descriptor.
- * @throws IgniteCheckedException If file creation failed.
+ * @throws Exception On error.
*/
- public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
- final IgfsPath path,
- final boolean simpleCreate,
- @Nullable final Map<String, String> props,
- final boolean overwrite,
- final int bufSize,
- final short replication,
- final long blockSize,
- final IgniteUuid affKey)
- throws IgniteCheckedException
- {
- if (busyLock.enterBusy()) {
- try {
- assert fs != null;
- assert path != null;
+ IgfsSecondaryOutputStreamDescriptor onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
+ boolean simpleCreate, @Nullable final Map<String, String> props, boolean overwrite,
+ int bufSize, short replication, long blockSize, IgniteUuid affKey, Map<IgfsPath, IgfsEntryInfo> infos,
+ final Deque<IgfsEvent> pendingEvts, final T1<OutputStream> t1) throws Exception {
+ validTxState(true);
- // Events to fire (can be done outside of a transaction).
- final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
+ assert !infos.isEmpty();
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
- /** Output stream to the secondary file system. */
- private OutputStream out;
+ // Determine the first existing parent.
+ IgfsPath parentPath = null;
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
- IgfsEntryInfo> infos) throws Exception {
- validTxState(true);
+ for (IgfsPath curPath : infos.keySet()) {
+ if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
+ parentPath = curPath;
+ }
- assert !infos.isEmpty();
+ assert parentPath != null;
- // Determine the first existing parent.
- IgfsPath parentPath = null;
+ IgfsEntryInfo parentInfo = infos.get(parentPath);
- for (IgfsPath curPath : infos.keySet()) {
- if (parentPath == null || curPath.isSubDirectoryOf(parentPath))
- parentPath = curPath;
- }
+ // Delegate to the secondary file system.
+ OutputStream out = simpleCreate ? fs.create(path, overwrite) :
+ fs.create(path, bufSize, overwrite, replication, blockSize, props);
- assert parentPath != null;
+ t1.set(out);
- IgfsEntryInfo parentInfo = infos.get(parentPath);
+ IgfsPath parent0 = path.parent();
- // Delegate to the secondary file system.
- out = simpleCreate ? fs.create(path, overwrite) :
- fs.create(path, bufSize, overwrite, replication, blockSize, props);
+ assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
- IgfsPath parent0 = path.parent();
+ // If some of the parent directories were missing, synchronize again.
+ if (!parentPath.equals(parent0)) {
+ parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
- assert parent0 != null : "path.parent() is null (are we creating ROOT?): " + path;
+ // Fire notification about missing directories creation.
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+ IgfsPath evtPath = parent0;
- // If some of the parent directories were missing, synchronize again.
- if (!parentPath.equals(parent0)) {
- parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
+ while (!parentPath.equals(evtPath)) {
+ pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+ EventType.EVT_IGFS_DIR_CREATED));
- // Fire notification about missing directories creation.
- if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
- IgfsPath evtPath = parent0;
+ evtPath = evtPath.parent();
- while (!parentPath.equals(evtPath)) {
- pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
- EventType.EVT_IGFS_DIR_CREATED));
+ assert evtPath != null; // If this fails, then ROOT does not exist.
+ }
+ }
+ }
- evtPath = evtPath.parent();
+ // Get created file info.
+ IgfsFile status = fs.info(path);
- assert evtPath != null; // If this fails, then ROOT does not exist.
- }
- }
- }
+ if (status == null)
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because it no longer exists: " + path);
+ else if (status.isDirectory())
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because the path points to a directory: " + path);
- // Get created file info.
- IgfsFile status = fs.info(path);
+ IgfsEntryInfo newInfo = IgfsUtils.createFile(
+ IgniteUuid.randomUuid(),
+ status.blockSize(),
+ status.length(),
+ affKey,
+ createFileLockId(false),
+ igfsCtx.igfs().evictExclude(path, false),
+ status.properties(),
+ status.accessTime(),
+ status.modificationTime()
+ );
- if (status == null)
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because it no longer exists: " + path);
- else if (status.isDirectory())
- throw fsException("Failed to open output stream to the file created in " +
- "the secondary file system because the path points to a directory: " + path);
+ // Add new file info to the listing optionally removing the previous one.
+ assert parentInfo != null;
- IgfsEntryInfo newInfo = IgfsUtils.createFile(
- IgniteUuid.randomUuid(),
- status.blockSize(),
- status.length(),
- affKey,
- createFileLockId(false),
- igfsCtx.igfs().evictExclude(path, false),
- status.properties(),
- status.accessTime(),
- status.modificationTime()
- );
+ IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
- // Add new file info to the listing optionally removing the previous one.
- assert parentInfo != null;
+ if (oldId != null) {
+ IgfsEntryInfo oldInfo = info(oldId);
- IgniteUuid oldId = putIfAbsentNonTx(parentInfo.id(), path.name(), newInfo);
+ assert oldInfo != null; // Otherwise cache is in inconsistent state.
- if (oldId != null) {
- IgfsEntryInfo oldInfo = info(oldId);
+ // The contact is that we cannot overwrite a file locked for writing:
+ if (oldInfo.lockId() != null)
+ throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
+ path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
- assert oldInfo != null; // Otherwise cache is in inconsistent state.
+ id2InfoPrj.remove(oldId); // Remove the old one.
+ id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
+ path.name(), parentInfo.listing().get(path.name()).fileId()));
- // The contact is that we cannot overwrite a file locked for writing:
- if (oldInfo.lockId() != null)
- throw fsException("Failed to overwrite file (file is opened for writing) [path=" +
- path + ", fileId=" + oldId + ", lockId=" + oldInfo.lockId() + ']');
+ createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
- id2InfoPrj.remove(oldId); // Remove the old one.
- id2InfoPrj.invoke(parentInfo.id(), new IgfsMetaDirectoryListingRemoveProcessor(
- path.name(), parentInfo.listing().get(path.name()).fileId()));
+ igfsCtx.data().delete(oldInfo);
+ }
- createNewEntry(newInfo, parentInfo.id(), path.name()); // Put new one.
+ // Record CREATE event if needed.
+ if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+ pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
- igfsCtx.data().delete(oldInfo);
- }
+ return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+ }
- // Record CREATE event if needed.
- if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
- pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
+ /**
+ * Create the file in DUAL mode.
+ *
+ * @param fs File system.
+ * @param path Path.
+ * @param simpleCreate "Simple create" flag.
+ * @param props Properties..
+ * @param overwrite Overwrite flag.
+ * @param bufSize Buffer size.
+ * @param replication Replication factor.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @return Output stream descriptor.
+ * @throws IgniteCheckedException If file creation failed.
+ */
+ public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
+ final IgfsPath path,
+ final boolean simpleCreate,
+ @Nullable final Map<String, String> props,
+ final boolean overwrite,
+ final int bufSize,
+ final short replication,
+ final long blockSize,
+ final IgniteUuid affKey)
+ throws IgniteCheckedException
+ {
+ if (busyLock.enterBusy()) {
+ try {
+ assert fs != null;
+ assert path != null;
+
+ // Events to fire (can be done outside of a transaction).
+ final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
+
+ SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
+ new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
+ /** Container for the secondary file system output stream. */
+ private final T1<OutputStream> outT1 = new T1<>(null);
- return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
+ @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
+ IgfsEntryInfo> infos) throws Exception {
+ return onSuccessCreate(fs, path, simpleCreate, props,
+ overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1);
}
@Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
throws IgniteCheckedException {
- U.closeQuiet(out);
+ U.closeQuiet(outT1.get());
U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
@@ -1957,20 +1986,24 @@ public class IgfsMetaManager extends IgfsManager {
* @param fs File system.
* @param path Path.
* @param bufSize Buffer size.
+ * @param create Create flag.
* @return Output stream descriptor.
* @throws IgniteCheckedException If output stream open for append has failed.
*/
- public IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
- final int bufSize) throws IgniteCheckedException {
+ IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+ final int bufSize, final boolean create) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
assert fs != null;
assert path != null;
+ // Events to fire (can be done outside of a transaction).
+ final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
+
SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
- /** Output stream to the secondary file system. */
- private OutputStream out;
+ /** Container for the secondary file system output stream. */
+ private final T1<OutputStream> outT1 = new T1<>(null);
@Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
IgfsEntryInfo> infos) throws Exception {
@@ -1978,55 +2011,71 @@ public class IgfsMetaManager extends IgfsManager {
final IgfsEntryInfo info = infos.get(path);
- if (info.isDirectory())
- throw fsException("Failed to open output stream to the file in the " +
- "secondary file system because the path points to a directory: " + path);
+ final IgfsEntryInfo lockedInfo;
- out = fs.append(path, bufSize, false, null);
+ if (info == null)
+ return onSuccessCreate(fs, path, true/*simpleCreate*/, null,
+ false/*overwrite*/, bufSize, (short)0, 0, null, infos, pendingEvts, outT1);
+ else {
+ if (info.isDirectory())
+ throw fsException("Failed to open output stream to the file in the " +
+ "secondary file system because the path points to a directory: " + path);
+
+ outT1.set(fs.append(path, bufSize, false, null));
- // Synchronize file ending.
- long len = info.length();
- int blockSize = info.blockSize();
+ // Synchronize file ending.
+ long len = info.length();
+ int blockSize = info.blockSize();
- int remainder = (int)(len % blockSize);
+ int remainder = (int) (len % blockSize);
- if (remainder > 0) {
- int blockIdx = (int)(len / blockSize);
+ if (remainder > 0) {
+ int blockIdx = (int) (len / blockSize);
- try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) {
- IgniteInternalFuture<byte[]> fut =
- igfsCtx.data().dataBlock(info, path, blockIdx, reader);
+ try (IgfsSecondaryFileSystemPositionedReadable reader = fs.open(path, bufSize)) {
+ IgniteInternalFuture<byte[]> fut =
+ igfsCtx.data().dataBlock(info, path, blockIdx, reader);
- assert fut != null;
+ assert fut != null;
- fut.get();
+ fut.get();
+ }
}
- }
- if (info.lockId() != null) {
- throw fsException("Failed to open file (file is opened for writing) [path=" +
- path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']');
+ if (info.lockId() != null) {
+ throw fsException("Failed to open file (file is opened for writing) [path=" +
+ path + ", fileId=" + info.id() + ", lockId=" + info.lockId() + ']');
+ }
+
+ // Set lock and return.
+ lockedInfo = invokeLock(info.id(), false);
}
- // Set lock and return.
- IgfsEntryInfo lockedInfo = invokeLock(info.id(), false);
+ if (evts.isRecordable(EventType.EVT_IGFS_FILE_OPENED_WRITE))
+ pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_OPENED_WRITE));
- return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, out);
+ return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, outT1.get());
}
@Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
throws IgniteCheckedException {
- U.closeQuiet(out);
+ U.closeQuiet(outT1.get());
U.error(log, "File append in DUAL mode failed [path=" + path + ", bufferSize=" + bufSize +
']', err);
throw new IgniteCheckedException("Failed to append to the file due to secondary file " +
"system exception: " + path, err);
- }
- };
+ }
+ };
- return synchronizeAndExecute(task, fs, true, path);
+ try {
+ return synchronizeAndExecute(task, fs, !create/*strict*/, path);
+ }
+ finally {
+ for (IgfsEvent evt : pendingEvts)
+ evts.record(evt);
+ }
}
finally {
busyLock.leaveBusy();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f1995531/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index b361d42..e4d39d6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -1543,9 +1543,6 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
public void testAppend() throws Exception {
- if (dual)
- fail("Test fails in DUAL modes, see https://issues.apache.org/jira/browse/IGNITE-1631");
-
create(igfs, paths(DIR, SUBDIR), null);
assert igfs.exists(SUBDIR);