You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/06/16 11:31:08 UTC
[01/13] ignite git commit: IGNITE-3287: IGFS Delete in PRIMARY/DUAL
modes goes through the same path.
Repository: ignite
Updated Branches:
refs/heads/master 9d46a73f2 -> ca546b3d4
IGNITE-3287: IGFS Delete in PRIMARY/DUAL modes goes through the same path.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc1d0885
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc1d0885
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc1d0885
Branch: refs/heads/master
Commit: dc1d08856e803ee8b689b2e5aad25c767e4c2015
Parents: 7761e5f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Jun 10 11:05:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:24:56 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsDeleteResult.java | 62 +++++
.../internal/processors/igfs/IgfsImpl.java | 30 +--
.../processors/igfs/IgfsMetaManager.java | 263 +++++++------------
.../internal/processors/igfs/IgfsPathIds.java | 10 +
.../processors/igfs/IgfsAbstractSelfTest.java | 14 +-
.../igfs/IgfsMetaManagerSelfTest.java | 20 +-
6 files changed, 177 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
new file mode 100644
index 0000000..ff42762
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of a delete operation in IGFS: a success flag plus the info of the deleted entry, if available.
+ */
+public class IgfsDeleteResult {
+ /** Whether the delete operation succeeded. */
+ private final boolean success;
+
+ /** Info of the deleted entry; may be {@code null}. */
+ private final IgfsEntryInfo info;
+
+ /**
+ * Creates a result holding the given flag and entry info as-is.
+ *
+ * @param success Success flag.
+ * @param info Entry info, or {@code null} when no entry info is available.
+ */
+ public IgfsDeleteResult(boolean success, @Nullable IgfsEntryInfo info) {
+ this.success = success;
+ this.info = info;
+ }
+
+ /**
+ * @return Success flag passed to the constructor.
+ */
+ public boolean success() {
+ return success;
+ }
+
+ /**
+ * @return Entry info passed to the constructor; may be {@code null}.
+ */
+ public IgfsEntryInfo info() {
+ return info;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsDeleteResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index eabec7c..e6f2ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -104,7 +104,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
@@ -723,38 +722,21 @@ public final class IgfsImpl implements IgfsEx {
if (IgfsPath.SLASH.equals(path.toString()))
return false;
- IgfsMode mode = resolveMode(path);
-
Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path);
- boolean res = false;
-
- FileDescriptor desc = getFileDescriptor(path);
-
- if (childrenModes.contains(PRIMARY)) {
- if (desc != null) {
- IgniteUuid deletedId = meta.softDelete(path, recursive);
-
- res = deletedId != null;
- }
- else if (mode == PRIMARY)
- checkConflictWithPrimary(path);
- }
-
- if (childrenModes.contains(DUAL_SYNC) || childrenModes.contains(DUAL_ASYNC)) {
- assert secondaryFs != null;
+ boolean dual = childrenModes.contains(DUAL_SYNC) ||childrenModes.contains(DUAL_ASYNC);
+ if (dual)
await(path);
- res |= meta.deleteDual(secondaryFs, path, recursive);
- }
+ IgfsDeleteResult res = meta.softDelete(path, recursive, dual ? secondaryFs : null);
// Record event if needed.
- if (res && desc != null)
+ if (res.success() && res.info() != null)
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
- desc.isFile ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED);
+ res.info().isFile() ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED);
- return res;
+ return res.success();
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 04e2139..4cdf6a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1151,152 +1151,130 @@ public class IgfsMetaManager extends IgfsManager {
*
* @param path Path.
* @param recursive Recursive flag.
+ * @param secondaryFs Secondary file system (optional).
* @return ID of an entry located directly under the trash directory.
* @throws IgniteCheckedException If failed.
*/
- IgniteUuid softDelete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- validTxState(false);
-
- IgfsPathIds pathIds = pathIds(path);
-
- // Continue only if the whole path present.
- if (!pathIds.allExists())
- return null; // A fragment of the path no longer exists.
-
- IgniteUuid victimId = pathIds.lastId();
- String victimName = pathIds.lastPart();
-
- if (IgfsUtils.isRootId(victimId))
- throw new IgfsException("Cannot remove root directory");
-
- // Prepare IDs to lock.
- SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
-
- pathIds.addExistingIds(allIds, relaxed);
+ IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive, @Nullable IgfsSecondaryFileSystem secondaryFs)
+ throws IgniteCheckedException {
+ while (true) {
+ if (busyLock.enterBusy()) {
+ try {
+ validTxState(false);
- IgniteUuid trashId = IgfsUtils.randomTrashId();
+ IgfsPathIds pathIds = pathIds(path);
- allIds.add(trashId);
+ boolean relaxed0 = relaxed;
- try (IgniteInternalTx tx = startTx()) {
- // Lock participants.
- Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
+ if (!pathIds.allExists()) {
+ if (secondaryFs == null)
+ // Return early if target path doesn't exist and we do not have secondary file system.
+ return new IgfsDeleteResult(false, null);
+ else
+ // If path is missing partially, we need more serious locking for DUAL mode.
+ relaxed0 = false;
+ }
- // Ensure that all participants are still in place.
- if (!pathIds.verifyIntegrity(lockInfos, relaxed))
- return null;
+ IgniteUuid victimId = pathIds.lastId();
+ String victimName = pathIds.lastPart();
- IgfsEntryInfo victimInfo = lockInfos.get(victimId);
+ if (IgfsUtils.isRootId(victimId))
+ throw new IgfsException("Cannot remove root directory");
- // Cannot delete non-empty directory if recursive flag is not set.
- if (!recursive && victimInfo.hasChildren())
- throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
- "empty and recursive flag is not set).");
+ // Prepare IDs to lock.
+ SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- // Prepare trash data.
- IgfsEntryInfo trashInfo = lockInfos.get(trashId);
+ pathIds.addExistingIds(allIds, relaxed0);
- final String trashName = IgfsUtils.composeNameForTrash(path, victimId);
+ IgniteUuid trashId = IgfsUtils.randomTrashId();
- assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
- "destination directory (file already exists) [destName=" + trashName + ']';
+ allIds.add(trashId);
- IgniteUuid parentId = pathIds.lastParentId();
- IgfsEntryInfo parentInfo = lockInfos.get(parentId);
+ try (IgniteInternalTx tx = startTx()) {
+ // Lock participants.
+ Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
- transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
+ if (!pathIds.allExists()) {
+ // We need to ensure that the last locked info is not linked with expected child.
+ // Otherwise there was some concurrent file system update and we have to re-try.
+ assert secondaryFs != null;
- tx.commit();
+ // Find the last locked index
+ IgfsEntryInfo lastLockedInfo = null;
+ int lastLockedIdx = -1;
- signalDeleteWorker();
+ while (lastLockedIdx < pathIds.lastExistingIndex()) {
+ IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
- return victimId;
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to perform soft delete because Grid is " +
- "stopping [path=" + path + ']');
- }
+ if (nextInfo != null) {
+ lastLockedInfo = nextInfo;
+ lastLockedIdx++;
+ }
+ else
+ break;
+ }
- /**
- * Move path to the trash directory in existing transaction.
- *
- * @param parentId Parent ID.
- * @param path Path name.
- * @param id Path ID.
- * @param trashId Trash folder ID.
- * @return ID of an entry located directly under the trash directory.
- * @throws IgniteCheckedException If failed.
- */
- @SuppressWarnings("RedundantCast")
- @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, IgfsPath path, IgniteUuid id,
- IgniteUuid trashId) throws IgniteCheckedException {
- validTxState(true);
+ assert lastLockedIdx < pathIds.count();
- IgniteUuid resId;
+ if (lastLockedInfo != null) {
+ String part = pathIds.part(lastLockedIdx + 1);
- if (parentId == null) {
- // Handle special case when we deleting root directory.
- assert IgfsUtils.ROOT_ID.equals(id);
+ if (lastLockedInfo.listing().containsKey(part))
+ continue;
+ }
+ }
- IgfsEntryInfo rootInfo = getInfo(IgfsUtils.ROOT_ID);
+ // Ensure that all participants are still in place.
+ if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed0)) {
+ // For DUAL mode we will try to update the underlying FS still. Note we do that inside TX.
+ if (secondaryFs != null) {
+ boolean res = secondaryFs.delete(path, recursive);
- if (rootInfo == null)
- return null; // Root was never created.
+ return new IgfsDeleteResult(res, null);
+ }
+ else
+ return new IgfsDeleteResult(false, null);
+ }
- // Ensure trash directory existence.
- createSystemDirectoryIfAbsent(trashId);
+ IgfsEntryInfo victimInfo = lockInfos.get(victimId);
- Map<String, IgfsListingEntry> rootListing = rootInfo.listing();
+ // Cannot delete non-empty directory if recursive flag is not set.
+ if (!recursive && victimInfo.hasChildren())
+ throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
+ "empty and recursive flag is not set).");
- if (!rootListing.isEmpty()) {
- IgniteUuid[] lockIds = new IgniteUuid[rootInfo.listing().size()];
+ // Prepare trash data.
+ IgfsEntryInfo trashInfo = lockInfos.get(trashId);
- int i = 0;
+ final String trashName = IgfsUtils.composeNameForTrash(path, victimId);
- for (IgfsListingEntry entry : rootInfo.listing().values())
- lockIds[i++] = entry.fileId();
+ assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
+ "destination directory (file already exists) [destName=" + trashName + ']';
- // Lock children IDs in correct order.
- lockIds(lockIds);
+ IgniteUuid parentId = pathIds.lastParentId();
+ IgfsEntryInfo parentInfo = lockInfos.get(parentId);
- // Construct new info and move locked entries from root to it.
- Map<String, IgfsListingEntry> transferListing = new HashMap<>(rootListing);
+ // Propagate call to the secondary file system.
+ if (secondaryFs != null && !secondaryFs.delete(path, recursive))
+ return new IgfsDeleteResult(false, null);
- IgfsEntryInfo newInfo = IgfsUtils.createDirectory(
- IgniteUuid.randomUuid(),
- transferListing,
- (Map<String,String>)null
- );
+ transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
- createNewEntry(newInfo, trashId, newInfo.id().toString());
+ tx.commit();
- // Remove listing entries from root.
- for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
- id2InfoPrj.invoke(IgfsUtils.ROOT_ID,
- new IgfsMetaDirectoryListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
+ signalDeleteWorker();
- resId = newInfo.id();
+ return new IgfsDeleteResult(true, victimInfo);
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
else
- resId = null;
+ throw new IllegalStateException("Failed to perform soft delete because Grid is " +
+ "stopping [path=" + path + ']');
}
- else {
- // Ensure trash directory existence.
- createSystemDirectoryIfAbsent(trashId);
-
- moveNonTx(id, path.name(), parentId, IgfsUtils.composeNameForTrash(path, id), trashId);
-
- resId = id;
- }
-
- return resId;
}
/**
@@ -2413,71 +2391,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Delete path in DUAL mode.
- *
- * @param fs Secondary file system.
- * @param path Path to update.
- * @param recursive Recursive flag.
- * @return Operation result.
- * @throws IgniteCheckedException If delete failed.
- */
- public boolean deleteDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final boolean recursive)
- throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- assert fs != null;
- assert path != null;
-
- final IgniteUuid trashId = IgfsUtils.randomTrashId();
-
- SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() {
- @Override public Boolean onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
- IgfsEntryInfo info = infos.get(path);
-
- if (info == null)
- return false; // File doesn't exist in the secondary file system.
-
- if (!fs.delete(path, recursive))
- return false; // Delete failed remotely.
-
- if (path.parent() != null) {
- assert infos.containsKey(path.parent());
-
- softDeleteNonTx(infos.get(path.parent()).id(), path, info.id(), trashId);
- }
- else {
- assert IgfsUtils.ROOT_ID.equals(info.id());
-
- softDeleteNonTx(null, path, info.id(), trashId);
- }
-
- return true; // No additional handling is required.
- }
-
- @Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException {
- U.error(log, "Path delete in DUAL mode failed [path=" + path + ", recursive=" + recursive + ']',
- err);
-
- throw new IgniteCheckedException("Failed to delete the path due to secondary file system " +
- "exception: ", err);
- }
- };
-
- Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path);
-
- signalDeleteWorker();
-
- return res;
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to delete in DUAL mode because Grid is stopping: " + path);
- }
-
- /**
* Update path in DUAL mode.
*
* @param fs Secondary file system.
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
index 446495e..b710ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -256,6 +256,16 @@ public class IgfsPathIds {
}
/**
+ * Get ID at the given index.
+ *
+ * @param idx Index.
+ * @return ID.
+ */
+ public IgniteUuid id(int idx) {
+ return idx <= lastExistingIdx ? ids[idx] : surrogateId(idx);
+ }
+
+ /**
* Get surrogate ID at the given index.
*
* @param idx Index.
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index a0bef0f..c1c6c6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2908,11 +2908,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws IgniteCheckedException If failed.
*/
protected static void checkExist(IgfsImpl igfs, IgfsPath... paths) throws IgniteCheckedException {
- for (IgfsPath path : paths) {
- assert igfs.context().meta().fileId(path) != null : "Path doesn't exist [igfs=" + igfs.name() +
- ", path=" + path + ']';
+ for (IgfsPath path : paths)
assert igfs.exists(path) : "Path doesn't exist [igfs=" + igfs.name() + ", path=" + path + ']';
- }
}
/**
@@ -2961,11 +2958,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @throws Exception If failed.
*/
protected void checkNotExist(IgfsImpl igfs, IgfsPath... paths) throws Exception {
- for (IgfsPath path : paths) {
- assert igfs.context().meta().fileId(path) == null : "Path exists [igfs=" + igfs.name() + ", path=" +
- path + ']';
+ for (IgfsPath path : paths)
assert !igfs.exists(path) : "Path exists [igfs=" + igfs.name() + ", path=" + path + ']';
- }
}
/**
@@ -2977,10 +2971,10 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
*/
protected void checkNotExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws Exception {
IgfsEx ex = uni.unwrap(IgfsEx.class);
+
for (IgfsPath path : paths) {
if (ex != null)
- assert ex.context().meta().fileId(path) == null : "Path exists [igfs=" + ex.name() + ", path=" +
- path + ']';
+ assert !ex.exists(path) : "Path exists [igfs=" + ex.name() + ", path=" + path + ']';
assert !uni.exists(path.toString()) : "Path exists [igfs=" + uni.name() + ", path=" + path + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 039bf8d..8b88157 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -191,8 +191,8 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertNull("Unexpected stored properties: " + info, info.properties().get(key2));
}
- mgr.softDelete(new IgfsPath("/dir"), true);
- mgr.softDelete(new IgfsPath("/file"), false);
+ mgr.softDelete(new IgfsPath("/dir"), true, null);
+ mgr.softDelete(new IgfsPath("/file"), false, null);
assertNull(mgr.updateProperties(dir.id(), F.asMap("p", "7")));
}
@@ -328,9 +328,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
mgr.move(path("/a2"), path("/a"));
- IgniteUuid del = mgr.softDelete(path("/a/b/f3"), false);
-
- assertEquals(f3.id(), del);
+ mgr.softDelete(path("/a/b/f3"), false, null);
assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
mgr.directoryListing(ROOT_ID));
@@ -341,8 +339,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(b.id()));
//assertEquals(b, mgr.removeIfEmpty(a.id(), "b", b.id(), new IgfsPath("/a/b"), true));
- del = mgr.softDelete(path("/a/b"), false);
- assertEquals(b.id(), del);
+ mgr.softDelete(path("/a/b"), false, null);
assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
mgr.directoryListing(ROOT_ID));
@@ -351,8 +348,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(b.id()));
- del = mgr.softDelete(path("/a/f2"), false);
- assertEquals(f2.id(), del);
+ mgr.softDelete(path("/a/f2"), false, null);
assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
mgr.directoryListing(ROOT_ID));
@@ -360,16 +356,14 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(a.id()));
assertEmpty(mgr.directoryListing(b.id()));
- del = mgr.softDelete(path("/f1"), false);
- assertEquals(f1.id(), del);
+ mgr.softDelete(path("/f1"), false, null);
assertEquals(F.asMap("a", new IgfsListingEntry(a)), mgr.directoryListing(ROOT_ID));
assertEmpty(mgr.directoryListing(a.id()));
assertEmpty(mgr.directoryListing(b.id()));
- del = mgr.softDelete(path("/a"), false);
- assertEquals(a.id(), del);
+ mgr.softDelete(path("/a"), false, null);
assertEmpty(mgr.directoryListing(ROOT_ID));
assertEmpty(mgr.directoryListing(a.id()));
[06/13] ignite git commit: IGNITE-3312: IGFS: Fixed a bug introduced
during create() routine reworking.
Posted by av...@apache.org.
IGNITE-3312: IGFS: Fixed a bug introduced during create() routine reworking.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b46422ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b46422ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b46422ad
Branch: refs/heads/master
Commit: b46422ad64f531900263fb36f6ff28373e4346f1
Parents: e733e91
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 15 16:02:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:37:44 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 39 +++++++++++++++-----
1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b46422ad/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index fd0b07e..1640918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -2896,7 +2896,7 @@ public class IgfsMetaManager extends IgfsManager {
else {
// Create file and parent folders.
IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
- affKey, evictExclude, null);
+ affKey, evictExclude, null, null);
if (res == null)
continue;
@@ -3060,11 +3060,21 @@ public class IgfsMetaManager extends IgfsManager {
}
else {
// Create file and parent folders.
+ T1<OutputStream> secondaryOutHolder = null;
+
if (secondaryCtx != null)
- secondaryOut = secondaryCtx.create();
+ secondaryOutHolder = new T1<>();
- IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
- affKey, evictExclude, secondaryCtx);
+ IgfsPathsCreateResult res;
+
+ try {
+ res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+ affKey, evictExclude, secondaryCtx, secondaryOutHolder);
+ }
+ finally {
+ if (secondaryOutHolder != null)
+ secondaryOut = secondaryOutHolder.get();
+ }
if (res == null)
continue;
@@ -3114,7 +3124,7 @@ public class IgfsMetaManager extends IgfsManager {
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
- return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null);
+ return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null, null);
}
/**
@@ -3128,12 +3138,14 @@ public class IgfsMetaManager extends IgfsManager {
* @param affKey Affinity key (optional)
* @param evictExclude Evict exclude flag.
* @param secondaryCtx Secondary file system create context.
+ * @param secondaryOutHolder Holder for the secondary output stream.
* @return Result or {@code} if the first parent already contained child with the same name.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
- boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx)
+ boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx,
+ @Nullable T1<OutputStream> secondaryOutHolder)
throws IgniteCheckedException{
// Check if entry we are going to write to is directory.
if (lockInfos.get(pathIds.lastExistingId()).isFile())
@@ -3141,7 +3153,7 @@ public class IgfsMetaManager extends IgfsManager {
"(parent element is not a directory): " + pathIds.path());
return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude,
- secondaryCtx);
+ secondaryCtx, secondaryOutHolder);
}
/**
@@ -3156,6 +3168,7 @@ public class IgfsMetaManager extends IgfsManager {
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param secondaryCtx Secondary file system create context.
+ * @param secondaryOutHolder Secondary output stream holder.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
@@ -3163,7 +3176,8 @@ public class IgfsMetaManager extends IgfsManager {
private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
- @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
+ @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx, @Nullable T1<OutputStream> secondaryOutHolder)
+ throws IgniteCheckedException {
// This is our starting point.
int lastExistingIdx = pathIds.lastExistingIndex();
IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
@@ -3179,6 +3193,13 @@ public class IgfsMetaManager extends IgfsManager {
if (lastExistingInfo.hasChild(curPart))
return null;
+ // Create entry in the secondary file system if needed.
+ if (secondaryCtx != null) {
+ assert secondaryOutHolder != null;
+
+ secondaryOutHolder.set(secondaryCtx.create());
+ }
+
Map<IgniteUuid, EntryProcessor> procMap = new HashMap<>();
// First step: add new entry to the last existing element.
@@ -3210,7 +3231,7 @@ public class IgfsMetaManager extends IgfsManager {
if (secondaryInfo == null)
throw new IgfsException("Failed to perform operation because secondary file system path was " +
- "modified concurrnetly: " + lastCreatedPath);
+ "modified concurrently: " + lastCreatedPath);
else if (secondaryInfo.isFile())
throw new IgfsException("Failed to perform operation because secondary file system entity is " +
"not directory: " + lastCreatedPath);
[07/13] ignite git commit: IGNITE-2767: Cont. query remote filter
requires to be in client nodes class path. Reviewed and merged by Denis
Magda.
Posted by av...@apache.org.
IGNITE-2767: Cont. query remote filter requires to be in client nodes class path.
Reviewed and merged by Denis Magda.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8411955
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8411955
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8411955
Branch: refs/heads/master
Commit: b8411955e5169b38430e077c5b4ed85d8e95b83c
Parents: b46422a
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 17:18:27 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 17:25:26 2016 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 7 +-
...yRemoteFilterMissingInClassPathSelfTest.java | 237 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
3 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c135b83..3d6df89 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -51,6 +51,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -1716,7 +1717,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
data0.put(entry.getKey(), compData);
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e);
+ if (GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC.ordinal() == entry.getKey() &&
+ X.hasCause(e, ClassNotFoundException.class) && locNode.isClient())
+ U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
+ else
+ U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
new file mode 100644
index 0000000..37f4e01
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
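+ * Tests what a joining node logs when the class of a continuous query remote filter is missing from its class path.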
+ */
+public class ContinuousQueryRemoteFilterMissingInClassPathSelfTest extends GridCommonAbstractTest {
+ /** URL of classes. */
+ private static final URL[] URLS;
+
+ static {
+ try {
+ URLS = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
+ }
+ catch (MalformedURLException e) {
+ throw new RuntimeException("Define property p2p.uri.cls", e);
+ }
+ }
+
+ /** String logger set on nodes started without the external class loader; tests inspect its content. */
+ private GridStringLogger log;
+
+ /** Client mode flag applied to the configuration of the next started node. */
+ private boolean clientMode;
+
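+ /** If {@code true} the next started node gets the external class loader, otherwise it gets the string logger. */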
+ private boolean setExternalLoader;
+
+ /** Class loader built over {@link #URLS} that is used to load the remote filter class. */
+ private ClassLoader ldr;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setClientMode(clientMode);
+
+ CacheConfiguration cacheCfg = new CacheConfiguration();
+
+ cacheCfg.setName("simple");
+
+ cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+
+ cfg.setCacheConfiguration(cacheCfg);
+
+ if (setExternalLoader)
+ cfg.setClassLoader(ldr);
+ else
+ cfg.setGridLogger(log);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testWarningMessageOnClientNode() throws Exception {
+ ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+ clientMode = false;
+ setExternalLoader = true;
+ final Ignite ignite0 = startGrid(1);
+
+ executeContiniouseQuery(ignite0.cache("simple"));
+
+ log = new GridStringLogger();
+ clientMode = true;
+ setExternalLoader = false;
+
+ startGrid(2);
+
+ assertTrue(log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " +
+ "Can be ignored."));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoWarningMessageOnClientNode() throws Exception {
+ ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+ setExternalLoader = true;
+
+ clientMode = false;
+ final Ignite ignite0 = startGrid(1);
+
+ executeContiniouseQuery(ignite0.cache("simple"));
+
+ log = new GridStringLogger();
+ clientMode = true;
+
+ startGrid(2);
+
+ assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " +
+ "Can be ignored."));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExceptionOnServerNode() throws Exception {
+ ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+ clientMode = false;
+
+ setExternalLoader = true;
+ final Ignite ignite0 = startGrid(1);
+
+ executeContiniouseQuery(ignite0.cache("simple"));
+
+ log = new GridStringLogger();
+ setExternalLoader = false;
+
+ startGrid(2);
+
+ assertTrue(log.toString().contains("class org.apache.ignite.IgniteCheckedException: " +
+ "Failed to find class with given class loader for unmarshalling"));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoExceptionOnServerNode() throws Exception {
+ ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+ clientMode = false;
+
+ setExternalLoader = true;
+ final Ignite ignite0 = startGrid(1);
+
+ executeContiniouseQuery(ignite0.cache("simple"));
+
+ log = new GridStringLogger();
+
+ startGrid(2);
+
+ assertTrue(!log.toString().contains("class org.apache.ignite.IgniteCheckedException: " +
+ "Failed to find class with given class loader for unmarshalling"));
+ }
+
+ /**
+ * @param cache Ignite cache.
+ * @throws Exception If failed.
+ */
+ private void executeContiniouseQuery(IgniteCache cache) throws Exception {
+ ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(
+ new CacheEntryUpdatedListener<Integer, String>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
+ throws CacheEntryListenerException {
+ for (CacheEntryEvent<? extends Integer, ? extends String> event : events)
+ System.out.println("Key = " + event.getKey() + ", Value = " + event.getValue());
+ }
+ }
+ );
+
+ final Class<CacheEntryEventSerializableFilter> remoteFilterClass = (Class<CacheEntryEventSerializableFilter>)
+ ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryEventSerializableFilter");
+
+ qry.setRemoteFilterFactory(new ClassFilterFactory(remoteFilterClass));
+
+ cache.query(qry);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, "Message " + i);
+ }
+
+ /**
+ * Factory that creates the remote filter by instantiating the given filter class.
+ */
+ private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, String>> {
+ /** Filter class to instantiate. */
+ private Class<CacheEntryEventSerializableFilter> cls;
+
+ /**
+ * @param cls Class.
+ */
+ public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+ this.cls = cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntryEventSerializableFilter<Integer, String> create() {
+ try {
+ return cls.newInstance();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 94b9dce..f7821a8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
@@ -109,6 +110,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class);
suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
+ suite.addTestSuite(ContinuousQueryRemoteFilterMissingInClassPathSelfTest.class);
suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class);
suite.addTestSuite(CacheKeepBinaryIterationTest.class);
suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);
[09/13] ignite git commit: IGNITE-3151: Using IgniteCountDownLatch
sometimes drives to dead lock. Reviewed and merged by Denis Magda.
Posted by av...@apache.org.
IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock.
Reviewed and merged by Denis Magda.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19aeec39
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19aeec39
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19aeec39
Branch: refs/heads/master
Commit: 19aeec39fe84f141a97ba1b0f4e6b48d2bb21f27
Parents: a672e57
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 18:43:22 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 18:50:07 2016 +0300
----------------------------------------------------------------------
.../GridCacheCountDownLatchImpl.java | 54 ++++++-
.../IgniteCountDownLatchAbstractSelfTest.java | 156 ++++++++++++++++++-
2 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19aeec39/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 41cc548..723fb55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -26,7 +26,7 @@ import java.io.ObjectStreamException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
@@ -50,6 +50,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** */
private static final long serialVersionUID = 0L;
+ /** Internal latch is in uninitialized state. */
+ private static final int UNINITIALIZED_LATCH_STATE = 0;
+
+ /** Internal latch is being created. */
+ private static final int CREATING_LATCH_STATE = 1;
+
+ /** Internal latch is ready for the usage. */
+ private static final int READY_LATCH_STATE = 2;
+
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
@@ -83,14 +92,17 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private boolean autoDel;
/** Internal latch (transient). */
- private volatile CountDownLatch internalLatch;
+ private CountDownLatch internalLatch;
/** Initialization guard. */
- private final AtomicBoolean initGuard = new AtomicBoolean();
+ private final AtomicInteger initGuard = new AtomicInteger();
/** Initialization latch. */
private final CountDownLatch initLatch = new CountDownLatch(1);
+ /** Latest latch value received while the internal latch was not ready yet. Guarded by {@code initGuard}. */
+ private Integer lastLatchVal = null;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -236,15 +248,36 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
@Override public void onUpdate(int cnt) {
assert cnt >= 0;
- while (internalLatch != null && internalLatch.getCount() > cnt)
- internalLatch.countDown();
+ CountDownLatch latch0;
+
+ synchronized (initGuard) {
+ int state = initGuard.get();
+
+ if (state != READY_LATCH_STATE) {
+ /** Internal latch is not fully initialized yet. Remember latest latch value. */
+ lastLatchVal = cnt;
+
+ return;
+ }
+
+ /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+ latch0 = internalLatch;
+ }
+
+ /** Internal latch is fully initialized and ready for the usage. */
+
+ assert latch0 != null;
+
+ while (latch0.getCount() > cnt)
+ latch0.countDown();
+
}
/**
* @throws IgniteCheckedException If operation failed.
*/
private void initializeLatch() throws IgniteCheckedException {
- if (initGuard.compareAndSet(false, true)) {
+ if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
try {
internalLatch = CU.outTx(
retryTopologySafe(new Callable<CountDownLatch>() {
@@ -268,6 +301,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
ctx
);
+ synchronized (initGuard) {
+ if (lastLatchVal != null) {
+ while (internalLatch.getCount() > lastLatchVal)
+ internalLatch.countDown();
+ }
+
+ initGuard.set(READY_LATCH_STATE);
+ }
+
if (log.isDebugEnabled())
log.debug("Initialized internal latch: " + internalLatch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/19aeec39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 2f6f6f4..f6d0287 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -22,21 +22,26 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestUtils;
@@ -144,7 +149,6 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
/**
* @param latch Latch.
- *
* @throws Exception If failed.
*/
protected void checkRemovedLatch(final IgniteCountDownLatch latch) throws Exception {
@@ -236,8 +240,8 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
* @param latchName Latch name.
* @param cnt Count.
* @param autoDel Auto delete flag.
- * @throws Exception If failed.
* @return New latch.
+ * @throws Exception If failed.
*/
private IgniteCountDownLatch createLatch(String latchName, int cnt, boolean autoDel)
throws Exception {
@@ -334,6 +338,58 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
/**
* @throws Exception If failed.
*/
+ public void testLatchBroadcast() throws Exception {
+ Ignite ignite = grid(0);
+ ClusterGroup srvsGrp = ignite.cluster().forServers();
+
+ int numOfSrvs = srvsGrp.nodes().size();
+
+ ignite.destroyCache("testCache");
+ IgniteCache<Object, Object> cache = ignite.createCache("testCache");
+
+ for (ClusterNode node : srvsGrp.nodes())
+ cache.put(String.valueOf(node.id()), 0);
+
+ for (int i = 0; i < 500; i++) {
+ IgniteCountDownLatch latch1 = createLatch1(ignite, numOfSrvs);
+ IgniteCountDownLatch latch2 = createLatch2(ignite, numOfSrvs);
+
+ ignite.compute(srvsGrp).broadcast(new IgniteRunnableJob(latch1, latch2, i));
+ assertTrue(latch2.await(10000));
+ }
+ }
+
+ /**
+ * @param client Ignite client.
+ * @param numOfSrvs Number of server nodes.
+ * @return Ignite latch.
+ */
+ private IgniteCountDownLatch createLatch1(Ignite client, int numOfSrvs) {
+ return client.countDownLatch(
+ "testName1", // Latch name.
+ numOfSrvs, // Initial count.
+ true, // Auto remove, when counter has reached zero.
+ true // Create if it does not exist.
+ );
+ }
+
+ /**
+ * @param client Ignite client.
+ * @param numOfSrvs Number of server nodes.
+ * @return Ignite latch.
+ */
+ private IgniteCountDownLatch createLatch2(Ignite client, int numOfSrvs) {
+ return client.countDownLatch(
+ "testName2", // Latch name.
+ numOfSrvs, // Initial count.
+ true, // Auto remove, when counter has reached zero.
+ true // Create if it does not exist.
+ );
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testLatchMultinode2() throws Exception {
if (gridCount() == 1)
return;
@@ -391,4 +447,100 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
// No-op.
}
+
+ /**
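+ * Ignite job that checks cache values, synchronizes on the first latch, updates the cache and counts down the second latch.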
+ */
+ public class IgniteRunnableJob implements IgniteRunnable {
+
+ /**
+ * Ignite.
+ */
+ @IgniteInstanceResource
+ Ignite igniteInstance;
+
+ /**
+ * Iteration number.
+ */
+ protected final int iteration;
+
+ /**
+ * Ignite latch 1.
+ */
+ private final IgniteCountDownLatch latch1;
+
+ /**
+ * Ignite latch 2.
+ */
+ private final IgniteCountDownLatch latch2;
+
+ /**
+ * @param latch1 Ignite latch 1.
+ * @param latch2 Ignite latch 2.
+ * @param iteration Iteration number.
+ */
+ public IgniteRunnableJob(IgniteCountDownLatch latch1, IgniteCountDownLatch latch2, int iteration) {
+ this.iteration = iteration;
+ this.latch1 = latch1;
+ this.latch2 = latch2;
+ }
+
+ /**
+ * @return Ignite latch.
+ */
+ IgniteCountDownLatch createLatch1() {
+ return latch1;
+ }
+
+ /**
+ * @return Ignite latch.
+ */
+ IgniteCountDownLatch createLatch2() {
+ return latch2;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run() {
+
+ IgniteCountDownLatch latch1 = createLatch1();
+ IgniteCountDownLatch latch2 = createLatch2();
+
+ IgniteCache<Object, Object> cache = igniteInstance.cache("testCache");
+
+ for (ClusterNode node : igniteInstance.cluster().forServers().nodes()) {
+ Integer val = (Integer)cache.get(String.valueOf(node.id()));
+ assertEquals(val, (Integer)iteration);
+ }
+
+ latch1.countDown();
+
+ assertTrue(latch1.await(10000));
+
+ cache.put(getUID(), (iteration + 1));
+
+ latch2.countDown();
+
+ }
+
+ /**
+ * @return Node UUID as string.
+ */
+ String getUID() {
+ String id = "";
+ Collection<ClusterNode> nodes = igniteInstance.cluster().forLocal().nodes();
+ for (ClusterNode node : nodes) {
+ if (node.isLocal())
+ id = String.valueOf(node.id());
+ }
+ return id;
+ }
+
+ /**
+ * @return Ignite.
+ */
+ public Ignite igniteInstance() {
+ return igniteInstance;
+ }
+ }
}
[12/13] ignite git commit: IGNITE-3153 TcpDiscoveryZookeeperIpFinder
doesn't properly handle client reconnections
Posted by av...@apache.org.
IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e05c012f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e05c012f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e05c012f
Branch: refs/heads/master
Commit: e05c012f293dcad2cd3a184e6d257a0cb2af509a
Parents: 40d41c1
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:49:48 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:27:22 2016 +0300
----------------------------------------------------------------------
.../zk/TcpDiscoveryZookeeperIpFinder.java | 65 +++++++++++---------
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 16 ++---
2 files changed, 42 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
index bee4dab..238987b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -39,6 +39,7 @@ import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
@@ -68,11 +69,8 @@ import org.codehaus.jackson.map.annotate.JsonRootName;
*
* @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
* @see <a href="http://curator.apache.org">Apache Curator</a>
- *
- * @author Raul Kripalani
*/
public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
-
/** System property name to provide the ZK Connection String. */
public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
@@ -89,6 +87,10 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
@GridToStringExclude
private final AtomicBoolean initGuard = new AtomicBoolean();
+ /** Close guard. */
+ @GridToStringExclude
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -140,9 +142,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
}
- if (curator.getState() != CuratorFrameworkState.STARTED)
+ if (curator.getState() == CuratorFrameworkState.LATENT)
curator.start();
+ A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
+
discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
.client(curator)
.basePath(basePath)
@@ -152,8 +156,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
/** {@inheritDoc} */
@Override public void onSpiContextDestroyed() {
- if (!initGuard.compareAndSet(true, false))
+ if (!closeGuard.compareAndSet(false, true)) {
+ U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
+
return;
+ }
log.info("Destroying ZooKeeper IP Finder.");
@@ -175,7 +182,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
try {
serviceInstances = discovery.queryForInstances(serviceName);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
return Collections.emptyList();
}
@@ -214,17 +222,18 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
try {
ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
- .name(serviceName)
- .uriSpec(URI_SPEC)
- .address(addr.getAddress().getHostAddress())
- .port(addr.getPort())
- .build();
+ .name(serviceName)
+ .uriSpec(URI_SPEC)
+ .address(addr.getAddress().getHostAddress())
+ .port(addr.getPort())
+ .build();
ourInstances.put(addr, si);
discovery.registerService(si);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
"[message=%s,addresses=%s]", e.getMessage(), addr), e);
}
@@ -245,13 +254,14 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
if (si == null) {
log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
- "instance map for: " + addrs);
+ "instance map for: " + addrs);
continue;
}
try {
discovery.unregisterService(si);
- } catch (Exception e) {
+ }
+ catch (Exception e) {
log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
}
}
@@ -272,7 +282,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+ * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
+ * explicitly.
*/
public void setZkConnectionString(String zkConnectionString) {
this.zkConnectionString = zkConnectionString;
@@ -286,8 +297,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
- * using a system property.
+ * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
+ * system property.
*/
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
@@ -315,9 +326,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical
- * ZK terms, it represents the node under {@link #basePath}, under which services will be
- * registered.
+ * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical ZK
+ * terms, it represents the node under {@link #basePath}, under which services will be registered.
*/
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
@@ -331,20 +341,19 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
}
/**
- * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
- * are allowed. Nodes will attempt to register themselves, plus those they
- * know about. By default, duplicate registrations are not allowed, but you
- * might want to set this property to <tt>true</tt> if you have multiple
- * network interfaces or if you are facing troubles.
+ * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
+ * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
+ * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
+ * network interfaces or if you are facing troubles.
*/
public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
this.allowDuplicateRegistrations = allowDuplicateRegistrations;
}
/**
- * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires
- * a payload type when registering and discovering nodes. May be enhanced in the future with further information
- * to assist discovery.
+ * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
+ * payload type when registering and discovering nodes. May be enhanced in the future with further information to
+ * assist discovery.
*
* @author Raul Kripalani
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index 3f9b1ff..601323c 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -122,6 +122,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// shall be configured through system property
if (gridName.equals(getTestGridName(0)))
zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
+
else if (gridName.equals(getTestGridName(1))) {
zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
new ExponentialBackoffRetry(100, 5)));
@@ -360,23 +361,16 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// stop all grids
stopAllGrids();
- boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
- return zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size() == 0;
+ return 0 == zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size();
}
catch (Exception e) {
- fail("Unexpected error: ");
-
- return true;
+ return false;
}
}
- }, 2 * 60000);
-
- assertTrue(wait);
-
- // check that all nodes are gone in ZK
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+ }, 20000));
}
/**
[08/13] ignite git commit: ignite-3209 Waiting for affinity topology
in case of failover for affinity call
Posted by av...@apache.org.
ignite-3209 Waiting for affinity topology in case of failover for affinity call
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a672e576
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a672e576
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a672e576
Branch: refs/heads/master
Commit: a672e576cf88e21859944474355a91d9f4699550
Parents: b841195
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 17:43:44 2016 +0300
----------------------------------------------------------------------
.../processors/task/GridTaskWorker.java | 44 ++++++++++++++++++--
.../cache/CacheAffinityCallSelfTest.java | 42 +++++++++++++------
2 files changed, 71 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a672e576/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 05970ed..415d632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,10 +61,12 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTaskSessionImpl;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.closure.AffinityTask;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.typedef.CO;
@@ -76,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -678,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
// job response was changed in this method apply.
boolean selfOccupied = false;
+ IgniteInternalFuture<?> affFut = null;
+
+ boolean waitForAffTop = false;
+
+ final GridJobExecuteResponse failoverRes = res;
+
try {
synchronized (mux) {
// If task is not waiting for responses,
@@ -847,7 +856,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
case FAILOVER: {
- if (!failover(res, jobRes, getTaskTopology()))
+ if (affKey != null) {
+ AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+ affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+ }
+
+ if (affFut != null && !affFut.isDone()) {
+ waitForAffTop = true;
+
+ jobRes.resetResponse();
+ }
+ else if (!failover(res, jobRes, getTaskTopology()))
plc = null;
break;
@@ -856,7 +876,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
// Outside of synchronization.
- if (plc != null) {
+ if (plc != null && !waitForAffTop) {
// Handle failover.
if (plc == FAILOVER)
sendFailoverRequest(jobRes);
@@ -872,6 +892,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
finishTask(null, e);
+
+ waitForAffTop = false;
}
finally {
// Open up job for processing responses.
@@ -890,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
res = delayedRess.poll();
}
}
+
+ if (waitForAffTop && affFut != null) {
+ affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut0) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ onResponse(failoverRes);
+ }
+ }, false);
+ }
+ });
+ }
}
}
@@ -1039,7 +1073,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
* @param top Topology.
* @return {@code True} if fail-over SPI returned a new node.
*/
- private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
+ private boolean failover(
+ GridJobExecuteResponse res,
+ GridJobResultImpl jobRes,
+ Collection<? extends ClusterNode> top
+ ) {
assert Thread.holdsLock(mux);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a672e576/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index a1762cc..92e2b9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
private static final String CACHE_NAME = "myCache";
/** */
- private static final int MAX_FAILOVER_ATTEMPTS = 500;
-
- /** */
private static final int SRVS = 4;
/** */
@@ -69,21 +66,22 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(spi);
AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
- failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
cfg.setFailoverSpi(failSpi);
- CacheConfiguration ccfg = defaultCacheConfiguration();
- ccfg.setName(CACHE_NAME);
- ccfg.setCacheMode(PARTITIONED);
- ccfg.setBackups(1);
-
- cfg.setCacheConfiguration(ccfg);
-
+ // Do not configure cache on client.
if (gridName.equals(getTestGridName(SRVS))) {
cfg.setClientMode(true);
spi.setForceServerMode(true);
}
+ else {
+ CacheConfiguration ccfg = defaultCacheConfiguration();
+ ccfg.setName(CACHE_NAME);
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+ }
return cfg;
}
@@ -99,7 +97,27 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
public void testAffinityCallRestartNode() throws Exception {
startGridsMultiThreaded(SRVS);
- final int ITERS = 5;
+ affinityCallRestartNode();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinityCallFromClientRestartNode() throws Exception {
+ startGridsMultiThreaded(SRVS + 1);
+
+ Ignite client = grid(SRVS);
+
+ assertTrue(client.configuration().isClientMode());
+
+ affinityCallRestartNode();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void affinityCallRestartNode() throws Exception {
+ final int ITERS = 10;
for (int i = 0; i < ITERS; i++) {
log.info("Iteration: " + i);
[10/13] ignite git commit: GG-11237 KeepBinary flag will not be sent
to another node.
Posted by av...@apache.org.
GG-11237 KeepBinary flag will not be sent to another node.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5f4aac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5f4aac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5f4aac2
Branch: refs/heads/master
Commit: b5f4aac2a5daad96cdac4df9abf91713824ff2c0
Parents: 19aeec3
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 14:14:33 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:14:33 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 49 ++++-----
.../cache/transactions/IgniteTxAdapter.java | 15 +--
.../GridCacheLazyPlainVersionedEntry.java | 107 +++++++++++++++++++
.../version/GridCachePlainVersionedEntry.java | 7 +-
4 files changed, 140 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6d321bd..57fa68e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -54,7 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -2035,8 +2035,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Cache is conflict-enabled.
if (cctx.conflictNeedResolve()) {
- // Get new value, optionally unmarshalling and/or transforming it.
- Object writeObj0;
+ GridCacheVersionedEntryEx newEntry;
+
+ GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
+ explicitTtl,
+ explicitExpireTime);
+
+ // Prepare old and new entries for conflict resolution.
+ GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
if (op == GridCacheOperation.TRANSFORM) {
transformClo = writeObj;
@@ -2051,14 +2057,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
Object computed = entryProcessor.process(entry, invokeArgs);
- if (entry.modified()) {
- writeObj0 = cctx.unwrapTemporary(entry.getValue());
- writeObj = cctx.toCacheObject(writeObj0);
- }
- else {
+ if (entry.modified())
+ writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
+ else
writeObj = oldVal;
- writeObj0 = cctx.unwrapBinaryIfNeeded(oldVal, keepBinary, false);
- }
key0 = entry.key();
@@ -2069,24 +2071,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
invokeRes = new IgniteBiTuple(null, e);
writeObj = oldVal;
- writeObj0 = cctx.unwrapBinaryIfNeeded(oldVal, keepBinary, false);
}
}
- else
- writeObj0 = cctx.unwrapBinaryIfNeeded(writeObj, keepBinary, false);
- GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
- explicitTtl,
- explicitExpireTime);
-
- // Prepare old and new entries for conflict resolution.
- GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
- GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(
- oldEntry.key(),
- writeObj0,
+ newEntry = new GridCacheLazyPlainVersionedEntry<>(
+ cctx,
+ key,
+ (CacheObject)writeObj,
expiration.get1(),
expiration.get2(),
- conflictVer != null ? conflictVer : newVer);
+ conflictVer != null ? conflictVer : newVer,
+ keepBinary);
// Resolve conflict.
conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
@@ -3531,12 +3526,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val = isNew ? unswap(true, false) : rawGetOrUnmarshalUnlocked(false);
- return new GridCachePlainVersionedEntry<>(cctx.unwrapBinaryIfNeeded(key, keepBinary, true),
- cctx.unwrapBinaryIfNeeded(val, keepBinary, true),
+ return new GridCacheLazyPlainVersionedEntry<>(cctx,
+ key,
+ val,
ttlExtras(),
expireTimeExtras(),
ver.conflictVersion(),
- isNew);
+ isNew,
+ keepBinary);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ece013e..f76f4bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -55,7 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
-import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
@@ -1645,14 +1645,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
// Construct new entry info.
GridCacheContext entryCtx = txEntry.context();
- Object newVal0 = entryCtx.unwrapBinaryIfNeeded(newVal, txEntry.keepBinary(), false);
-
- GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry(
- oldEntry.key(),
- newVal0,
+ GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry(
+ entryCtx,
+ txEntry.key(),
+ newVal,
newTtl,
newExpireTime,
- newVer);
+ newVer,
+ false,
+ txEntry.keepBinary());
GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
new file mode 100644
index 0000000..50de328
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Lazy plain versioned entry.
+ */
+public class GridCacheLazyPlainVersionedEntry<K, V> extends GridCachePlainVersionedEntry<K, V> {
+ /** Cache context. */
+ protected GridCacheContext cctx;
+
+ /** Key cache object. */
+ private KeyCacheObject keyObj;
+
+ /** Cache object value. */
+ private CacheObject valObj;
+
+ /** Keep binary flag. */
+ private boolean keepBinary;
+
+ /**
+ * @param cctx Context.
+ * @param keyObj Key.
+ * @param valObj Value.
+ * @param ttl TTL.
+ * @param expireTime Expire time.
+ * @param ver Version.
+ * @param isStartVer Start version flag.
+ * @param keepBinary Keep binary flag.
+ */
+ public GridCacheLazyPlainVersionedEntry(GridCacheContext cctx,
+ KeyCacheObject keyObj,
+ CacheObject valObj,
+ long ttl,
+ long expireTime,
+ GridCacheVersion ver,
+ boolean isStartVer,
+ boolean keepBinary) {
+ super(null, null, ttl, expireTime, ver, isStartVer);
+
+ this.cctx = cctx;
+ this.keyObj = keyObj;
+ this.valObj = valObj;
+ this.keepBinary = keepBinary;
+ }
+
+ public GridCacheLazyPlainVersionedEntry(GridCacheContext cctx,
+ KeyCacheObject keyObj,
+ CacheObject valObj,
+ long ttl,
+ long expireTime,
+ GridCacheVersion ver,
+ boolean keepBinary) {
+ super(null, null, ttl, expireTime, ver);
+ this.cctx = cctx;
+ this.keepBinary = keepBinary;
+ this.keyObj = keyObj;
+ this.valObj = valObj;
+ }
+
+ /** {@inheritDoc} */
+ @Override public K key() {
+ if (key == null)
+ key = (K)cctx.unwrapBinaryIfNeeded(keyObj, keepBinary);
+
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V value() {
+ return value(keepBinary);
+ }
+
+ /**
+ * Returns the value stored in the cache when this entry was created.
+ *
+ * @param keepBinary Flag to keep binary if needed.
+ * @return the value corresponding to this entry
+ */
+ @SuppressWarnings("unchecked")
+ public V value(boolean keepBinary) {
+ if (val == null)
+ val = (V)cctx.unwrapBinaryIfNeeded(valObj, keepBinary, true);
+
+ return val;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
index e1da1c4..dd682e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
@@ -25,10 +25,10 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridCachePlainVersionedEntry<K, V> implements GridCacheVersionedEntryEx<K, V> {
/** Key. */
- private final K key;
+ protected K key;
/** Value. */
- private final V val;
+ protected V val;
/** TTL. */
private final long ttl;
@@ -63,9 +63,6 @@ public class GridCachePlainVersionedEntry<K, V> implements GridCacheVersionedEnt
*/
public GridCachePlainVersionedEntry(K key, V val, long ttl, long expireTime, GridCacheVersion ver,
boolean isStartVer) {
- assert ver != null;
- assert key != null;
-
this.key = key;
this.val = val;
this.ttl = ttl;
[13/13] ignite git commit: Merge remote-tracking branch
'remotes/upstream/gridgain-7.6.1'
Posted by av...@apache.org.
Merge remote-tracking branch 'remotes/upstream/gridgain-7.6.1'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca546b3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca546b3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca546b3d
Branch: refs/heads/master
Commit: ca546b3d4ac8431aa9b5104e1d5b732720676d67
Parents: 9d46a73 e05c012
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 14:30:11 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:30:11 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 18 +-
.../processors/cache/GridCacheMapEntry.java | 49 +-
.../cache/transactions/IgniteTxAdapter.java | 15 +-
.../GridCacheLazyPlainVersionedEntry.java | 107 +++
.../version/GridCachePlainVersionedEntry.java | 7 +-
.../GridCacheCountDownLatchImpl.java | 54 +-
.../processors/igfs/IgfsCreateResult.java | 66 ++
.../processors/igfs/IgfsDataManager.java | 166 +----
.../processors/igfs/IgfsDeleteResult.java | 62 ++
.../internal/processors/igfs/IgfsImpl.java | 83 +--
.../processors/igfs/IgfsMetaManager.java | 733 +++++++++----------
.../internal/processors/igfs/IgfsPathIds.java | 10 +
.../IgfsSecondaryFileSystemCreateContext.java | 111 +++
.../IgfsSecondaryOutputStreamDescriptor.java | 59 --
.../igfs/data/IgfsDataPutProcessor.java | 99 +++
.../meta/IgfsMetaDirectoryCreateProcessor.java | 40 +-
...IgfsMetaDirectoryListingRenameProcessor.java | 133 ++++
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 46 +-
.../ignite/internal/util/IgniteUtils.java | 8 +-
.../ignite/internal/util/lang/GridFunc.java | 66 --
.../spi/discovery/tcp/TcpDiscoverySpi.java | 7 +-
.../IgniteCountDownLatchAbstractSelfTest.java | 156 +++-
...yRemoteFilterMissingInClassPathSelfTest.java | 237 ++++++
.../processors/igfs/IgfsAbstractSelfTest.java | 14 +-
.../igfs/IgfsMetaManagerSelfTest.java | 24 +-
.../internal/util/IgniteUtilsSelfTest.java | 15 +
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
.../zk/TcpDiscoveryZookeeperIpFinder.java | 65 +-
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 16 +-
29 files changed, 1628 insertions(+), 840 deletions(-)
----------------------------------------------------------------------
[05/13] ignite git commit: IGNITE-2938: IGFS: Puts during secondary
file reads are now performed synchronously and with proper semantics.
Posted by av...@apache.org.
IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e733e912
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e733e912
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e733e912
Branch: refs/heads/master
Commit: e733e912cccf2c83d7e31d6f856b30fc4c663975
Parents: 95513d3
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 17:39:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:35:58 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 18 +-
.../processors/igfs/IgfsDataManager.java | 166 ++++---------------
.../igfs/data/IgfsDataPutProcessor.java | 99 +++++++++++
3 files changed, 147 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 28dd611..6e0e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -704,7 +704,9 @@ public class FileSystemConfiguration {
* Gets maximum timeout awaiting for trash purging in case data cache oversize is detected.
*
* @return Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public long getTrashPurgeTimeout() {
return trashPurgeTimeout;
}
@@ -713,7 +715,9 @@ public class FileSystemConfiguration {
* Sets maximum timeout awaiting for trash purging in case data cache oversize is detected.
*
* @param trashPurgeTimeout Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setTrashPurgeTimeout(long trashPurgeTimeout) {
this.trashPurgeTimeout = trashPurgeTimeout;
}
@@ -724,8 +728,10 @@ public class FileSystemConfiguration {
* In case no executor service is provided, default one will be created with maximum amount of threads equals
* to amount of processor cores.
*
- * @return Get DUAL mode put operation executor service
+ * @return Get DUAL mode put operation executor service.
+ * @deprecated Not used any more.
*/
+ @Deprecated
@Nullable public ExecutorService getDualModePutExecutorService() {
return dualModePutExec;
}
@@ -734,7 +740,9 @@ public class FileSystemConfiguration {
* Set DUAL mode put operations executor service.
*
* @param dualModePutExec Dual mode put operations executor service.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModePutExecutorService(ExecutorService dualModePutExec) {
this.dualModePutExec = dualModePutExec;
}
@@ -743,7 +751,9 @@ public class FileSystemConfiguration {
* Get DUAL mode put operation executor service shutdown flag.
*
* @return DUAL mode put operation executor service shutdown flag.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public boolean getDualModePutExecutorServiceShutdown() {
return dualModePutExecShutdown;
}
@@ -752,7 +762,9 @@ public class FileSystemConfiguration {
* Set DUAL mode put operations executor service shutdown flag.
*
* @param dualModePutExecShutdown Dual mode put operations executor service shutdown flag.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModePutExecutorServiceShutdown(boolean dualModePutExecShutdown) {
this.dualModePutExecShutdown = dualModePutExecShutdown;
}
@@ -766,7 +778,9 @@ public class FileSystemConfiguration {
* avoid issues with increasing GC pauses or out-of-memory error.
*
* @return Maximum amount of pending data read from the secondary file system
+ * @deprecated Not used any more.
*/
+ @Deprecated
public long getDualModeMaxPendingPutsSize() {
return dualModeMaxPendingPutsSize;
}
@@ -775,7 +789,9 @@ public class FileSystemConfiguration {
* Set maximum amount of data in pending put operations.
*
* @param dualModeMaxPendingPutsSize Maximum amount of data in pending put operations.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) {
this.dualModeMaxPendingPutsSize = dualModeMaxPendingPutsSize;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 03777c3..ca8a3af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
+import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,7 +54,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -78,10 +78,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -98,9 +96,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* Cache based file's data container.
*/
public class IgfsDataManager extends IgfsManager {
- /** IGFS. */
- private IgfsEx igfs;
-
/** Data internal cache. */
private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj;
@@ -143,31 +138,10 @@ public class IgfsDataManager extends IgfsManager {
/** Async file delete worker. */
private AsyncDeleteWorker delWorker;
- /** Trash purge timeout. */
- private long trashPurgeTimeout;
-
/** On-going remote reads futures. */
private final ConcurrentHashMap8<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts =
new ConcurrentHashMap8<>();
- /** Executor service for puts in dual mode */
- private volatile ExecutorService putExecSvc;
-
- /** Executor service for puts in dual mode shutdown flag. */
- private volatile boolean putExecSvcShutdown;
-
- /** Maximum amount of data in pending puts. */
- private volatile long maxPendingPuts;
-
- /** Current amount of data in pending puts. */
- private long curPendingPuts;
-
- /** Lock for pending puts. */
- private final Lock pendingPutsLock = new ReentrantLock();
-
- /** Condition for pending puts. */
- private final Condition pendingPutsCond = pendingPutsLock.newCondition();
-
/**
*
*/
@@ -182,8 +156,6 @@ public class IgfsDataManager extends IgfsManager {
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
- igfs = igfsCtx.igfs();
-
dataCacheStartLatch = new CountDownLatch(1);
String igfsName = igfsCtx.configuration().getName();
@@ -216,23 +188,6 @@ public class IgfsDataManager extends IgfsManager {
igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
- trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout();
-
- putExecSvc = igfsCtx.configuration().getDualModePutExecutorService();
-
- if (putExecSvc != null)
- putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown();
- else {
- int coresCnt = Runtime.getRuntime().availableProcessors();
-
- // Note that we do not pre-start threads here as IGFS pool may not be needed.
- putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque<Runnable>());
-
- putExecSvcShutdown = true;
- }
-
- maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();
-
delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
"igfs-" + igfsName + "-delete-worker", log);
}
@@ -282,9 +237,6 @@ public class IgfsDataManager extends IgfsManager {
catch (IgniteInterruptedCheckedException e) {
log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e);
}
-
- if (putExecSvcShutdown)
- U.shutdownNow(getClass(), putExecSvc, log);
}
/**
@@ -308,6 +260,7 @@ public class IgfsDataManager extends IgfsManager {
* @param prevAffKey Affinity key of previous block.
* @return Affinity key.
*/
+ @SuppressWarnings("ConstantConditions")
public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
// Do not generate affinity key for non-affinity nodes.
if (!dataCache.context().affinityNode())
@@ -371,8 +324,6 @@ public class IgfsDataManager extends IgfsManager {
@Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path,
final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader)
throws IgniteCheckedException {
- //assert validTxState(any); // Allow this method call for any transaction state.
-
assert fileInfo != null;
assert blockIdx >= 0;
@@ -435,7 +386,7 @@ public class IgfsDataManager extends IgfsManager {
rmtReadFut.onDone(res);
- putSafe(key, res);
+ putBlock(fileInfo.blockSize(), key, res);
metrics.addReadBlocks(1, 1);
}
@@ -471,6 +422,26 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Stores the given block in data cache.
+ *
+ * @param blockSize The size of the block.
+ * @param key The data cache key of the block.
+ * @param data The new value of the block.
+ */
+ private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
+ if (data.length < blockSize)
+ // partial (incomplete) block:
+ dataCachePrj.invoke(key, new IgfsDataPutProcessor(data));
+ else {
+ // whole block:
+ assert data.length == blockSize;
+
+ dataCachePrj.put(key, data);
+ }
+ }
+
+
+ /**
* Registers write future in igfs data manager.
*
* @param fileId File ID.
@@ -680,7 +651,7 @@ public class IgfsDataManager extends IgfsManager {
byte[] val = vals.get(colocatedKey);
if (val != null) {
- dataCachePrj.put(key, val);
+ putBlock(fileInfo.blockSize(), key, val);
tx.commit();
}
@@ -744,7 +715,6 @@ public class IgfsDataManager extends IgfsManager {
*/
public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen)
throws IgniteCheckedException {
- assert validTxState(false);
assert info.isFile() : "Failed to get affinity (not a file): " + info;
assert start >= 0 : "Start position should not be negative: " + start;
assert len >= 0 : "Part length should not be negative: " + len;
@@ -974,21 +944,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Check transaction is (not) started.
- *
- * @param inTx Expected transaction state.
- * @return Transaction state is correct.
- */
- private boolean validTxState(boolean inTx) {
- boolean txState = inTx == (dataCachePrj.tx() != null);
-
- assert txState : (inTx ? "Method cannot be called outside transaction: " :
- "Method cannot be called in transaction: ") + dataCachePrj.tx();
-
- return txState;
- }
-
- /**
* @param fileId File ID.
* @param node Node to process blocks on.
* @param blocks Blocks to put in cache.
@@ -1056,10 +1011,11 @@ public class IgfsDataManager extends IgfsManager {
* @param colocatedKey Block key.
* @param startOff Data start offset within block.
* @param data Data to write.
+ * @param blockSize The block size.
* @throws IgniteCheckedException If update failed.
*/
private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
- byte[] data) throws IgniteCheckedException {
+ byte[] data, int blockSize) throws IgniteCheckedException {
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
@@ -1090,7 +1046,7 @@ public class IgfsDataManager extends IgfsManager {
// If writing from block beginning, just put and return.
if (startOff == 0) {
- dataCachePrj.put(colocatedKey, data);
+ putBlock(blockSize, colocatedKey, data);
return;
}
@@ -1151,67 +1107,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Put data block read from the secondary file system to the cache.
- *
- * @param key Key.
- * @param data Data.
- * @throws IgniteCheckedException If failed.
- */
- private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException {
- assert key != null;
- assert data != null;
-
- if (maxPendingPuts > 0) {
- pendingPutsLock.lock();
-
- try {
- while (curPendingPuts > maxPendingPuts)
- pendingPutsCond.await(2000, TimeUnit.MILLISECONDS);
-
- curPendingPuts += data.length;
- }
- catch (InterruptedException ignore) {
- throw new IgniteCheckedException("Failed to put IGFS data block into cache due to interruption: " + key);
- }
- finally {
- pendingPutsLock.unlock();
- }
- }
-
- Runnable task = new Runnable() {
- @Override public void run() {
- try {
- dataCachePrj.put(key, data);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to put IGFS data block into cache [key=" + key + ", err=" + e + ']');
- }
- finally {
- if (maxPendingPuts > 0) {
- pendingPutsLock.lock();
-
- try {
- curPendingPuts -= data.length;
-
- pendingPutsCond.signalAll();
- }
- finally {
- pendingPutsLock.unlock();
- }
- }
- }
- }
- };
-
- try {
- putExecSvc.submit(task);
- }
- catch (RejectedExecutionException ignore) {
- task.run();
- }
- }
-
- /**
* @param blocks Blocks to write.
* @return Future that will be completed after put is done.
*/
@@ -1261,6 +1156,7 @@ public class IgfsDataManager extends IgfsManager {
* @param nodeId Node ID.
* @param ackMsg Write acknowledgement message.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
try {
ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
@@ -1343,6 +1239,7 @@ public class IgfsDataManager extends IgfsManager {
* @throws IgniteCheckedException If failed.
* @return Data remainder if {@code flush} flag is {@code false}.
*/
+ @SuppressWarnings("ConstantConditions")
@Nullable public byte[] storeDataBlocks(
IgfsEntryInfo fileInfo,
long reservedLen,
@@ -1456,7 +1353,7 @@ public class IgfsDataManager extends IgfsManager {
if (size != blockSize) {
// Partial writes must be always synchronous.
- processPartialBlockWrite(id, key, block == first ? off : 0, portion);
+ processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize);
writtenTotal++;
}
@@ -1617,8 +1514,6 @@ public class IgfsDataManager extends IgfsManager {
protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log);
- long time = System.currentTimeMillis();
-
stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid());
}
@@ -1642,6 +1537,7 @@ public class IgfsDataManager extends IgfsManager {
}
/** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
while (!isCancelled()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
new file mode 100644
index 0000000..2029d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.data;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Entry processor to set or replace block byte value.
+ */
+public class IgfsDataPutProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>, Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The new value. */
+ private byte[] newVal;
+
+ /**
+ * No-arg constructor required by Externalizable.
+ */
+ public IgfsDataPutProcessor() {
+ // no-op
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param newVal The new value.
+ */
+ public IgfsDataPutProcessor(byte[] newVal) {
+ assert newVal != null;
+
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args)
+ throws EntryProcessorException {
+ byte[] curVal = entry.getValue();
+
+ if (curVal == null || newVal.length > curVal.length)
+ entry.setValue(newVal);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ newVal = U.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeByteArray(out, newVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ newVal = reader.rawReader().readByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.rawWriter().writeByteArray(newVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsDataPutProcessor.class, this);
+ }
+}
[04/13] ignite git commit: IGNITE-3294: IGFS: Created separate
processor optimized for entry rename.
Posted by av...@apache.org.
IGNITE-3294: IGFS: Created separate processor optimized for entry rename.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/95513d32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/95513d32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/95513d32
Branch: refs/heads/master
Commit: 95513d3214dd1ea7a1a68d6f1709af004c2efccd
Parents: c2f8df9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 11:36:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:33:30 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 7 +-
...IgfsMetaDirectoryListingRenameProcessor.java | 133 +++++++++++++++++++
2 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/95513d32/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 5f7dd3d..fd0b07e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCalla
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable;
import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRenameProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor;
@@ -1742,10 +1743,8 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid destId, String destName) throws IgniteCheckedException {
validTxState(true);
- if (F.eq(srcId, destId)) {
- id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
- id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
- }
+ if (F.eq(srcId, destId))
+ id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRenameProcessor(srcName, destName));
else {
Map<IgniteUuid, EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>> procMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/95513d32/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
new file mode 100644
index 0000000..6460adf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Processor to rename a single entry in a directory listing.
+ */
+public class IgfsMetaDirectoryListingRenameProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
+ Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Old name. */
+ private String oldName;
+
+ /** New name. */
+ private String newName;
+
+ /**
+ * Default constructor required by Externalizable.
+ */
+ public IgfsMetaDirectoryListingRenameProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param oldName Old name.
+ * @param newName New name.
+ */
+ public IgfsMetaDirectoryListingRenameProcessor(String oldName, String newName) {
+ this.oldName = oldName;
+ this.newName = newName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
+ throws EntryProcessorException {
+ IgfsEntryInfo fileInfo = e.getValue();
+
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ // Modify a copy of the listing; the original map is left untouched.
+ IgfsListingEntry entry = listing.remove(oldName);
+
+ if (entry == null)
+ throw new IgniteException("Directory listing doesn't contain expected entry: " + oldName);
+
+ IgfsListingEntry replacedEntry = listing.put(newName, entry);
+
+ if (replacedEntry != null)
+ throw new IgniteException("Entry with new name already exists [name=" + newName +
+ ", entry=" + replacedEntry + ']');
+
+ e.setValue(fileInfo.listing(listing));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, oldName);
+ U.writeString(out, newName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ oldName = U.readString(in);
+ newName = U.readString(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ BinaryRawWriter out = writer.rawWriter();
+
+ out.writeString(oldName);
+ out.writeString(newName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ BinaryRawReader in = reader.rawReader();
+
+ oldName = in.readString();
+ newName = in.readString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsMetaDirectoryListingRenameProcessor.class, this);
+ }
+}
[03/13] ignite git commit: IGNITE-3294: IGFS: Improved "create"
performance in DUAL mode.
Posted by av...@apache.org.
IGNITE-3294: IGFS: Improved "create" performance in DUAL mode.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2f8df96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2f8df96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2f8df96
Branch: refs/heads/master
Commit: c2f8df96907e096fdeaa8d040463ff5ec1662395
Parents: 51814fd
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 11:09:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:32:47 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsCreateResult.java | 66 +++
.../internal/processors/igfs/IgfsImpl.java | 53 +-
.../processors/igfs/IgfsMetaManager.java | 524 ++++++++++---------
.../IgfsSecondaryFileSystemCreateContext.java | 111 ++++
.../IgfsSecondaryOutputStreamDescriptor.java | 59 ---
.../meta/IgfsMetaDirectoryCreateProcessor.java | 40 +-
.../igfs/meta/IgfsMetaFileCreateProcessor.java | 46 +-
.../igfs/IgfsMetaManagerSelfTest.java | 4 +-
8 files changed, 540 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
new file mode 100644
index 0000000..0b09e02
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+
+/**
+ * IGFS file create result.
+ */
+public class IgfsCreateResult {
+ /** File info in the primary file system. */
+ private final IgfsEntryInfo info;
+
+ /** Output stream to the secondary file system. */
+ private final OutputStream secondaryOut;
+
+ /**
+ * Constructor.
+ *
+ * @param info File info in the primary file system.
+ * @param secondaryOut Output stream to the secondary file system.
+ */
+ public IgfsCreateResult(IgfsEntryInfo info, @Nullable OutputStream secondaryOut) {
+ assert info != null;
+
+ this.info = info;
+ this.secondaryOut = secondaryOut;
+ }
+
+ /**
+ * @return File info in the primary file system.
+ */
+ public IgfsEntryInfo info() {
+ return info;
+ }
+
+ /**
+ * @return Output stream to the secondary file system.
+ */
+ @Nullable public OutputStream secondaryOutputStream() {
+ return secondaryOut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsCreateResult.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index e6f2ff6..efb347f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -106,7 +106,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
@@ -1019,28 +1018,10 @@ public final class IgfsImpl implements IgfsEx {
log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" +
overwrite + ", props=" + props + ']');
+ // Resolve mode.
final IgfsMode mode = resolveMode(path);
- IgfsFileWorkerBatch batch;
-
- if (mode != PRIMARY) {
- assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
-
- await(path);
-
- IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate,
- props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey);
-
- batch = newBatch(path, desc.out());
-
- IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
- bufferSize(bufSize), mode, batch);
-
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
-
- return os;
- }
-
+ // Prepare properties.
final Map<String, String> dirProps, fileProps;
if (props == null) {
@@ -1051,19 +1032,37 @@ public final class IgfsImpl implements IgfsEx {
else
dirProps = fileProps = new HashMap<>(props);
- IgfsEntryInfo res = meta.create(
+ // Prepare context for DUAL mode.
+ IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
+
+ if (mode != PRIMARY)
+ secondaryCtx = new IgfsSecondaryFileSystemCreateContext(secondaryFs, path, overwrite, simpleCreate,
+ fileProps, (short)replication, groupBlockSize(), bufSize);
+
+ // Await for async ops completion if in DUAL mode.
+ if (mode != PRIMARY)
+ await(path);
+
+ // Perform create.
+ IgfsCreateResult res = meta.create(
path,
dirProps,
overwrite,
cfg.getBlockSize(),
affKey,
- evictExclude(path, true),
- fileProps
+ evictExclude(path, mode == PRIMARY),
+ fileProps,
+ secondaryCtx
);
assert res != null;
- return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
+ // Create secondary file system batch.
+ OutputStream secondaryStream = res.secondaryOutputStream();
+
+ IgfsFileWorkerBatch batch = secondaryStream != null ? newBatch(path, secondaryStream) : null;
+
+ return new IgfsOutputStreamImpl(igfsCtx, path, res.info(), bufferSize(bufSize), mode, batch);
}
});
}
@@ -1094,9 +1093,9 @@ public final class IgfsImpl implements IgfsEx {
await(path);
- IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize, create);
+ IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize, create);
- batch = newBatch(path, desc.out());
+ batch = newBatch(path, desc.secondaryOutputStream());
return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 4cdf6a9..5f7dd3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -394,40 +394,47 @@ public class IgfsMetaManager extends IgfsManager {
* @throws IgniteCheckedException If failed.
*/
public IgfsPathIds pathIds(IgfsPath path) throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- validTxState(false);
+ // Prepare parts.
+ String[] components = path.componentsArray();
- // Prepare parts.
- String[] components = path.componentsArray();
+ String[] parts = new String[components.length + 1];
- String[] parts = new String[components.length + 1];
+ System.arraycopy(components, 0, parts, 1, components.length);
- System.arraycopy(components, 0, parts, 1, components.length);
+ // Get IDs.
+ if (client) {
+ List<IgniteUuid> ids = runClientTask(new IgfsClientMetaIdsForPathCallable(cfg.getName(), path));
- // Prepare IDs.
- IgniteUuid[] ids = new IgniteUuid[parts.length];
+ return new IgfsPathIds(path, parts, ids.toArray(new IgniteUuid[ids.size()]));
+ }
+ else {
+ if (busyLock.enterBusy()) {
+ try {
+ validTxState(false);
- ids[0] = IgfsUtils.ROOT_ID;
+ IgniteUuid[] ids = new IgniteUuid[parts.length];
- for (int i = 1; i < ids.length; i++) {
- IgniteUuid id = fileId(ids[i - 1], parts[i], false);
+ ids[0] = IgfsUtils.ROOT_ID;
- if (id != null)
- ids[i] = id;
- else
- break;
- }
+ for (int i = 1; i < ids.length; i++) {
+ IgniteUuid id = fileId(ids[i - 1], parts[i], false);
- // Return.
- return new IgfsPathIds(path, parts, ids);
- }
- finally {
- busyLock.leaveBusy();
+ if (id != null)
+ ids[i] = id;
+ else
+ break;
+ }
+
+ // Return.
+ return new IgfsPathIds(path, parts, ids);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
+ else
+ throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
}
- else
- throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
}
/**
@@ -1147,6 +1154,46 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Whether operation must be retried because we have suspicious links which may break secondary file system
+ * consistency.
+ *
+ * @param pathIds Path IDs.
+ * @param lockInfos Lock infos.
+ * @return Whether to re-try.
+ */
+ private boolean isRetryForSecondary(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos) {
+ // We need to ensure that the last locked info is not linked with expected child.
+ // Otherwise there was some concurrent file system update and we have to re-try.
+ if (!pathIds.allExists()) {
+ // Find the last locked index
+ IgfsEntryInfo lastLockedInfo = null;
+ int lastLockedIdx = -1;
+
+ while (lastLockedIdx < pathIds.lastExistingIndex()) {
+ IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
+
+ if (nextInfo != null) {
+ lastLockedInfo = nextInfo;
+ lastLockedIdx++;
+ }
+ else
+ break;
+ }
+
+ assert lastLockedIdx < pathIds.count();
+
+ if (lastLockedInfo != null) {
+ String part = pathIds.part(lastLockedIdx + 1);
+
+ if (lastLockedInfo.listing().containsKey(part))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
* Move path to the trash directory.
*
* @param path Path.
@@ -1155,8 +1202,8 @@ public class IgfsMetaManager extends IgfsManager {
* @return ID of an entry located directly under the trash directory.
* @throws IgniteCheckedException If failed.
*/
- IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive, @Nullable IgfsSecondaryFileSystem secondaryFs)
- throws IgniteCheckedException {
+ IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive,
+ @Nullable IgfsSecondaryFileSystem secondaryFs) throws IgniteCheckedException {
while (true) {
if (busyLock.enterBusy()) {
try {
@@ -1164,16 +1211,8 @@ public class IgfsMetaManager extends IgfsManager {
IgfsPathIds pathIds = pathIds(path);
- boolean relaxed0 = relaxed;
-
- if (!pathIds.allExists()) {
- if (secondaryFs == null)
- // Return early if target path doesn't exist and we do not have secondary file system.
- return new IgfsDeleteResult(false, null);
- else
- // If path is missing partially, we need more serious locking for DUAL mode.
- relaxed0 = false;
- }
+ if (!pathIds.allExists() && secondaryFs == null)
+ return new IgfsDeleteResult(false, null);
IgniteUuid victimId = pathIds.lastId();
String victimName = pathIds.lastPart();
@@ -1184,7 +1223,7 @@ public class IgfsMetaManager extends IgfsManager {
// Prepare IDs to lock.
SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- pathIds.addExistingIds(allIds, relaxed0);
+ pathIds.addExistingIds(allIds, relaxed);
IgniteUuid trashId = IgfsUtils.randomTrashId();
@@ -1194,38 +1233,11 @@ public class IgfsMetaManager extends IgfsManager {
// Lock participants.
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
- if (!pathIds.allExists()) {
- // We need to ensure that the last locked info is not linked with expected child.
- // Otherwise there was some ocncurrent file system update and we have to re-try.
- assert secondaryFs != null;
-
- // Find the last locked index
- IgfsEntryInfo lastLockedInfo = null;
- int lastLockedIdx = -1;
-
- while (lastLockedIdx < pathIds.lastExistingIndex()) {
- IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
-
- if (nextInfo != null) {
- lastLockedInfo = nextInfo;
- lastLockedIdx++;
- }
- else
- break;
- }
-
- assert lastLockedIdx < pathIds.count();
-
- if (lastLockedInfo != null) {
- String part = pathIds.part(lastLockedIdx + 1);
-
- if (lastLockedInfo.listing().containsKey(part))
- continue;
- }
- }
+ if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos))
+ continue;
// Ensure that all participants are still in place.
- if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed0)) {
+ if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed)) {
// For DUAL mode we will try to update the underlying FS still. Note we do that inside TX.
if (secondaryFs != null) {
boolean res = secondaryFs.delete(path, recursive);
@@ -1719,7 +1731,7 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Transfer entry from one directory to another.
*
- * @param entry Entry to be transfered.
+ * @param entry Entry to be transferred.
* @param srcId Source ID.
* @param srcName Source name.
* @param destId Destination ID.
@@ -1730,8 +1742,19 @@ public class IgfsMetaManager extends IgfsManager {
IgniteUuid destId, String destName) throws IgniteCheckedException {
validTxState(true);
- id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
- id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+ if (F.eq(srcId, destId)) {
+ id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
+ id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+ }
+ else {
+
+ Map<IgniteUuid, EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>> procMap = new HashMap<>();
+
+ procMap.put(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
+ procMap.put(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+
+ id2InfoPrj.invokeAll(procMap);
+ }
}
/**
@@ -1805,7 +1828,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Output stream descriptor.
* @throws Exception On error.
*/
- IgfsSecondaryOutputStreamDescriptor onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
+ IgfsCreateResult onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
boolean simpleCreate, @Nullable final Map<String, String> props, boolean overwrite,
int bufSize, short replication, long blockSize, IgniteUuid affKey, Map<IgfsPath, IgfsEntryInfo> infos,
final Deque<IgfsEvent> pendingEvts, final T1<OutputStream> t1) throws Exception {
@@ -1904,60 +1927,7 @@ public class IgfsMetaManager extends IgfsManager {
if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
- return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
- }
-
- /**
- * A delegate method that performs file creation in the synchronization task.
- *
- * @param fs File system.
- * @param path Path.
- * @param simpleCreate "Simple create" flag.
- * @param props Properties..
- * @param overwrite Overwrite flag.
- * @param bufSize Buffer size.
- * @param replication Replication factor.
- * @param blockSize Block size.
- * @param affKey Affinity key.
- * @return Output stream descriptor.
- * @throws IgniteCheckedException If file creation failed.
- */
- public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
- final IgfsPath path,
- final boolean simpleCreate,
- @Nullable final Map<String, String> props,
- final boolean overwrite,
- final int bufSize,
- final short replication,
- final long blockSize,
- final IgniteUuid affKey)
- throws IgniteCheckedException
- {
- if (busyLock.enterBusy()) {
- try {
- assert fs != null;
- assert path != null;
-
- // Events to fire (can be done outside of a transaction).
- final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
-
- CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
- overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
-
- try {
- return synchronizeAndExecute(task, fs, false, path.parent());
- }
- finally {
- for (IgfsEvent evt : pendingEvts)
- evts.record(evt);
- }
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to create file in DUAL mode because Grid is stopping: " + path);
+ return new IgfsCreateResult(newInfo, out);
}
/**
@@ -1970,7 +1940,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Output stream descriptor.
* @throws IgniteCheckedException If output stream open for append has failed.
*/
- IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+ public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
final int bufSize, final boolean create) throws IgniteCheckedException {
if (busyLock.enterBusy()) {
try {
@@ -1980,12 +1950,12 @@ public class IgfsMetaManager extends IgfsManager {
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
+ SynchronizationTask<IgfsCreateResult> task =
+ new SynchronizationTask<IgfsCreateResult>() {
/** Container for the secondary file system output stream. */
private final T1<OutputStream> outT1 = new T1<>(null);
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
+ @Override public IgfsCreateResult onSuccess(Map<IgfsPath,
IgfsEntryInfo> infos) throws Exception {
validTxState(true);
@@ -2034,10 +2004,10 @@ public class IgfsMetaManager extends IgfsManager {
if (evts.isRecordable(EventType.EVT_IGFS_FILE_OPENED_WRITE))
pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_OPENED_WRITE));
- return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, outT1.get());
+ return new IgfsCreateResult(lockedInfo, outT1.get());
}
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
+ @Override public IgfsCreateResult onFailure(@Nullable Exception err)
throws IgniteCheckedException {
U.closeQuiet(outT1.get());
@@ -2926,8 +2896,8 @@ public class IgfsMetaManager extends IgfsManager {
}
else {
// Create file and parent folders.
- IgfsPathsCreateResult res =
- createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+ IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+ affKey, evictExclude, null);
if (res == null)
continue;
@@ -2961,21 +2931,25 @@ public class IgfsMetaManager extends IgfsManager {
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
* @param fileProps File properties.
- * @return @return Resulting info.
+ * @param secondaryCtx Secondary file system create context.
+ * @return Operation result.
* @throws IgniteCheckedException If failed.
*/
- IgfsEntryInfo create(
+ IgfsCreateResult create(
final IgfsPath path,
Map<String, String> dirProps,
final boolean overwrite,
final int blockSize,
final @Nullable IgniteUuid affKey,
final boolean evictExclude,
- @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+ @Nullable Map<String, String> fileProps,
+ @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
validTxState(false);
while (true) {
if (busyLock.enterBusy()) {
+ OutputStream secondaryOut = null;
+
try {
// Prepare path IDs.
IgfsPathIds pathIds = pathIds(path);
@@ -3002,6 +2976,9 @@ public class IgfsMetaManager extends IgfsManager {
try (IgniteInternalTx tx = startTx()) {
Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
+ if (secondaryCtx != null && isRetryForSecondary(pathIds, lockInfos))
+ continue;
+
if (!pathIds.verifyIntegrity(lockInfos, relaxed))
// Directory structure changed concurrently. So we simply re-try.
continue;
@@ -3024,35 +3001,71 @@ public class IgfsMetaManager extends IgfsManager {
// At this point file can be re-created safely.
- // First step: add existing to trash listing.
+ // Add existing to trash listing.
IgniteUuid oldId = pathIds.lastId();
id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(
IgfsUtils.composeNameForTrash(path, oldId), new IgfsListingEntry(oldInfo)));
- // Second step: replace ID in parent directory.
+ // Replace ID in parent directory.
String name = pathIds.lastPart();
IgniteUuid parentId = pathIds.lastParentId();
id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingReplaceProcessor(name, overwriteId));
- // Third step: create the file.
- long createTime = System.currentTimeMillis();
+ // Create the file.
+ IgniteUuid newLockId = createFileLockId(false);
+
+ long newAccessTime;
+ long newModificationTime;
+ Map<String, String> newProps;
+ long newLen;
+ int newBlockSize;
+
+ if (secondaryCtx != null) {
+ secondaryOut = secondaryCtx.create();
+
+ IgfsFile secondaryFile = secondaryCtx.info();
- IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(createTime,
- fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
+ if (secondaryFile == null)
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because it no longer exists: " + path);
+ else if (secondaryFile.isDirectory())
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because the path points to a directory: " + path);
+
+ newAccessTime = secondaryFile.accessTime();
+ newModificationTime = secondaryFile.modificationTime();
+ newProps = secondaryFile.properties();
+ newLen = secondaryFile.length();
+ newBlockSize = secondaryFile.blockSize();
+ }
+ else {
+ newAccessTime = System.currentTimeMillis();
+ newModificationTime = newAccessTime;
+ newProps = fileProps;
+ newLen = 0L;
+ newBlockSize = blockSize;
+ }
+
+ IgfsEntryInfo newInfo = invokeAndGet(overwriteId,
+ new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
+ newBlockSize, affKey, newLockId, evictExclude, newLen));
// Prepare result and commit.
tx.commit();
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
- return newInfo;
+ return new IgfsCreateResult(newInfo, secondaryOut);
}
else {
// Create file and parent folders.
- IgfsPathsCreateResult res =
- createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+ if (secondaryCtx != null)
+ secondaryOut = secondaryCtx.create();
+
+ IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+ affKey, evictExclude, secondaryCtx);
if (res == null)
continue;
@@ -3063,10 +3076,20 @@ public class IgfsMetaManager extends IgfsManager {
// Generate events.
generateCreateEvents(res.createdPaths(), true);
- return res.info();
+ return new IgfsCreateResult(res.info(), secondaryOut);
}
}
}
+ catch (IgniteException | IgniteCheckedException e) {
+ U.closeQuiet(secondaryOut);
+
+ throw e;
+ }
+ catch (Exception e) {
+ U.closeQuiet(secondaryOut);
+
+ throw new IgniteCheckedException("Create failed due to unexpected exception: " + path, e);
+ }
finally {
busyLock.leaveBusy();
}
@@ -3092,7 +3115,7 @@ public class IgfsMetaManager extends IgfsManager {
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
- return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false);
+ return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null);
}
/**
@@ -3105,22 +3128,25 @@ public class IgfsMetaManager extends IgfsManager {
* @param blockSize Block size.
* @param affKey Affinity key (optional)
* @param evictExclude Evict exclude flag.
+ * @param secondaryCtx Secondary file system create context.
* @return Result or {@code} if the first parent already contained child with the same name.
* @throws IgniteCheckedException If failed.
*/
@Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
- boolean evictExclude) throws IgniteCheckedException{
+ boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx)
+ throws IgniteCheckedException{
// Check if entry we are going to write to is directory.
if (lockInfos.get(pathIds.lastExistingId()).isFile())
throw new IgfsParentNotDirectoryException("Failed to open file for write " +
"(parent element is not a directory): " + pathIds.path());
- return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+ return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude,
+ secondaryCtx);
}
/**
- * Ceate file or directory.
+ * Create file or directory.
*
* @param dir Directory flag.
* @param pathIds Path IDs.
@@ -3130,12 +3156,15 @@ public class IgfsMetaManager extends IgfsManager {
* @param blockSize Block size.
* @param affKey Affinity key.
* @param evictExclude Evict exclude flag.
+ * @param secondaryCtx Secondary file system create context.
* @return Result.
* @throws IgniteCheckedException If failed.
*/
+ @SuppressWarnings("unchecked")
private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
- int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException {
+ int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
+ @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
// This is our starting point.
int lastExistingIdx = pathIds.lastExistingIndex();
IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
@@ -3151,8 +3180,10 @@ public class IgfsMetaManager extends IgfsManager {
if (lastExistingInfo.hasChild(curPart))
return null;
+ Map<IgniteUuid, EntryProcessor> procMap = new HashMap<>();
+
// First step: add new entry to the last existing element.
- id2InfoPrj.invoke(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
+ procMap.put(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
// Events support.
@@ -3161,20 +3192,44 @@ public class IgfsMetaManager extends IgfsManager {
List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx);
// Second step: create middle directories.
- long createTime = System.currentTimeMillis();
+ long curTime = System.currentTimeMillis();
while (curIdx < pathIds.count() - 1) {
+ lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
+
int nextIdx = curIdx + 1;
String nextPart = pathIds.part(nextIdx);
IgniteUuid nextId = pathIds.surrogateId(nextIdx);
- id2InfoPrj.invoke(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps,
+ long accessTime;
+ long modificationTime;
+ Map<String, String> props;
+
+ if (secondaryCtx != null) {
+ IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(lastCreatedPath);
+
+ if (secondaryInfo == null)
+ throw new IgfsException("Failed to perform operation because secondary file system path was " +
+ "modified concurrnetly: " + lastCreatedPath);
+ else if (secondaryInfo.isFile())
+ throw new IgfsException("Failed to perform operation because secondary file system entity is " +
+ "not directory: " + lastCreatedPath);
+
+ accessTime = secondaryInfo.accessTime();
+ modificationTime = secondaryInfo.modificationTime();
+ props = secondaryInfo.properties();
+ }
+ else {
+ accessTime = curTime;
+ modificationTime = curTime;
+ props = dirProps;
+ }
+
+ procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props,
nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
// Save event.
- lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
-
createdPaths.add(lastCreatedPath);
// Advance things further.
@@ -3185,16 +3240,75 @@ public class IgfsMetaManager extends IgfsManager {
}
// Third step: create leaf.
- IgfsEntryInfo info;
+ if (dir) {
+ long accessTime;
+ long modificationTime;
+ Map<String, String> props;
- if (dir)
- info = invokeAndGet(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps));
- else
- info = invokeAndGet(curId, new IgfsMetaFileCreateProcessor(createTime, fileProps,
- blockSize, affKey, createFileLockId(false), evictExclude));
+ if (secondaryCtx != null) {
+ IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(pathIds.path());
+
+ if (secondaryInfo == null)
+ throw new IgfsException("Failed to perform operation because secondary file system path was " +
+ "modified concurrnetly: " + pathIds.path());
+ else if (secondaryInfo.isFile())
+ throw new IgfsException("Failed to perform operation because secondary file system entity is " +
+ "not directory: " + lastCreatedPath);
+
+ accessTime = secondaryInfo.accessTime();
+ modificationTime = secondaryInfo.modificationTime();
+ props = secondaryInfo.properties();
+ }
+ else {
+ accessTime = curTime;
+ modificationTime = curTime;
+ props = dirProps;
+ }
+
+ procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props));
+ }
+ else {
+ long newAccessTime;
+ long newModificationTime;
+ Map<String, String> newProps;
+ long newLen;
+ int newBlockSize;
+
+ if (secondaryCtx != null) {
+ IgfsFile secondaryFile = secondaryCtx.info();
+
+ if (secondaryFile == null)
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because it no longer exists: " + pathIds.path());
+ else if (secondaryFile.isDirectory())
+ throw fsException("Failed to open output stream to the file created in " +
+ "the secondary file system because the path points to a directory: " + pathIds.path());
+
+ newAccessTime = secondaryFile.accessTime();
+ newModificationTime = secondaryFile.modificationTime();
+ newProps = secondaryFile.properties();
+ newLen = secondaryFile.length();
+ newBlockSize = secondaryFile.blockSize();
+ }
+ else {
+ newAccessTime = curTime;
+ newModificationTime = curTime;
+ newProps = fileProps;
+ newLen = 0L;
+ newBlockSize = blockSize;
+ }
+
+ procMap.put(curId, new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
+ newBlockSize, affKey, createFileLockId(false), evictExclude, newLen));
+ }
createdPaths.add(pathIds.path());
+ // Execute cache operations.
+ Map<Object, EntryProcessorResult> invokeRes = ((IgniteInternalCache)id2InfoPrj).invokeAll(procMap);
+
+ IgfsEntryInfo info = (IgfsEntryInfo)invokeRes.get(curId).get();
+
return new IgfsPathsCreateResult(createdPaths, info);
}
@@ -3253,90 +3367,4 @@ public class IgfsMetaManager extends IgfsManager {
*/
public T onFailure(Exception err) throws IgniteCheckedException;
}
-
- /**
- * Synchronization task to create a file.
- */
- private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
- /** Secondary file system. */
- private IgfsSecondaryFileSystem fs;
-
- /** Path. */
- private IgfsPath path;
-
- /** Simple create flag. */
- private boolean simpleCreate;
-
- /** Properties. */
- private Map<String, String> props;
-
- /** Overwrite flag. */
- private boolean overwrite;
-
- /** Buffer size. */
- private int bufSize;
-
- /** Replication factor. */
- private short replication;
-
- /** Block size. */
- private long blockSize;
-
- /** Affinity key. */
- private IgniteUuid affKey;
-
- /** Pending events. */
- private Deque<IgfsEvent> pendingEvts;
-
- /** Output stream to the secondary file system. */
- private final T1<OutputStream> outT1 = new T1<>(null);
-
- /**
- * Constructor.
- *
- * @param fs Secondary file system.
- * @param path Path.
- * @param simpleCreate Simple create flag.
- * @param props Properties.
- * @param overwrite Overwrite flag.
- * @param bufSize Buffer size.
- * @param replication Replication factor.
- * @param blockSize Block size.
- * @param affKey Affinity key.
- * @param pendingEvts Pending events.
- */
- public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
- @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
- IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
- this.fs = fs;
- this.path = path;
- this.simpleCreate = simpleCreate;
- this.props = props;
- this.overwrite = overwrite;
- this.bufSize = bufSize;
- this.replication = replication;
- this.blockSize = blockSize;
- this.affKey = affKey;
- this.pendingEvts = pendingEvts;
- }
-
- /** {@inheritDoc} */
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
- throws Exception {
- return onSuccessCreate(fs, path, simpleCreate, props,
- overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1);
- }
-
- /** {@inheritDoc} */
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
- U.closeQuiet(outT1.get());
-
- U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
- simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
- bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
- throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
- "exception: " + path, err);
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
new file mode 100644
index 0000000..1c0efd6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Context for secondary file system create request.
+ */
+public class IgfsSecondaryFileSystemCreateContext {
+ /** File system. */
+ private final IgfsSecondaryFileSystem fs;
+
+ /** Path. */
+ private final IgfsPath path;
+
+ /** Overwrite flag. */
+ private final boolean overwrite;
+
+ /** Simple create flag. */
+ private final boolean simpleCreate;
+
+ /** Properties. */
+ private final Map<String, String> props;
+
+ /** Replication. */
+ private final short replication;
+
+ /** Block size. */
+ private final long blockSize;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /**
+ * Constructor.
+ *
+ * @param fs File system.
+ * @param path Path.
+ * @param overwrite Overwrite flag.
+ * @param simpleCreate Simple create flag.
+ * @param props Properties.
+ * @param replication Replication.
+ * @param blockSize Block size.
+ * @param bufSize Buffer size.
+ */
+ public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, IgfsPath path, boolean overwrite,
+ boolean simpleCreate, @Nullable Map<String, String> props, short replication, long blockSize, int bufSize) {
+ this.fs = fs;
+ this.path = path;
+ this.overwrite = overwrite;
+ this.simpleCreate = simpleCreate;
+ this.props = props;
+ this.replication = replication;
+ this.blockSize = blockSize;
+ this.bufSize = bufSize;
+ }
+
+ /**
+ * Create file in the secondary file system.
+ *
+ * @return Output stream.
+ */
+ public OutputStream create() {
+ return simpleCreate ? fs.create(path, overwrite) :
+ fs.create(path, bufSize, overwrite, replication, blockSize, props);
+ }
+
+ /**
+ * Get file info.
+ *
+ * @return File.
+ */
+ public IgfsFile info() {
+ return fs.info(path);
+ }
+
+ /**
+ * @return Secondary file system.
+ */
+ public IgfsSecondaryFileSystem fileSystem() {
+ return fs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsSecondaryFileSystemCreateContext.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
deleted file mode 100644
index 6bbc2c0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.OutputStream;
-
-/**
- * Descriptor of an output stream opened to the secondary file system.
- */
-public class IgfsSecondaryOutputStreamDescriptor {
- /** File info in the primary file system. */
- private final IgfsEntryInfo info;
-
- /** Output stream to the secondary file system. */
- private final OutputStream out;
-
- /**
- * Constructor.
- *
- * @param info File info in the primary file system.
- * @param out Output stream to the secondary file system.
- */
- IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) {
- assert info != null;
- assert out != null;
-
- this.info = info;
- this.out = out;
- }
-
- /**
- * @return File info in the primary file system.
- */
- IgfsEntryInfo info() {
- return info;
- }
-
- /**
- * @return Output stream to the secondary file system.
- */
- OutputStream out() {
- return out;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
index eee9300..b016633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
@@ -48,8 +48,11 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
/** */
private static final long serialVersionUID = 0L;
- /** Create time. */
- private long createTime;
+ /** Access time. */
+ private long accessTime;
+
+ /** Modification time. */
+ private long modificationTime;
/** Properties. */
private Map<String, String> props;
@@ -70,24 +73,27 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
/**
* Constructor.
*
- * @param createTime Create time.
+ * @param accessTime Access time.
+ * @param modificationTime Modification time.
* @param props Properties.
*/
- public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props) {
- this(createTime, props, null, null);
+ public IgfsMetaDirectoryCreateProcessor(long accessTime, long modificationTime, Map<String, String> props) {
+ this(accessTime, modificationTime, props, null, null);
}
/**
* Constructor.
*
- * @param createTime Create time.
+ * @param accessTime Access time.
+ * @param modificationTime Modification time.
* @param props Properties.
* @param childName Child name.
* @param childEntry Child entry.
*/
- public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props, String childName,
- IgfsListingEntry childEntry) {
- this.createTime = createTime;
+ public IgfsMetaDirectoryCreateProcessor(long accessTime, long modificationTime, Map<String, String> props,
+ String childName, IgfsListingEntry childEntry) {
+ this.accessTime = accessTime;
+ this.modificationTime = modificationTime;
this.props = props;
this.childName = childName;
this.childEntry = childEntry;
@@ -101,8 +107,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
entry.getKey(),
null,
props,
- createTime,
- createTime
+ accessTime,
+ modificationTime
);
if (childName != null)
@@ -115,7 +121,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(createTime);
+ out.writeLong(accessTime);
+ out.writeLong(modificationTime);
IgfsUtils.writeProperties(out, props);
@@ -127,7 +134,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- createTime = in.readLong();
+ accessTime = in.readLong();
+ modificationTime = in.readLong();
props = IgfsUtils.readProperties(in);
@@ -141,7 +149,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
BinaryRawWriter out = writer.rawWriter();
- out.writeLong(createTime);
+ out.writeLong(accessTime);
+ out.writeLong(modificationTime);
IgfsUtils.writeProperties(out, props);
@@ -155,7 +164,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
@Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
BinaryRawReader in = reader.rawReader();
- createTime = in.readLong();
+ accessTime = in.readLong();
+ modificationTime = in.readLong();
props = IgfsUtils.readProperties(in);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
index 8c4c296..a3e9d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
@@ -49,7 +49,10 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
private static final long serialVersionUID = 0L;
/** Create time. */
- private long createTime;
+ private long accessTime;
+
+ /** Modification time. */
+ private long modificationTime;
/** Properties. */
private Map<String, String> props;
@@ -66,6 +69,9 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
/** Evict exclude flag. */
private boolean evictExclude;
+ /** File length. */
+ private long len;
+
/**
* Constructor.
*/
@@ -76,21 +82,25 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
/**
* Constructor.
*
- * @param createTime Create time.
+ * @param accessTime Access time.
+ * @param modificationTime Modification time.
* @param props Properties.
* @param blockSize Block size.
* @param affKey Affinity key.
* @param lockId Lock ID.
* @param evictExclude Evict exclude flag.
+ * @param len File length.
*/
- public IgfsMetaFileCreateProcessor(long createTime, Map<String, String> props, int blockSize,
- @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) {
- this.createTime = createTime;
+ public IgfsMetaFileCreateProcessor(long accessTime, long modificationTime, Map<String, String> props,
+ int blockSize, @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude, long len) {
+ this.accessTime = accessTime;
+ this.modificationTime = modificationTime;
this.props = props;
this.blockSize = blockSize;
this.affKey = affKey;
this.lockId = lockId;
this.evictExclude = evictExclude;
+ this.len = len;
}
/** {@inheritDoc} */
@@ -99,13 +109,13 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
IgfsEntryInfo info = IgfsUtils.createFile(
entry.getKey(),
blockSize,
- 0L,
+ len,
affKey,
lockId,
evictExclude,
props,
- createTime,
- createTime
+ accessTime,
+ modificationTime
);
entry.setValue(info);
@@ -115,7 +125,8 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeLong(createTime);
+ out.writeLong(accessTime);
+ out.writeLong(modificationTime);
IgfsUtils.writeProperties(out, props);
@@ -123,11 +134,14 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
U.writeGridUuid(out, affKey);
U.writeGridUuid(out, lockId);
out.writeBoolean(evictExclude);
+
+ out.writeLong(len);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- createTime = in.readLong();
+ accessTime = in.readLong();
+ modificationTime = in.readLong();
props = IgfsUtils.readProperties(in);
@@ -135,13 +149,16 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
affKey = U.readGridUuid(in);
lockId = U.readGridUuid(in);
evictExclude = in.readBoolean();
+
+ len = in.readLong();
}
/** {@inheritDoc} */
@Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
BinaryRawWriter out = writer.rawWriter();
- out.writeLong(createTime);
+ out.writeLong(accessTime);
+ out.writeLong(modificationTime);
IgfsUtils.writeProperties(out, props);
@@ -149,13 +166,16 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
BinaryUtils.writeIgniteUuid(out, affKey);
BinaryUtils.writeIgniteUuid(out, lockId);
out.writeBoolean(evictExclude);
+
+ out.writeLong(len);
}
/** {@inheritDoc} */
@Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
BinaryRawReader in = reader.rawReader();
- createTime = in.readLong();
+ accessTime = in.readLong();
+ modificationTime = in.readLong();
props = IgfsUtils.readProperties(in);
@@ -163,6 +183,8 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
affKey = BinaryUtils.readIgniteUuid(in);
lockId = BinaryUtils.readIgniteUuid(in);
evictExclude = in.readBoolean();
+
+ len = in.readLong();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 8b88157..6053d3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(ROOT_ID));
assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META));
- assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null));
+ assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null, null));
IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir");
assertNotNull(dirEntry);
@@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
IgfsPath p = path(path);
- IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null);
+ IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null, null).info();
assert res != null;
assert !res.isDirectory();
[02/13] ignite git commit: Merge remote-tracking branch
'upstream/gridgain-7.6.1' into gridgain-7.6.1
Posted by av...@apache.org.
Merge remote-tracking branch 'upstream/gridgain-7.6.1' into gridgain-7.6.1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51814fd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51814fd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51814fd8
Branch: refs/heads/master
Commit: 51814fd816f1f3c4bf72c91399c03db7b1358ed1
Parents: dc1d088 5c14928
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 15 16:25:19 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:25:19 2016 +0300
----------------------------------------------------------------------
modules/cassandra/pom.xml | 2 +-
.../cassandra/bean/CassandraLifeCycleBean.java | 149 --------
.../ignite/tests/utils/CassandraHelper.java | 2 +-
.../tests/utils/CassandraLifeCycleBean.java | 149 ++++++++
.../affinity/fair/FairAffinityFunction.java | 2 +
.../rendezvous/RendezvousAffinityFunction.java | 2 +
.../configuration/CacheConfiguration.java | 3 +
.../ignite/internal/IgniteServicesImpl.java | 2 +-
.../ignite/internal/MarshallerContextImpl.java | 51 ++-
.../ignite/internal/binary/BinaryContext.java | 43 ++-
.../GridClientConnectionManagerAdapter.java | 25 +-
.../connection/GridClientNioTcpConnection.java | 3 +
.../GridClientOptimizedMarshaller.java | 4 +-
.../GridClientZipOptimizedMarshaller.java | 167 ++++++++
.../impl/GridTcpRouterNioListenerAdapter.java | 11 +-
.../discovery/GridDiscoveryManager.java | 23 +-
.../processors/cache/GridCacheAdapter.java | 46 ++-
.../processors/cache/GridCacheEntryEx.java | 9 +-
.../processors/cache/GridCacheMapEntry.java | 100 +++--
.../processors/cache/GridCacheProcessor.java | 28 +-
.../processors/cache/GridCacheUtils.java | 3 +
.../binary/CacheObjectBinaryProcessorImpl.java | 29 +-
.../distributed/dht/GridDhtCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLockFuture.java | 18 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 19 +-
.../dht/GridPartitionedGetFuture.java | 2 -
.../dht/GridPartitionedSingleGetFuture.java | 2 -
.../dht/atomic/GridDhtAtomicCache.java | 8 -
.../dht/colocated/GridDhtColocatedCache.java | 4 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 3 +-
.../dht/preloader/GridDhtPartitionDemander.java | 3 +-
.../dht/preloader/GridDhtPartitionMap2.java | 7 +-
.../distributed/near/GridNearGetFuture.java | 4 -
.../cache/distributed/near/GridNearTxLocal.java | 7 +-
.../local/atomic/GridLocalAtomicCache.java | 8 -
.../continuous/CacheContinuousQueryEntry.java | 16 +
.../continuous/CacheContinuousQueryHandler.java | 2 +-
.../continuous/CacheContinuousQueryManager.java | 135 +++++++
.../cache/transactions/IgniteTxAdapter.java | 2 -
.../cache/transactions/IgniteTxEntry.java | 24 +-
.../cache/transactions/IgniteTxHandler.java | 46 +++
.../transactions/IgniteTxLocalAdapter.java | 34 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 7 +
.../IgniteCacheObjectProcessorImpl.java | 5 +
.../continuous/GridContinuousProcessor.java | 216 ++++++++---
.../datastreamer/DataStreamerImpl.java | 3 +-
.../message/GridClientHandshakeRequest.java | 4 +-
.../protocols/tcp/GridTcpRestNioListener.java | 19 +-
.../rest/protocols/tcp/GridTcpRestProtocol.java | 12 +-
.../service/GridServiceProcessor.java | 307 ++++++++-------
.../ignite/internal/util/nio/GridNioServer.java | 10 +-
.../ignite/internal/visor/cache/VisorCache.java | 56 +--
.../visor/cache/VisorCachePartition.java | 89 +++++
.../visor/cache/VisorCachePartitions.java | 88 +++++
.../visor/cache/VisorCachePartitionsTask.java | 152 ++++++++
.../internal/visor/cache/VisorCacheV3.java | 68 +---
.../ignite/spi/discovery/tcp/ServerImpl.java | 4 +-
.../ipfinder/jdbc/BasicJdbcIpFinderDialect.java | 28 ++
.../tcp/ipfinder/jdbc/JdbcIpFinderDialect.java | 28 ++
.../jdbc/OracleJdbcIpFinderDialect.java | 28 ++
.../ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java | 72 ++--
.../internal/ClusterNodeMetricsSelfTest.java | 101 ++++-
...eClientReconnectContinuousProcessorTest.java | 60 ++-
.../ignite/internal/binary/AffinityKey.java | 69 ++++
.../binary/GridBinaryAffinityKeySelfTest.java | 15 +
...aultBinaryMappersBinaryMetaDataSelfTest.java | 17 +
.../processors/cache/GridCacheTestEntryEx.java | 5 +-
.../cache/IgniteCacheAbstractTest.java | 2 +-
...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
.../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
.../IgniteCacheReadThroughStoreCallTest.java | 288 ++++++++++++++
.../IgniteCacheSyncRebalanceModeSelfTest.java | 114 ++++++
.../IgniteCacheLoaderWriterAbstractTest.java | 10 +
...ridCacheContinuousQueryAbstractSelfTest.java | 2 +-
...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++
.../IgniteNoCustomEventsOnNodeStart.java | 85 +++++
.../service/GridServiceClientNodeTest.java | 102 ++++-
.../unsafe/GridOffheapSnapTreeSelfTest.java | 2 +-
.../testsuites/IgniteCacheTestSuite2.java | 3 +
.../testsuites/IgniteCacheTestSuite4.java | 4 +
.../testsuites/IgniteCacheTestSuite5.java | 2 +
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 23 +-
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 17 +-
.../processors/query/h2/sql/GridSqlConst.java | 5 +
.../processors/query/h2/sql/GridSqlJoin.java | 17 +-
.../processors/query/h2/sql/GridSqlType.java | 5 +
.../cache/IgniteCacheOffheapIndexScanTest.java | 195 ++++++++++
.../query/IgniteSqlSplitterSelfTest.java | 75 ++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../IgniteCacheQuerySelfTestSuite3.java | 2 +
.../org/apache/ignite/spark/IgniteContext.scala | 4 +-
.../org/apache/ignite/spark/IgniteRDD.scala | 25 +-
.../apache/ignite/spark/JavaIgniteContext.scala | 4 +-
.../org/apache/ignite/spark/JavaIgniteRDD.scala | 2 +
.../ignite/spark/impl/IgniteAbstractRDD.scala | 15 +-
.../apache/ignite/spark/impl/IgniteSqlRDD.scala | 5 +-
.../spark/impl/JavaIgniteAbstractRDD.scala | 34 --
.../org/apache/ignite/spark/IgniteRDDSpec.scala | 25 +-
.../cache/websession/WebSessionFilter.java | 39 +-
.../internal/websession/WebSessionSelfTest.java | 330 +++++++++++++++-
102 files changed, 4030 insertions(+), 885 deletions(-)
----------------------------------------------------------------------
[11/13] ignite git commit: IGNITE-3216 Need to deduplicate addresses
registered in the IP finder
Posted by av...@apache.org.
IGNITE-3216 Need to deduplicate addresses registered in the IP finder
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40d41c10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40d41c10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40d41c10
Branch: refs/heads/master
Commit: 40d41c10e3179436bc1ae1e12bdc2d6e3a7208f0
Parents: b5f4aac
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:25:38 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:23:05 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 8 +--
.../ignite/internal/util/lang/GridFunc.java | 66 --------------------
.../internal/util/IgniteUtilsSelfTest.java | 15 +++++
3 files changed, 19 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1147124..2a83ad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8581,7 +8581,7 @@ public abstract class IgniteUtils {
*/
public static Collection<InetAddress> toInetAddresses(Collection<String> addrs,
Collection<String> hostNames) throws IgniteCheckedException {
- List<InetAddress> res = new ArrayList<>(addrs.size());
+ Set<InetAddress> res = new HashSet<>(addrs.size());
Iterator<String> hostNamesIt = hostNames.iterator();
@@ -8614,7 +8614,7 @@ public abstract class IgniteUtils {
throw new IgniteCheckedException("Addresses can not be resolved [addr=" + addrs +
", hostNames=" + hostNames + ']');
- return Collections.unmodifiableList(res);
+ return res;
}
/**
@@ -8640,7 +8640,7 @@ public abstract class IgniteUtils {
*/
public static Collection<InetSocketAddress> toSocketAddresses(Collection<String> addrs,
Collection<String> hostNames, int port) {
- List<InetSocketAddress> res = new ArrayList<>(addrs.size());
+ Set<InetSocketAddress> res = new HashSet<>(addrs.size());
Iterator<String> hostNamesIt = hostNames.iterator();
@@ -8661,7 +8661,7 @@ public abstract class IgniteUtils {
res.add(new InetSocketAddress(addr, port));
}
- return Collections.unmodifiableList(res);
+ return res;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ab31625..6ad136c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1715,72 +1715,6 @@ public class GridFunc {
}
/**
- * Creates read-only light-weight view on given list with provided transformation.
- * Resulting list will only "have" {@code transformed} elements. Note that only wrapping
- * list will be created and no duplication of data will occur.
- *
- * @param c Input list that serves as a base for the view.
- * @param trans Transformation closure.
- * @param <T1> Type of the list.
- * @return Light-weight view on given list with provided transformation.
- */
- @SuppressWarnings("RedundantTypeArguments")
- @Deprecated
- public static <T1, T2> List<T2> viewListReadOnly(@Nullable final List<? extends T1> c,
- final IgniteClosure<? super T1, T2> trans) {
- A.notNull(trans, "trans");
-
- if (isEmpty(c))
- return Collections.emptyList();
-
- assert c != null;
-
- return new GridSerializableList<T2>() {
- /** */
- private static final long serialVersionUID = 3126625219739967068L;
-
- @Override public T2 get(int idx) {
- return trans.apply(c.get(idx));
- }
-
- @NotNull
- @Override public Iterator<T2> iterator() {
- return F.<T1, T2>iterator(c, trans, true);
- }
-
- @Override public int size() {
- return c.size();
- }
-
- @Override public boolean isEmpty() {
- return c.isEmpty();
- }
- };
- }
-
- /**
- * Creates a view on given list with provided transformer and predicates.
- * Resulting list will only "have" elements for which all provided predicates, if any,
- * evaluate to {@code true}. Note that a new collection will be created and data will
- * be copied.
- *
- * @param c Input list that serves as a base for the view.
- * @param trans Transforming closure from T1 to T2.
- * @param p Optional predicates. If predicates are not provided - all elements will be in the view.
- * @return View on given list with provided predicate.
- */
- @Deprecated
- public static <T1, T2> List<T2> transformList(Collection<? extends T1> c,
- IgniteClosure<? super T1, T2> trans, @Nullable IgnitePredicate<? super T1>... p) {
- A.notNull(c, "c", trans, "trans");
-
- if (isAlwaysFalse(p))
- return Collections.emptyList();
-
- return new ArrayList<>(transform(retain(c, true, p), trans));
- }
-
- /**
* Creates light-weight view on given map with provided predicates. Resulting map will
* only "have" keys for which all provided predicates, if any, evaluates to {@code true}.
* Note that only wrapping map will be created and no duplication of data will occur.
http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 520fa76..d774065 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -742,6 +742,21 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ public void testToSocketAddressesNoDuplicates() {
+ Collection<String> addrs = new ArrayList<>();
+
+ addrs.add("127.0.0.1");
+ addrs.add("localhost");
+
+ Collection<String> hostNames = new ArrayList<>();
+ int port = 1234;
+
+ assertEquals(1, U.toSocketAddresses(addrs, hostNames, port).size());
+ }
+
+ /**
* Test enum.
*/
private enum TestEnum {