You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/06/16 11:31:08 UTC

[01/13] ignite git commit: IGNITE-3287: IGFS Delete in PRIMARY/DUAL modes goes through the same path.

Repository: ignite
Updated Branches:
  refs/heads/master 9d46a73f2 -> ca546b3d4


IGNITE-3287: IGFS Delete in PRIMARY/DUAL modes goes through the same path.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc1d0885
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc1d0885
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc1d0885

Branch: refs/heads/master
Commit: dc1d08856e803ee8b689b2e5aad25c767e4c2015
Parents: 7761e5f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Jun 10 11:05:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:24:56 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsDeleteResult.java       |  62 +++++
 .../internal/processors/igfs/IgfsImpl.java      |  30 +--
 .../processors/igfs/IgfsMetaManager.java        | 263 +++++++------------
 .../internal/processors/igfs/IgfsPathIds.java   |  10 +
 .../processors/igfs/IgfsAbstractSelfTest.java   |  14 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |  20 +-
 6 files changed, 177 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
new file mode 100644
index 0000000..ff42762
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of deletion in IGFS.
+ */
+public class IgfsDeleteResult {
+    /** Success flag. */
+    private final boolean success;
+
+    /** Entry info. */
+    private final IgfsEntryInfo info;
+
+    /**
+     * Constructor.
+     *
+     * @param success Success flag.
+     * @param info Entry info.
+     */
+    public IgfsDeleteResult(boolean success, @Nullable IgfsEntryInfo info) {
+        this.success = success;
+        this.info = info;
+    }
+
+    /**
+     * @return Success flag.
+     */
+    public boolean success() {
+        return success;
+    }
+
+    /**
+     * @return Entry info.
+     */
+    public IgfsEntryInfo info() {
+        return info;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsDeleteResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index eabec7c..e6f2ff6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -104,7 +104,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
@@ -723,38 +722,21 @@ public final class IgfsImpl implements IgfsEx {
                 if (IgfsPath.SLASH.equals(path.toString()))
                     return false;
 
-                IgfsMode mode = resolveMode(path);
-
                 Set<IgfsMode> childrenModes = modeRslvr.resolveChildrenModes(path);
 
-                boolean res = false;
-
-                FileDescriptor desc = getFileDescriptor(path);
-
-                if (childrenModes.contains(PRIMARY)) {
-                    if (desc != null) {
-                        IgniteUuid deletedId = meta.softDelete(path, recursive);
-
-                        res = deletedId != null;
-                    }
-                    else if (mode == PRIMARY)
-                        checkConflictWithPrimary(path);
-                }
-
-                if (childrenModes.contains(DUAL_SYNC) || childrenModes.contains(DUAL_ASYNC)) {
-                    assert secondaryFs != null;
+                boolean dual = childrenModes.contains(DUAL_SYNC) ||childrenModes.contains(DUAL_ASYNC);
 
+                if (dual)
                     await(path);
 
-                    res |= meta.deleteDual(secondaryFs, path, recursive);
-                }
+                IgfsDeleteResult res = meta.softDelete(path, recursive, dual ? secondaryFs : null);
 
                 // Record event if needed.
-                if (res && desc != null)
+                if (res.success() && res.info() != null)
                     IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
-                            desc.isFile ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED);
+                        res.info().isFile() ? EVT_IGFS_FILE_DELETED : EVT_IGFS_DIR_DELETED);
 
-                return res;
+                return res.success();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 04e2139..4cdf6a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -1151,152 +1151,130 @@ public class IgfsMetaManager extends IgfsManager {
      *
      * @param path Path.
      * @param recursive Recursive flag.
+     * @param secondaryFs Secondary file system (optional).
      * @return ID of an entry located directly under the trash directory.
      * @throws IgniteCheckedException If failed.
      */
-    IgniteUuid softDelete(final IgfsPath path, final boolean recursive) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                validTxState(false);
-
-                IgfsPathIds pathIds = pathIds(path);
-
-                // Continue only if the whole path present.
-                if (!pathIds.allExists())
-                    return null; // A fragment of the path no longer exists.
-
-                IgniteUuid victimId = pathIds.lastId();
-                String victimName = pathIds.lastPart();
-
-                if (IgfsUtils.isRootId(victimId))
-                    throw new IgfsException("Cannot remove root directory");
-
-                // Prepare IDs to lock.
-                SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
-
-                pathIds.addExistingIds(allIds, relaxed);
+    IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive, @Nullable IgfsSecondaryFileSystem secondaryFs)
+        throws IgniteCheckedException {
+        while (true) {
+            if (busyLock.enterBusy()) {
+                try {
+                    validTxState(false);
 
-                IgniteUuid trashId = IgfsUtils.randomTrashId();
+                    IgfsPathIds pathIds = pathIds(path);
 
-                allIds.add(trashId);
+                    boolean relaxed0 = relaxed;
 
-                try (IgniteInternalTx tx = startTx()) {
-                    // Lock participants.
-                    Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
+                    if (!pathIds.allExists()) {
+                        if (secondaryFs == null)
+                            // Return early if target path doesn't exist and we do not have secondary file system.
+                            return new IgfsDeleteResult(false, null);
+                        else
+                            // If path is missing partially, we need more serious locking for DUAL mode.
+                            relaxed0 = false;
+                    }
 
-                    // Ensure that all participants are still in place.
-                    if (!pathIds.verifyIntegrity(lockInfos, relaxed))
-                        return null;
+                    IgniteUuid victimId = pathIds.lastId();
+                    String victimName = pathIds.lastPart();
 
-                    IgfsEntryInfo victimInfo = lockInfos.get(victimId);
+                    if (IgfsUtils.isRootId(victimId))
+                        throw new IgfsException("Cannot remove root directory");
 
-                    // Cannot delete non-empty directory if recursive flag is not set.
-                    if (!recursive && victimInfo.hasChildren())
-                        throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
-                            "empty and recursive flag is not set).");
+                    // Prepare IDs to lock.
+                    SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                    // Prepare trash data.
-                    IgfsEntryInfo trashInfo = lockInfos.get(trashId);
+                    pathIds.addExistingIds(allIds, relaxed0);
 
-                    final String trashName = IgfsUtils.composeNameForTrash(path, victimId);
+                    IgniteUuid trashId = IgfsUtils.randomTrashId();
 
-                    assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
-                        "destination directory (file already exists) [destName=" + trashName + ']';
+                    allIds.add(trashId);
 
-                    IgniteUuid parentId = pathIds.lastParentId();
-                    IgfsEntryInfo parentInfo = lockInfos.get(parentId);
+                    try (IgniteInternalTx tx = startTx()) {
+                        // Lock participants.
+                        Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
 
-                    transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
+                        if (!pathIds.allExists()) {
+                            // We need to ensure that the last locked info is not linked with expected child.
+                            // Otherwise there was some concurrent file system update and we have to re-try.
+                            assert secondaryFs != null;
 
-                    tx.commit();
+                            // Find the last locked index
+                            IgfsEntryInfo lastLockedInfo = null;
+                            int lastLockedIdx = -1;
 
-                    signalDeleteWorker();
+                            while (lastLockedIdx < pathIds.lastExistingIndex()) {
+                                IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
 
-                    return victimId;
-                }
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to perform soft delete because Grid is " +
-                "stopping [path=" + path + ']');
-    }
+                                if (nextInfo != null) {
+                                    lastLockedInfo = nextInfo;
+                                    lastLockedIdx++;
+                                }
+                                else
+                                    break;
+                            }
 
-    /**
-     * Move path to the trash directory in existing transaction.
-     *
-     * @param parentId Parent ID.
-     * @param path Path name.
-     * @param id Path ID.
-     * @param trashId Trash folder ID.
-     * @return ID of an entry located directly under the trash directory.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("RedundantCast")
-    @Nullable private IgniteUuid softDeleteNonTx(@Nullable IgniteUuid parentId, IgfsPath path, IgniteUuid id,
-        IgniteUuid trashId) throws IgniteCheckedException {
-        validTxState(true);
+                            assert lastLockedIdx < pathIds.count();
 
-        IgniteUuid resId;
+                            if (lastLockedInfo != null) {
+                                String part = pathIds.part(lastLockedIdx + 1);
 
-        if (parentId == null) {
-            // Handle special case when we deleting root directory.
-            assert IgfsUtils.ROOT_ID.equals(id);
+                                if (lastLockedInfo.listing().containsKey(part))
+                                    continue;
+                            }
+                        }
 
-            IgfsEntryInfo rootInfo = getInfo(IgfsUtils.ROOT_ID);
+                        // Ensure that all participants are still in place.
+                        if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed0)) {
+                            // For DUAL mode we will try to update the underlying FS still. Note we do that inside TX.
+                            if (secondaryFs != null) {
+                                boolean res = secondaryFs.delete(path, recursive);
 
-            if (rootInfo == null)
-                return null; // Root was never created.
+                                return new IgfsDeleteResult(res, null);
+                            }
+                            else
+                                return new IgfsDeleteResult(false, null);
+                        }
 
-            // Ensure trash directory existence.
-            createSystemDirectoryIfAbsent(trashId);
+                        IgfsEntryInfo victimInfo = lockInfos.get(victimId);
 
-            Map<String, IgfsListingEntry> rootListing = rootInfo.listing();
+                        // Cannot delete non-empty directory if recursive flag is not set.
+                        if (!recursive && victimInfo.hasChildren())
+                            throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
+                                "empty and recursive flag is not set).");
 
-            if (!rootListing.isEmpty()) {
-                IgniteUuid[] lockIds = new IgniteUuid[rootInfo.listing().size()];
+                    // Prepare trash data.
+                    IgfsEntryInfo trashInfo = lockInfos.get(trashId);
 
-                int i = 0;
+                    final String trashName = IgfsUtils.composeNameForTrash(path, victimId);
 
-                for (IgfsListingEntry entry : rootInfo.listing().values())
-                    lockIds[i++] = entry.fileId();
+                        assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
+                            "destination directory (file already exists) [destName=" + trashName + ']';
 
-                // Lock children IDs in correct order.
-                lockIds(lockIds);
+                        IgniteUuid parentId = pathIds.lastParentId();
+                        IgfsEntryInfo parentInfo = lockInfos.get(parentId);
 
-                // Construct new info and move locked entries from root to it.
-                Map<String, IgfsListingEntry> transferListing = new HashMap<>(rootListing);
+                        // Propagate call to the secondary file system.
+                        if (secondaryFs != null && !secondaryFs.delete(path, recursive))
+                            return new IgfsDeleteResult(false, null);
 
-                IgfsEntryInfo newInfo = IgfsUtils.createDirectory(
-                    IgniteUuid.randomUuid(),
-                    transferListing,
-                    (Map<String,String>)null
-                );
+                        transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
 
-                createNewEntry(newInfo, trashId, newInfo.id().toString());
+                        tx.commit();
 
-                // Remove listing entries from root.
-                for (Map.Entry<String, IgfsListingEntry> entry : transferListing.entrySet())
-                    id2InfoPrj.invoke(IgfsUtils.ROOT_ID,
-                        new IgfsMetaDirectoryListingRemoveProcessor(entry.getKey(), entry.getValue().fileId()));
+                        signalDeleteWorker();
 
-                resId = newInfo.id();
+                        return new IgfsDeleteResult(true, victimInfo);
+                    }
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
             }
             else
-                resId = null;
+                throw new IllegalStateException("Failed to perform soft delete because Grid is " +
+                    "stopping [path=" + path + ']');
         }
-        else {
-            // Ensure trash directory existence.
-            createSystemDirectoryIfAbsent(trashId);
-
-            moveNonTx(id, path.name(), parentId, IgfsUtils.composeNameForTrash(path, id), trashId);
-
-            resId = id;
-        }
-
-        return resId;
     }
 
     /**
@@ -2413,71 +2391,6 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
-     * Delete path in DUAL mode.
-     *
-     * @param fs Secondary file system.
-     * @param path Path to update.
-     * @param recursive Recursive flag.
-     * @return Operation result.
-     * @throws IgniteCheckedException If delete failed.
-     */
-    public boolean deleteDual(final IgfsSecondaryFileSystem fs, final IgfsPath path, final boolean recursive)
-        throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                assert fs != null;
-                assert path != null;
-
-                final IgniteUuid trashId = IgfsUtils.randomTrashId();
-
-                SynchronizationTask<Boolean> task = new SynchronizationTask<Boolean>() {
-                    @Override public Boolean onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception {
-                        IgfsEntryInfo info = infos.get(path);
-
-                        if (info == null)
-                            return false; // File doesn't exist in the secondary file system.
-
-                        if (!fs.delete(path, recursive))
-                            return false; // Delete failed remotely.
-
-                        if (path.parent() != null) {
-                            assert infos.containsKey(path.parent());
-
-                            softDeleteNonTx(infos.get(path.parent()).id(), path, info.id(), trashId);
-                        }
-                        else {
-                            assert IgfsUtils.ROOT_ID.equals(info.id());
-
-                            softDeleteNonTx(null, path, info.id(), trashId);
-                        }
-
-                        return true; // No additional handling is required.
-                    }
-
-                    @Override public Boolean onFailure(@Nullable Exception err) throws IgniteCheckedException {
-                        U.error(log, "Path delete in DUAL mode failed [path=" + path + ", recursive=" + recursive + ']',
-                            err);
-
-                        throw new IgniteCheckedException("Failed to delete the path due to secondary file system " +
-                            "exception: ", err);
-                    }
-                };
-
-                Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path);
-
-                signalDeleteWorker();
-
-                return res;
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to delete in DUAL mode because Grid is stopping: " + path);
-    }
-
-    /**
      * Update path in DUAL mode.
      *
      * @param fs Secondary file system.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
index 446495e..b710ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -256,6 +256,16 @@ public class IgfsPathIds {
     }
 
     /**
+     * Get ID at the given index.
+     *
+     * @param idx Index.
+     * @return ID.
+     */
+    public IgniteUuid id(int idx) {
+        return idx <= lastExistingIdx ? ids[idx] : surrogateId(idx);
+    }
+
+    /**
      * Get surrogate ID at the given index.
      *
      * @param idx Index.

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index a0bef0f..c1c6c6c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2908,11 +2908,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws IgniteCheckedException If failed.
      */
     protected static void checkExist(IgfsImpl igfs, IgfsPath... paths) throws IgniteCheckedException {
-        for (IgfsPath path : paths) {
-            assert igfs.context().meta().fileId(path) != null : "Path doesn't exist [igfs=" + igfs.name() +
-                ", path=" + path + ']';
+        for (IgfsPath path : paths)
             assert igfs.exists(path) : "Path doesn't exist [igfs=" + igfs.name() + ", path=" + path + ']';
-        }
     }
 
     /**
@@ -2961,11 +2958,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      * @throws Exception If failed.
      */
     protected void checkNotExist(IgfsImpl igfs, IgfsPath... paths) throws Exception {
-        for (IgfsPath path : paths) {
-            assert igfs.context().meta().fileId(path) == null : "Path exists [igfs=" + igfs.name() + ", path=" +
-                path + ']';
+        for (IgfsPath path : paths)
             assert !igfs.exists(path) : "Path exists [igfs=" + igfs.name() + ", path=" + path + ']';
-        }
     }
 
     /**
@@ -2977,10 +2971,10 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
      */
     protected void checkNotExist(UniversalFileSystemAdapter uni, IgfsPath... paths) throws Exception {
         IgfsEx ex = uni.unwrap(IgfsEx.class);
+
         for (IgfsPath path : paths) {
             if (ex != null)
-                assert ex.context().meta().fileId(path) == null : "Path exists [igfs=" + ex.name() + ", path=" +
-                    path + ']';
+                assert !ex.exists(path) : "Path exists [igfs=" + ex.name() + ", path=" + path + ']';
 
             assert !uni.exists(path.toString()) : "Path exists [igfs=" + uni.name() + ", path=" + path + ']';
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dc1d0885/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 039bf8d..8b88157 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -191,8 +191,8 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
             assertNull("Unexpected stored properties: " + info, info.properties().get(key2));
         }
 
-        mgr.softDelete(new IgfsPath("/dir"), true);
-        mgr.softDelete(new IgfsPath("/file"), false);
+        mgr.softDelete(new IgfsPath("/dir"), true, null);
+        mgr.softDelete(new IgfsPath("/file"), false, null);
 
         assertNull(mgr.updateProperties(dir.id(), F.asMap("p", "7")));
     }
@@ -328,9 +328,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
 
         mgr.move(path("/a2"), path("/a"));
 
-        IgniteUuid del = mgr.softDelete(path("/a/b/f3"), false);
-
-        assertEquals(f3.id(), del);
+        mgr.softDelete(path("/a/b/f3"), false, null);
 
         assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
                 mgr.directoryListing(ROOT_ID));
@@ -341,8 +339,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
         assertEmpty(mgr.directoryListing(b.id()));
 
         //assertEquals(b, mgr.removeIfEmpty(a.id(), "b", b.id(), new IgfsPath("/a/b"), true));
-        del = mgr.softDelete(path("/a/b"), false);
-        assertEquals(b.id(), del);
+        mgr.softDelete(path("/a/b"), false, null);
 
         assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
             mgr.directoryListing(ROOT_ID));
@@ -351,8 +348,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
 
         assertEmpty(mgr.directoryListing(b.id()));
 
-        del = mgr.softDelete(path("/a/f2"), false);
-        assertEquals(f2.id(), del);
+        mgr.softDelete(path("/a/f2"), false, null);
 
         assertEquals(F.asMap("a", new IgfsListingEntry(a), "f1", new IgfsListingEntry(f1)),
             mgr.directoryListing(ROOT_ID));
@@ -360,16 +356,14 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
         assertEmpty(mgr.directoryListing(a.id()));
         assertEmpty(mgr.directoryListing(b.id()));
 
-        del = mgr.softDelete(path("/f1"), false);
-        assertEquals(f1.id(), del);
+        mgr.softDelete(path("/f1"), false, null);
 
         assertEquals(F.asMap("a", new IgfsListingEntry(a)), mgr.directoryListing(ROOT_ID));
 
         assertEmpty(mgr.directoryListing(a.id()));
         assertEmpty(mgr.directoryListing(b.id()));
 
-        del = mgr.softDelete(path("/a"), false);
-        assertEquals(a.id(), del);
+        mgr.softDelete(path("/a"), false, null);
 
         assertEmpty(mgr.directoryListing(ROOT_ID));
         assertEmpty(mgr.directoryListing(a.id()));


[06/13] ignite git commit: IGNITE-3312: IGFS: Fixed a bug introduced during create() routine reworking.

Posted by av...@apache.org.
IGNITE-3312: IGFS: Fixed a bug introduced during create() routine reworking.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b46422ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b46422ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b46422ad

Branch: refs/heads/master
Commit: b46422ad64f531900263fb36f6ff28373e4346f1
Parents: e733e91
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 15 16:02:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:37:44 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        | 39 +++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b46422ad/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index fd0b07e..1640918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -2896,7 +2896,7 @@ public class IgfsMetaManager extends IgfsManager {
                         else {
                             // Create file and parent folders.
                             IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
-                                affKey, evictExclude, null);
+                                affKey, evictExclude, null, null);
 
                             if (res == null)
                                 continue;
@@ -3060,11 +3060,21 @@ public class IgfsMetaManager extends IgfsManager {
                         }
                         else {
                             // Create file and parent folders.
+                            T1<OutputStream> secondaryOutHolder = null;
+
                             if (secondaryCtx != null)
-                                secondaryOut = secondaryCtx.create();
+                                secondaryOutHolder = new T1<>();
 
-                            IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
-                                affKey, evictExclude, secondaryCtx);
+                            IgfsPathsCreateResult res;
+
+                            try {
+                                res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+                                    affKey, evictExclude, secondaryCtx, secondaryOutHolder);
+                            }
+                            finally {
+                                if (secondaryOutHolder != null)
+                                    secondaryOut =  secondaryOutHolder.get();
+                            }
 
                             if (res == null)
                                 continue;
@@ -3114,7 +3124,7 @@ public class IgfsMetaManager extends IgfsManager {
             throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
                 "element is not a directory)");
 
-        return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null);
+        return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null, null);
     }
 
     /**
@@ -3128,12 +3138,14 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key (optional)
      * @param evictExclude Evict exclude flag.
      * @param secondaryCtx Secondary file system create context.
+     * @param secondaryOutHolder Holder for the secondary output stream.
      * @return Result or {@code null} if the first parent already contained child with the same name.
      * @throws IgniteCheckedException If failed.
      */
     @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
         Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
-        boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx)
+        boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx,
+        @Nullable T1<OutputStream> secondaryOutHolder)
         throws IgniteCheckedException{
         // Check if entry we are going to write to is directory.
         if (lockInfos.get(pathIds.lastExistingId()).isFile())
@@ -3141,7 +3153,7 @@ public class IgfsMetaManager extends IgfsManager {
                 "(parent element is not a directory): " + pathIds.path());
 
         return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude,
-            secondaryCtx);
+            secondaryCtx, secondaryOutHolder);
     }
 
     /**
@@ -3156,6 +3168,7 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
      * @param secondaryCtx Secondary file system create context.
+     * @param secondaryOutHolder Secondary output stream holder.
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
@@ -3163,7 +3176,8 @@ public class IgfsMetaManager extends IgfsManager {
     private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
         Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
         int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
-        @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
+        @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx, @Nullable T1<OutputStream> secondaryOutHolder)
+        throws IgniteCheckedException {
         // This is our starting point.
         int lastExistingIdx = pathIds.lastExistingIndex();
         IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
@@ -3179,6 +3193,13 @@ public class IgfsMetaManager extends IgfsManager {
         if (lastExistingInfo.hasChild(curPart))
             return null;
 
+        // Create entry in the secondary file system if needed.
+        if (secondaryCtx != null) {
+            assert secondaryOutHolder != null;
+
+            secondaryOutHolder.set(secondaryCtx.create());
+        }
+
         Map<IgniteUuid, EntryProcessor> procMap = new HashMap<>();
 
         // First step: add new entry to the last existing element.
@@ -3210,7 +3231,7 @@ public class IgfsMetaManager extends IgfsManager {
 
                 if (secondaryInfo == null)
                     throw new IgfsException("Failed to perform operation because secondary file system path was " +
-                        "modified concurrnetly: " + lastCreatedPath);
+                        "modified concurrently: " + lastCreatedPath);
                 else if (secondaryInfo.isFile())
                     throw new IgfsException("Failed to perform operation because secondary file system entity is " +
                         "not directory: " + lastCreatedPath);


[07/13] ignite git commit: IGNITE-2767: Cont. query remote filter requires to be in client nodes class path. Reviewed and merged by Denis Magda.

Posted by av...@apache.org.
IGNITE-2767: Cont. query remote filter requires to be in client nodes class path.
Reviewed and merged by Denis Magda.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b8411955
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b8411955
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b8411955

Branch: refs/heads/master
Commit: b8411955e5169b38430e077c5b4ed85d8e95b83c
Parents: b46422a
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 17:18:27 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 17:25:26 2016 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   7 +-
 ...yRemoteFilterMissingInClassPathSelfTest.java | 237 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 3 files changed, 245 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c135b83..3d6df89 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -51,6 +51,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1716,7 +1717,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
                 data0.put(entry.getKey(), compData);
             }
             catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal discovery data for component: "  + entry.getKey(), e);
+                if (GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC.ordinal() == entry.getKey() &&
+                    X.hasCause(e, ClassNotFoundException.class) && locNode.isClient())
+                    U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
+                else
+                    U.error(log, "Failed to unmarshal discovery data for component: "  + entry.getKey(), e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
new file mode 100644
index 0000000..37f4e01
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ContinuousQueryRemoteFilterMissingInClassPathSelfTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
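+ * Checks what a joining node logs when a continuous query remote filter class is or is not loadable on it.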
+ */
+public class ContinuousQueryRemoteFilterMissingInClassPathSelfTest extends GridCommonAbstractTest {
+    /** URL of classes. */
+    private static final URL[] URLS;
+
+    static {
+        try {
+            URLS = new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};
+        }
+        catch (MalformedURLException e) {
+            throw new RuntimeException("Define property p2p.uri.cls", e);
+        }
+    }
+
+    /** String logger set on nodes started without the external class loader; tests inspect its output. */
+    private GridStringLogger log;
+
+    /** Client mode flag applied to the configuration of the next started node. */
+    private boolean clientMode;
+
+    /** If {@code true} the next started node gets the external class loader, otherwise the string logger. */
+    private boolean setExternalLoader;
+
+    /** Class loader used to load the remote filter class. */
+    private ClassLoader ldr;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setClientMode(clientMode);
+
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setName("simple");
+
+        cacheCfg.setCacheMode(CacheMode.PARTITIONED);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        if (setExternalLoader)
+            cfg.setClassLoader(ldr);
+        else
+            cfg.setGridLogger(log);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If fail.
+     */
+    public void testWarningMessageOnClientNode() throws Exception {
+        ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+        clientMode = false;
+        setExternalLoader = true;
+        final Ignite ignite0 = startGrid(1);
+
+        executeContiniouseQuery(ignite0.cache("simple"));
+
+        log = new GridStringLogger();
+        clientMode = true;
+        setExternalLoader = false;
+
+        startGrid(2);
+
+        assertTrue(log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " +
+            "Can be ignored."));
+    }
+
+    /**
+     * @throws Exception If fail.
+     */
+    public void testNoWarningMessageOnClientNode() throws Exception {
+        ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+        setExternalLoader = true;
+
+        clientMode = false;
+        final Ignite ignite0 = startGrid(1);
+
+        executeContiniouseQuery(ignite0.cache("simple"));
+
+        log = new GridStringLogger();
+        clientMode = true;
+
+        startGrid(2);
+
+        assertTrue(!log.toString().contains("Failed to unmarshal continuous query remote filter on client node. " +
+            "Can be ignored."));
+    }
+
+    /**
+     * @throws Exception If fail.
+     */
+    public void testExceptionOnServerNode() throws Exception {
+        ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+        clientMode = false;
+
+        setExternalLoader = true;
+        final Ignite ignite0 = startGrid(1);
+
+        executeContiniouseQuery(ignite0.cache("simple"));
+
+        log = new GridStringLogger();
+        setExternalLoader = false;
+
+        startGrid(2);
+
+        assertTrue(log.toString().contains("class org.apache.ignite.IgniteCheckedException: " +
+            "Failed to find class with given class loader for unmarshalling"));
+    }
+
+    /**
+     * @throws Exception If fail.
+     */
+    public void testNoExceptionOnServerNode() throws Exception {
+        ldr = new URLClassLoader(URLS, getClass().getClassLoader());
+
+        clientMode = false;
+
+        setExternalLoader = true;
+        final Ignite ignite0 = startGrid(1);
+
+        executeContiniouseQuery(ignite0.cache("simple"));
+
+        log = new GridStringLogger();
+
+        startGrid(2);
+
+        assertTrue(!log.toString().contains("class org.apache.ignite.IgniteCheckedException: " +
+            "Failed to find class with given class loader for unmarshalling"));
+    }
+
+    /**
+     * @param cache Ignite cache.
+     * @throws Exception If fail.
+     */
+    private void executeContiniouseQuery(IgniteCache cache) throws Exception {
+        ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(
+            new CacheEntryUpdatedListener<Integer, String>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> events)
+                    throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends Integer, ? extends String> event : events)
+                        System.out.println("Key = " + event.getKey() + ", Value = " + event.getValue());
+                }
+            }
+        );
+
+        final Class<CacheEntryEventSerializableFilter> remoteFilterClass = (Class<CacheEntryEventSerializableFilter>)
+            ldr.loadClass("org.apache.ignite.tests.p2p.CacheDeploymentCacheEntryEventSerializableFilter");
+
+        qry.setRemoteFilterFactory(new ClassFilterFactory(remoteFilterClass));
+
+        cache.query(qry);
+
+        for (int i = 0; i < 100; i++)
+            cache.put(i, "Message " + i);
+    }
+
+    /**
+     * Factory that creates the remote filter by instantiating the given filter class.
+     */
+    private static class ClassFilterFactory implements Factory<CacheEntryEventFilter<Integer, String>> {
+        /** Filter class to instantiate. */
+        private Class<CacheEntryEventSerializableFilter> cls;
+
+        /**
+         * @param cls Class.
+         */
+        public ClassFilterFactory(Class<CacheEntryEventSerializableFilter> cls) {
+            this.cls = cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheEntryEventSerializableFilter<Integer, String> create() {
+            try {
+                return cls.newInstance();
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b8411955/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
index 94b9dce..f7821a8 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBin
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationStoreEnabledTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheKeepBinaryIterationSwapEnabledTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.ContinuousQueryRemoteFilterMissingInClassPathSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapValuesTest;
@@ -109,6 +110,7 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite {
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
         suite.addTestSuite(CacheContinuousQueryExecuteInPrimaryTest.class);
         suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class);
+        suite.addTestSuite(ContinuousQueryRemoteFilterMissingInClassPathSelfTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryImmutableEntryTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationTest.class);
         suite.addTestSuite(CacheKeepBinaryIterationStoreEnabledTest.class);


[09/13] ignite git commit: IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock. Reviewed and merged by Denis Magda.

Posted by av...@apache.org.
IGNITE-3151: Using IgniteCountDownLatch sometimes drives to dead lock.
Reviewed and merged by Denis Magda.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19aeec39
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19aeec39
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19aeec39

Branch: refs/heads/master
Commit: 19aeec39fe84f141a97ba1b0f4e6b48d2bb21f27
Parents: a672e57
Author: Vladislav Pyatkov <vl...@gmail.com>
Authored: Wed Jun 15 18:43:22 2016 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jun 15 18:50:07 2016 +0300

----------------------------------------------------------------------
 .../GridCacheCountDownLatchImpl.java            |  54 ++++++-
 .../IgniteCountDownLatchAbstractSelfTest.java   | 156 ++++++++++++++++++-
 2 files changed, 202 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19aeec39/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 41cc548..723fb55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -26,7 +26,7 @@ import java.io.ObjectStreamException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
@@ -50,6 +50,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Internal latch is in uninitialized state. */
+    private static final int UNINITIALIZED_LATCH_STATE = 0;
+
+    /** Internal latch is being created. */
+    private static final int CREATING_LATCH_STATE = 1;
+
+    /** Internal latch is ready for the usage. */
+    private static final int READY_LATCH_STATE = 2;
+
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
         new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
@@ -83,14 +92,17 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     private boolean autoDel;
 
     /** Internal latch (transient). */
-    private volatile CountDownLatch internalLatch;
+    private CountDownLatch internalLatch;
 
     /** Initialization guard. */
-    private final AtomicBoolean initGuard = new AtomicBoolean();
+    private final AtomicInteger initGuard = new AtomicInteger();
 
     /** Initialization latch. */
     private final CountDownLatch initLatch = new CountDownLatch(1);
 
+    /** Latest latch value that is used at the stage while the internal latch is being initialized. */
+    private Integer lastLatchVal = null;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -236,15 +248,36 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
     @Override public void onUpdate(int cnt) {
         assert cnt >= 0;
 
-        while (internalLatch != null && internalLatch.getCount() > cnt)
-            internalLatch.countDown();
+        CountDownLatch latch0;
+
+        synchronized (initGuard) {
+            int state = initGuard.get();
+
+            if (state != READY_LATCH_STATE) {
+                /** Internal latch is not fully initialized yet. Remember latest latch value. */
+                lastLatchVal = cnt;
+
+                return;
+            }
+
+            /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+            latch0 = internalLatch;
+        }
+
+        /** Internal latch is fully initialized and ready for the usage. */
+
+        assert latch0 != null;
+
+        while (latch0.getCount() > cnt)
+            latch0.countDown();
+
     }
 
     /**
      * @throws IgniteCheckedException If operation failed.
      */
     private void initializeLatch() throws IgniteCheckedException {
-        if (initGuard.compareAndSet(false, true)) {
+        if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
             try {
                 internalLatch = CU.outTx(
                     retryTopologySafe(new Callable<CountDownLatch>() {
@@ -268,6 +301,15 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
                     ctx
                 );
 
+                synchronized (initGuard) {
+                    if (lastLatchVal != null) {
+                        while (internalLatch.getCount() > lastLatchVal)
+                            internalLatch.countDown();
+                    }
+
+                    initGuard.set(READY_LATCH_STATE);
+                }
+
                 if (log.isDebugEnabled())
                     log.debug("Initialized internal latch: " + internalLatch);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/19aeec39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
index 2f6f6f4..f6d0287 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteCountDownLatchAbstractSelfTest.java
@@ -22,21 +22,26 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteCountDownLatch;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -144,7 +149,6 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
 
     /**
      * @param latch Latch.
-     *
      * @throws Exception If failed.
      */
     protected void checkRemovedLatch(final IgniteCountDownLatch latch) throws Exception {
@@ -236,8 +240,8 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
      * @param latchName Latch name.
      * @param cnt Count.
      * @param autoDel Auto delete flag.
-     * @throws Exception If failed.
      * @return New latch.
+     * @throws Exception If failed.
      */
     private IgniteCountDownLatch createLatch(String latchName, int cnt, boolean autoDel)
         throws Exception {
@@ -334,6 +338,58 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
     /**
      * @throws Exception If failed.
      */
+    public void testLatchBroadcast() throws Exception {
+        Ignite ignite = grid(0);
+        ClusterGroup srvsGrp = ignite.cluster().forServers();
+
+        int numOfSrvs = srvsGrp.nodes().size();
+
+        ignite.destroyCache("testCache");
+        IgniteCache<Object, Object> cache = ignite.createCache("testCache");
+
+        for (ClusterNode node : srvsGrp.nodes())
+            cache.put(String.valueOf(node.id()), 0);
+
+        for (int i = 0; i < 500; i++) {
+            IgniteCountDownLatch latch1 = createLatch1(ignite, numOfSrvs);
+            IgniteCountDownLatch latch2 = createLatch2(ignite, numOfSrvs);
+
+            ignite.compute(srvsGrp).broadcast(new IgniteRunnableJob(latch1, latch2, i));
+            assertTrue(latch2.await(10000));
+        }
+    }
+
+    /**
+     * @param client Ignite client.
+     * @param numOfSrvs Number of server nodes.
+     * @return Ignite latch.
+     */
+    private IgniteCountDownLatch createLatch1(Ignite client, int numOfSrvs) {
+        return client.countDownLatch(
+            "testName1", // Latch name.
+            numOfSrvs,          // Initial count.
+            true,        // Auto remove, when counter has reached zero.
+            true         // Create if it does not exist.
+        );
+    }
+
+    /**
+     * @param client Ignite client.
+     * @param numOfSrvs Number of server nodes.
+     * @return Ignite latch.
+     */
+    private IgniteCountDownLatch createLatch2(Ignite client, int numOfSrvs) {
+        return client.countDownLatch(
+            "testName2", // Latch name.
+            numOfSrvs,          // Initial count.
+            true,        // Auto remove, when counter has reached zero.
+            true         // Create if it does not exist.
+        );
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testLatchMultinode2() throws Exception {
         if (gridCount() == 1)
             return;
@@ -391,4 +447,100 @@ public abstract class IgniteCountDownLatchAbstractSelfTest extends IgniteAtomics
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         // No-op.
     }
+
+    /**
+     * Ignite job
+     */
+    public class IgniteRunnableJob implements IgniteRunnable {
+
+        /**
+         * Ignite.
+         */
+        @IgniteInstanceResource
+        Ignite igniteInstance;
+
+        /**
+         * Number of iteration.
+         */
+        protected final int iteration;
+
+        /**
+         * Ignite latch 1.
+         */
+        private final IgniteCountDownLatch latch1;
+
+        /**
+         * Ignite latch 2.
+         */
+        private final IgniteCountDownLatch latch2;
+
+        /**
+         * @param latch1 Ignite latch 1.
+         * @param latch2 Ignite latch 2.
+         * @param iteration Number of iteration.
+         */
+        public IgniteRunnableJob(IgniteCountDownLatch latch1, IgniteCountDownLatch latch2, int iteration) {
+            this.iteration = iteration;
+            this.latch1 = latch1;
+            this.latch2 = latch2;
+        }
+
+        /**
+         * @return Ignite latch.
+         */
+        IgniteCountDownLatch createLatch1() {
+            return latch1;
+        }
+
+        /**
+         * @return Ignite latch.
+         */
+        IgniteCountDownLatch createLatch2() {
+            return latch2;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void run() {
+
+            IgniteCountDownLatch latch1 = createLatch1();
+            IgniteCountDownLatch latch2 = createLatch2();
+
+            IgniteCache<Object, Object> cache = igniteInstance.cache("testCache");
+
+            for (ClusterNode node : igniteInstance.cluster().forServers().nodes()) {
+                Integer val = (Integer)cache.get(String.valueOf(node.id()));
+                assertEquals(val, (Integer)iteration);
+            }
+
+            latch1.countDown();
+
+            assertTrue(latch1.await(10000));
+
+            cache.put(getUID(), (iteration + 1));
+
+            latch2.countDown();
+
+        }
+
+        /**
+         * @return Node UUID as string.
+         */
+        String getUID() {
+            String id = "";
+            Collection<ClusterNode> nodes = igniteInstance.cluster().forLocal().nodes();
+            for (ClusterNode node : nodes) {
+                if (node.isLocal())
+                    id = String.valueOf(node.id());
+            }
+            return id;
+        }
+
+        /**
+         * @return Ignite.
+         */
+        public Ignite igniteInstance() {
+            return igniteInstance;
+        }
+    }
 }


[12/13] ignite git commit: IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections

Posted by av...@apache.org.
IGNITE-3153 TcpDiscoveryZookeeperIpFinder doesn't properly handle client reconnections


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e05c012f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e05c012f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e05c012f

Branch: refs/heads/master
Commit: e05c012f293dcad2cd3a184e6d257a0cb2af509a
Parents: 40d41c1
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:49:48 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:27:22 2016 +0300

----------------------------------------------------------------------
 .../zk/TcpDiscoveryZookeeperIpFinder.java       | 65 +++++++++++---------
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  | 16 ++---
 2 files changed, 42 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
index bee4dab..238987b 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
@@ -39,6 +39,7 @@ import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
@@ -68,11 +69,8 @@ import org.codehaus.jackson.map.annotate.JsonRootName;
  *
  * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
  * @see <a href="http://curator.apache.org">Apache Curator</a>
- *
- * @author Raul Kripalani
  */
 public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
-
     /** System property name to provide the ZK Connection String. */
     public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
 
@@ -89,6 +87,10 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     @GridToStringExclude
     private final AtomicBoolean initGuard = new AtomicBoolean();
 
+    /** Close guard. */
+    @GridToStringExclude
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -140,9 +142,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
             curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
         }
 
-        if (curator.getState() != CuratorFrameworkState.STARTED)
+        if (curator.getState() == CuratorFrameworkState.LATENT)
             curator.start();
 
+        A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
+
         discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
             .client(curator)
             .basePath(basePath)
@@ -152,8 +156,11 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
     /** {@inheritDoc} */
     @Override public void onSpiContextDestroyed() {
-        if (!initGuard.compareAndSet(true, false))
+        if (!closeGuard.compareAndSet(false, true)) {
+            U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
+
             return;
+        }
 
         log.info("Destroying ZooKeeper IP Finder.");
 
@@ -175,7 +182,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
         try {
             serviceInstances = discovery.queryForInstances(serviceName);
-        } catch (Exception e) {
+        }
+        catch (Exception e) {
             log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
             return Collections.emptyList();
         }
@@ -214,17 +222,18 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
 
             try {
                 ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
-                        .name(serviceName)
-                        .uriSpec(URI_SPEC)
-                        .address(addr.getAddress().getHostAddress())
-                        .port(addr.getPort())
-                        .build();
+                    .name(serviceName)
+                    .uriSpec(URI_SPEC)
+                    .address(addr.getAddress().getHostAddress())
+                    .port(addr.getPort())
+                    .build();
 
                 ourInstances.put(addr, si);
 
                 discovery.registerService(si);
 
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
                     "[message=%s,addresses=%s]", e.getMessage(), addr), e);
             }
@@ -245,13 +254,14 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
             ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
             if (si == null) {
                 log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
-                        "instance map for: " + addrs);
+                    "instance map for: " + addrs);
                 continue;
             }
 
             try {
                 discovery.unregisterService(si);
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                 log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
             }
         }
@@ -272,7 +282,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set explicitly.
+     * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
+     * explicitly.
      */
     public void setZkConnectionString(String zkConnectionString) {
         this.zkConnectionString = zkConnectionString;
@@ -286,8 +297,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if
-     *                    using a system property.
+     * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
+     * system property.
      */
     public void setRetryPolicy(RetryPolicy retryPolicy) {
         this.retryPolicy = retryPolicy;
@@ -315,9 +326,8 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical
-     *                    ZK terms, it represents the node under {@link #basePath}, under which services will be
-     *                    registered.
+     * @param serviceName Service name to use, as defined by Curator's {@link ServiceDiscovery} recipe. In physical ZK
+     * terms, it represents the node under {@link #basePath}, under which services will be registered.
      */
     public void setServiceName(String serviceName) {
         this.serviceName = serviceName;
@@ -331,20 +341,19 @@ public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
     }
 
     /**
-     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations
-     *                                    are allowed. Nodes will attempt to register themselves, plus those they
-     *                                    know about. By default, duplicate registrations are not allowed, but you
-     *                                    might want to set this property to <tt>true</tt> if you have multiple
-     *                                    network interfaces or if you are facing troubles.
+     * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
+     * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
+     * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
+     * network interfaces or if you are facing troubles.
      */
     public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
         this.allowDuplicateRegistrations = allowDuplicateRegistrations;
     }
 
     /**
-     * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires
-     * a payload type when registering and discovering nodes. May be enhanced in the future with further information
-     * to assist discovery.
+     * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
+     * payload type when registering and discovering nodes. May be enhanced in the future with further information to
+     * assist discovery.
      *
      * @author Raul Kripalani
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e05c012f/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index 3f9b1ff..601323c 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -122,6 +122,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         // shall be configured through system property
         if (gridName.equals(getTestGridName(0)))
             zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
+
         else if (gridName.equals(getTestGridName(1))) {
             zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
                 new ExponentialBackoffRetry(100, 5)));
@@ -360,23 +361,16 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         // stop all grids
         stopAllGrids();
 
-        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    return zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size() == 0;
+                    return 0 == zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size();
                 }
                 catch (Exception e) {
-                    fail("Unexpected error: ");
-
-                    return true;
+                    return false;
                 }
             }
-        }, 2 * 60000);
-
-        assertTrue(wait);
-
-        // check that all nodes are gone in ZK
-        assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
+        }, 20000));
     }
 
     /**


[08/13] ignite git commit: ignite-3209 Waiting for affinity topology in case of failover for affinity call

Posted by av...@apache.org.
ignite-3209 Waiting for affinity topology in case of failover for affinity call


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a672e576
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a672e576
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a672e576

Branch: refs/heads/master
Commit: a672e576cf88e21859944474355a91d9f4699550
Parents: b841195
Author: agura <ag...@gridgain.com>
Authored: Tue Jun 14 17:32:25 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Jun 15 17:43:44 2016 +0300

----------------------------------------------------------------------
 .../processors/task/GridTaskWorker.java         | 44 ++++++++++++++++++--
 .../cache/CacheAffinityCallSelfTest.java        | 42 +++++++++++++------
 2 files changed, 71 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a672e576/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 05970ed..415d632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -61,10 +61,12 @@ import org.apache.ignite.internal.GridJobSiblingImpl;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTaskSessionImpl;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.closure.AffinityTask;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.typedef.CO;
@@ -76,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.internal.visor.util.VisorClusterGroupEmptyException;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.TaskContinuousMapperResource;
@@ -678,6 +681,12 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
             // job response was changed in this method apply.
             boolean selfOccupied = false;
 
+            IgniteInternalFuture<?> affFut = null;
+
+            boolean waitForAffTop = false;
+
+            final GridJobExecuteResponse failoverRes = res;
+
             try {
                 synchronized (mux) {
                     // If task is not waiting for responses,
@@ -847,7 +856,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                         }
 
                         case FAILOVER: {
-                            if (!failover(res, jobRes, getTaskTopology()))
+                            if (affKey != null) {
+                                AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+
+                                affFut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+                            }
+
+                            if (affFut != null && !affFut.isDone()) {
+                                waitForAffTop = true;
+
+                                jobRes.resetResponse();
+                            }
+                            else if (!failover(res, jobRes, getTaskTopology()))
                                 plc = null;
 
                             break;
@@ -856,7 +876,7 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 }
 
                 // Outside of synchronization.
-                if (plc != null) {
+                if (plc != null && !waitForAffTop) {
                     // Handle failover.
                     if (plc == FAILOVER)
                         sendFailoverRequest(jobRes);
@@ -872,6 +892,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                 U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e);
 
                 finishTask(null, e);
+
+                waitForAffTop = false;
             }
             finally {
                 // Open up job for processing responses.
@@ -890,6 +912,18 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
                     res = delayedRess.poll();
                 }
             }
+
+            if (waitForAffTop && affFut != null) {
+                affFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut0) {
+                        ctx.closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                onResponse(failoverRes);
+                            }
+                        }, false);
+                    }
+                });
+            }
         }
     }
 
@@ -1039,7 +1073,11 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
      * @param top Topology.
      * @return {@code True} if fail-over SPI returned a new node.
      */
-    private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) {
+    private boolean failover(
+        GridJobExecuteResponse res,
+        GridJobResultImpl jobRes,
+        Collection<? extends ClusterNode> top
+    ) {
         assert Thread.holdsLock(mux);
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a672e576/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
index a1762cc..92e2b9b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAffinityCallSelfTest.java
@@ -50,9 +50,6 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     private static final String CACHE_NAME = "myCache";
 
     /** */
-    private static final int MAX_FAILOVER_ATTEMPTS = 500;
-
-    /** */
     private static final int SRVS = 4;
 
     /** */
@@ -69,21 +66,22 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
         cfg.setDiscoverySpi(spi);
 
         AlwaysFailoverSpi failSpi = new AlwaysFailoverSpi();
-        failSpi.setMaximumFailoverAttempts(MAX_FAILOVER_ATTEMPTS);
         cfg.setFailoverSpi(failSpi);
 
-        CacheConfiguration ccfg = defaultCacheConfiguration();
-        ccfg.setName(CACHE_NAME);
-        ccfg.setCacheMode(PARTITIONED);
-        ccfg.setBackups(1);
-
-        cfg.setCacheConfiguration(ccfg);
-
+        // Do not configure cache on client.
         if (gridName.equals(getTestGridName(SRVS))) {
             cfg.setClientMode(true);
 
             spi.setForceServerMode(true);
         }
+        else {
+            CacheConfiguration ccfg = defaultCacheConfiguration();
+            ccfg.setName(CACHE_NAME);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setBackups(1);
+
+            cfg.setCacheConfiguration(ccfg);
+        }
 
         return cfg;
     }
@@ -99,7 +97,27 @@ public class CacheAffinityCallSelfTest extends GridCommonAbstractTest {
     public void testAffinityCallRestartNode() throws Exception {
         startGridsMultiThreaded(SRVS);
 
-        final int ITERS = 5;
+        affinityCallRestartNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAffinityCallFromClientRestartNode() throws Exception {
+        startGridsMultiThreaded(SRVS + 1);
+
+        Ignite client = grid(SRVS);
+
+        assertTrue(client.configuration().isClientMode());
+
+        affinityCallRestartNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void affinityCallRestartNode() throws Exception {
+        final int ITERS = 10;
 
         for (int i = 0; i < ITERS; i++) {
             log.info("Iteration: " + i);


[10/13] ignite git commit: GG-11237 KeepBinary flag will not be send to another node.

Posted by av...@apache.org.
GG-11237 KeepBinary flag will not be send to another node.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5f4aac2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5f4aac2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5f4aac2

Branch: refs/heads/master
Commit: b5f4aac2a5daad96cdac4df9abf91713824ff2c0
Parents: 19aeec3
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 14:14:33 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:14:33 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  49 ++++-----
 .../cache/transactions/IgniteTxAdapter.java     |  15 +--
 .../GridCacheLazyPlainVersionedEntry.java       | 107 +++++++++++++++++++
 .../version/GridCachePlainVersionedEntry.java   |   7 +-
 4 files changed, 140 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6d321bd..57fa68e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -54,7 +54,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -2035,8 +2035,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 // Cache is conflict-enabled.
                 if (cctx.conflictNeedResolve()) {
-                    // Get new value, optionally unmarshalling and/or transforming it.
-                    Object writeObj0;
+                    GridCacheVersionedEntryEx newEntry;
+
+                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
+                        explicitTtl,
+                        explicitExpireTime);
+
+                    // Prepare old and new entries for conflict resolution.
+                    GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
 
                     if (op == GridCacheOperation.TRANSFORM) {
                         transformClo = writeObj;
@@ -2051,14 +2057,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         try {
                             Object computed = entryProcessor.process(entry, invokeArgs);
 
-                            if (entry.modified()) {
-                                writeObj0 = cctx.unwrapTemporary(entry.getValue());
-                                writeObj = cctx.toCacheObject(writeObj0);
-                            }
-                            else {
+                            if (entry.modified())
+                                writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
+                            else
                                 writeObj = oldVal;
-                                writeObj0 = cctx.unwrapBinaryIfNeeded(oldVal, keepBinary, false);
-                            }
 
                             key0 = entry.key();
 
@@ -2069,24 +2071,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             invokeRes = new IgniteBiTuple(null, e);
 
                             writeObj = oldVal;
-                            writeObj0 = cctx.unwrapBinaryIfNeeded(oldVal, keepBinary, false);
                         }
                     }
-                    else
-                        writeObj0 = cctx.unwrapBinaryIfNeeded(writeObj, keepBinary, false);
 
-                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
-                        explicitTtl,
-                        explicitExpireTime);
-
-                    // Prepare old and new entries for conflict resolution.
-                    GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
-                    GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry<>(
-                        oldEntry.key(),
-                        writeObj0,
+                    newEntry = new GridCacheLazyPlainVersionedEntry<>(
+                        cctx,
+                        key,
+                        (CacheObject)writeObj,
                         expiration.get1(),
                         expiration.get2(),
-                        conflictVer != null ? conflictVer : newVer);
+                        conflictVer != null ? conflictVer : newVer,
+                        keepBinary);
 
                     // Resolve conflict.
                     conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
@@ -3531,12 +3526,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         CacheObject val = isNew ? unswap(true, false) : rawGetOrUnmarshalUnlocked(false);
 
-        return new GridCachePlainVersionedEntry<>(cctx.unwrapBinaryIfNeeded(key, keepBinary, true),
-            cctx.unwrapBinaryIfNeeded(val, keepBinary, true),
+        return new GridCacheLazyPlainVersionedEntry<>(cctx,
+            key,
+            val,
             ttlExtras(),
             expireTimeExtras(),
             ver.conflictVersion(),
-            isNew);
+            isNew,
+            keepBinary);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ece013e..f76f4bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -55,7 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
-import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
@@ -1645,14 +1645,15 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         // Construct new entry info.
         GridCacheContext entryCtx = txEntry.context();
 
-        Object newVal0 = entryCtx.unwrapBinaryIfNeeded(newVal, txEntry.keepBinary(), false);
-
-        GridCacheVersionedEntryEx newEntry = new GridCachePlainVersionedEntry(
-            oldEntry.key(),
-            newVal0,
+        GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry(
+            entryCtx,
+            txEntry.key(),
+            newVal,
             newTtl,
             newExpireTime,
-            newVer);
+            newVer,
+            false,
+            txEntry.keepBinary());
 
         GridCacheVersionConflictContext ctx = old.context().conflictResolve(oldEntry, newEntry, false);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
new file mode 100644
index 0000000..50de328
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheLazyPlainVersionedEntry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Lazy plain versioned entry.
+ */
+public class GridCacheLazyPlainVersionedEntry<K, V> extends GridCachePlainVersionedEntry<K, V> {
+    /** Cache context. */
+    protected GridCacheContext cctx;
+
+    /** Key cache object. */
+    private KeyCacheObject keyObj;
+
+    /** Cache object value. */
+    private CacheObject valObj;
+
+    /** Keep binary flag. */
+    private boolean keepBinary;
+
+    /**
+     * @param cctx Context.
+     * @param keyObj Key.
+     * @param valObj Value.
+     * @param ttl TTL.
+     * @param expireTime Expire time.
+     * @param ver Version.
+     * @param isStartVer Start version flag.
+     * @param keepBinary Keep binary flag.
+     */
+    public GridCacheLazyPlainVersionedEntry(GridCacheContext cctx,
+        KeyCacheObject keyObj,
+        CacheObject valObj,
+        long ttl,
+        long expireTime,
+        GridCacheVersion ver,
+        boolean isStartVer,
+        boolean keepBinary) {
+        super(null, null, ttl, expireTime, ver, isStartVer);
+
+        this.cctx = cctx;
+        this.keyObj = keyObj;
+        this.valObj = valObj;
+        this.keepBinary = keepBinary;
+    }
+
+    public GridCacheLazyPlainVersionedEntry(GridCacheContext cctx,
+        KeyCacheObject keyObj,
+        CacheObject valObj,
+        long ttl,
+        long expireTime,
+        GridCacheVersion ver,
+        boolean keepBinary) {
+        super(null, null, ttl, expireTime, ver);
+        this.cctx = cctx;
+        this.keepBinary = keepBinary;
+        this.keyObj = keyObj;
+        this.valObj = valObj;
+    }
+
+    /** {@inheritDoc} */
+    @Override public K key() {
+        if (key == null)
+            key = (K)cctx.unwrapBinaryIfNeeded(keyObj, keepBinary);
+
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public V value() {
+        return value(keepBinary);
+    }
+
+    /**
+     * Returns the value stored in the cache when this entry was created.
+     *
+     * @param keepBinary Flag to keep binary if needed.
+     * @return the value corresponding to this entry
+     */
+    @SuppressWarnings("unchecked")
+    public V value(boolean keepBinary) {
+        if (val == null)
+            val = (V)cctx.unwrapBinaryIfNeeded(valObj, keepBinary, true);
+
+        return val;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b5f4aac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
index e1da1c4..dd682e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCachePlainVersionedEntry.java
@@ -25,10 +25,10 @@ import org.jetbrains.annotations.Nullable;
  */
 public class GridCachePlainVersionedEntry<K, V> implements GridCacheVersionedEntryEx<K, V> {
     /** Key. */
-    private final K key;
+    protected K key;
 
     /** Value. */
-    private final V val;
+    protected V val;
 
     /** TTL. */
     private final long ttl;
@@ -63,9 +63,6 @@ public class GridCachePlainVersionedEntry<K, V> implements GridCacheVersionedEnt
      */
     public GridCachePlainVersionedEntry(K key, V val, long ttl, long expireTime, GridCacheVersion ver,
         boolean isStartVer) {
-        assert ver != null;
-        assert key != null;
-
         this.key = key;
         this.val = val;
         this.ttl = ttl;


[13/13] ignite git commit: Merge remote-tracking branch 'remotes/upstream/gridgain-7.6.1'

Posted by av...@apache.org.
Merge remote-tracking branch 'remotes/upstream/gridgain-7.6.1'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca546b3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca546b3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca546b3d

Branch: refs/heads/master
Commit: ca546b3d4ac8431aa9b5104e1d5b732720676d67
Parents: 9d46a73 e05c012
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 14:30:11 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:30:11 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  18 +-
 .../processors/cache/GridCacheMapEntry.java     |  49 +-
 .../cache/transactions/IgniteTxAdapter.java     |  15 +-
 .../GridCacheLazyPlainVersionedEntry.java       | 107 +++
 .../version/GridCachePlainVersionedEntry.java   |   7 +-
 .../GridCacheCountDownLatchImpl.java            |  54 +-
 .../processors/igfs/IgfsCreateResult.java       |  66 ++
 .../processors/igfs/IgfsDataManager.java        | 166 +----
 .../processors/igfs/IgfsDeleteResult.java       |  62 ++
 .../internal/processors/igfs/IgfsImpl.java      |  83 +--
 .../processors/igfs/IgfsMetaManager.java        | 733 +++++++++----------
 .../internal/processors/igfs/IgfsPathIds.java   |  10 +
 .../IgfsSecondaryFileSystemCreateContext.java   | 111 +++
 .../IgfsSecondaryOutputStreamDescriptor.java    |  59 --
 .../igfs/data/IgfsDataPutProcessor.java         |  99 +++
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |  40 +-
 ...IgfsMetaDirectoryListingRenameProcessor.java | 133 ++++
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  |  46 +-
 .../ignite/internal/util/IgniteUtils.java       |   8 +-
 .../ignite/internal/util/lang/GridFunc.java     |  66 --
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   7 +-
 .../IgniteCountDownLatchAbstractSelfTest.java   | 156 +++-
 ...yRemoteFilterMissingInClassPathSelfTest.java | 237 ++++++
 .../processors/igfs/IgfsAbstractSelfTest.java   |  14 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |  24 +-
 .../internal/util/IgniteUtilsSelfTest.java      |  15 +
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 .../zk/TcpDiscoveryZookeeperIpFinder.java       |  65 +-
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  |  16 +-
 29 files changed, 1628 insertions(+), 840 deletions(-)
----------------------------------------------------------------------



[05/13] ignite git commit: IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.

Posted by av...@apache.org.
IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e733e912
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e733e912
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e733e912

Branch: refs/heads/master
Commit: e733e912cccf2c83d7e31d6f856b30fc4c663975
Parents: 95513d3
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 17:39:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:35:58 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  18 +-
 .../processors/igfs/IgfsDataManager.java        | 166 ++++---------------
 .../igfs/data/IgfsDataPutProcessor.java         |  99 +++++++++++
 3 files changed, 147 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 28dd611..6e0e5e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -704,7 +704,9 @@ public class FileSystemConfiguration {
      * Gets maximum timeout awaiting for trash purging in case data cache oversize is detected.
      *
      * @return Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public long getTrashPurgeTimeout() {
         return trashPurgeTimeout;
     }
@@ -713,7 +715,9 @@ public class FileSystemConfiguration {
      * Sets maximum timeout awaiting for trash purging in case data cache oversize is detected.
      *
      * @param trashPurgeTimeout Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setTrashPurgeTimeout(long trashPurgeTimeout) {
         this.trashPurgeTimeout = trashPurgeTimeout;
     }
@@ -724,8 +728,10 @@ public class FileSystemConfiguration {
      * In case no executor service is provided, default one will be created with maximum amount of threads equals
      * to amount of processor cores.
      *
-     * @return Get DUAL mode put operation executor service
+     * @return Get DUAL mode put operation executor service.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     @Nullable public ExecutorService getDualModePutExecutorService() {
         return dualModePutExec;
     }
@@ -734,7 +740,9 @@ public class FileSystemConfiguration {
      * Set DUAL mode put operations executor service.
      *
      * @param dualModePutExec Dual mode put operations executor service.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModePutExecutorService(ExecutorService dualModePutExec) {
         this.dualModePutExec = dualModePutExec;
     }
@@ -743,7 +751,9 @@ public class FileSystemConfiguration {
      * Get DUAL mode put operation executor service shutdown flag.
      *
      * @return DUAL mode put operation executor service shutdown flag.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public boolean getDualModePutExecutorServiceShutdown() {
         return dualModePutExecShutdown;
     }
@@ -752,7 +762,9 @@ public class FileSystemConfiguration {
      * Set DUAL mode put operations executor service shutdown flag.
      *
      * @param dualModePutExecShutdown Dual mode put operations executor service shutdown flag.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModePutExecutorServiceShutdown(boolean dualModePutExecShutdown) {
         this.dualModePutExecShutdown = dualModePutExecShutdown;
     }
@@ -766,7 +778,9 @@ public class FileSystemConfiguration {
      * avoid issues with increasing GC pauses or out-of-memory error.
      *
      * @return Maximum amount of pending data read from the secondary file system
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public long getDualModeMaxPendingPutsSize() {
         return dualModeMaxPendingPutsSize;
     }
@@ -775,7 +789,9 @@ public class FileSystemConfiguration {
      * Set maximum amount of data in pending put operations.
      *
      * @param dualModeMaxPendingPutsSize Maximum amount of data in pending put operations.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) {
         this.dualModeMaxPendingPutsSize = dualModeMaxPendingPutsSize;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 03777c3..ca8a3af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
+import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,7 +54,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -78,10 +78,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -98,9 +96,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  * Cache based file's data container.
  */
 public class IgfsDataManager extends IgfsManager {
-    /** IGFS. */
-    private IgfsEx igfs;
-
     /** Data internal cache. */
     private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj;
 
@@ -143,31 +138,10 @@ public class IgfsDataManager extends IgfsManager {
     /** Async file delete worker. */
     private AsyncDeleteWorker delWorker;
 
-    /** Trash purge timeout. */
-    private long trashPurgeTimeout;
-
     /** On-going remote reads futures. */
     private final ConcurrentHashMap8<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts =
         new ConcurrentHashMap8<>();
 
-    /** Executor service for puts in dual mode */
-    private volatile ExecutorService putExecSvc;
-
-    /** Executor service for puts in dual mode shutdown flag. */
-    private volatile boolean putExecSvcShutdown;
-
-    /** Maximum amount of data in pending puts. */
-    private volatile long maxPendingPuts;
-
-    /** Current amount of data in pending puts. */
-    private long curPendingPuts;
-
-    /** Lock for pending puts. */
-    private final Lock pendingPutsLock = new ReentrantLock();
-
-    /** Condition for pending puts. */
-    private final Condition pendingPutsCond = pendingPutsLock.newCondition();
-
     /**
      *
      */
@@ -182,8 +156,6 @@ public class IgfsDataManager extends IgfsManager {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        igfs = igfsCtx.igfs();
-
         dataCacheStartLatch = new CountDownLatch(1);
 
         String igfsName = igfsCtx.configuration().getName();
@@ -216,23 +188,6 @@ public class IgfsDataManager extends IgfsManager {
 
         igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
 
-        trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout();
-
-        putExecSvc = igfsCtx.configuration().getDualModePutExecutorService();
-
-        if (putExecSvc != null)
-            putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown();
-        else {
-            int coresCnt = Runtime.getRuntime().availableProcessors();
-
-            // Note that we do not pre-start threads here as IGFS pool may not be needed.
-            putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque<Runnable>());
-
-            putExecSvcShutdown = true;
-        }
-
-        maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();
-
         delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
             "igfs-" + igfsName + "-delete-worker", log);
     }
@@ -282,9 +237,6 @@ public class IgfsDataManager extends IgfsManager {
         catch (IgniteInterruptedCheckedException e) {
             log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e);
         }
-
-        if (putExecSvcShutdown)
-            U.shutdownNow(getClass(), putExecSvc, log);
     }
 
     /**
@@ -308,6 +260,7 @@ public class IgfsDataManager extends IgfsManager {
      * @param prevAffKey Affinity key of previous block.
      * @return Affinity key.
      */
+    @SuppressWarnings("ConstantConditions")
     public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
         // Do not generate affinity key for non-affinity nodes.
         if (!dataCache.context().affinityNode())
@@ -371,8 +324,6 @@ public class IgfsDataManager extends IgfsManager {
     @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path,
         final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader)
         throws IgniteCheckedException {
-        //assert validTxState(any); // Allow this method call for any transaction state.
-
         assert fileInfo != null;
         assert blockIdx >= 0;
 
@@ -435,7 +386,7 @@ public class IgfsDataManager extends IgfsManager {
 
                                 rmtReadFut.onDone(res);
 
-                                putSafe(key, res);
+                                putBlock(fileInfo.blockSize(), key, res);
 
                                 metrics.addReadBlocks(1, 1);
                             }
@@ -471,6 +422,26 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Stores the given block in data cache, choosing the write path by comparing its length to the block size.
+     *
+     * @param blockSize The size of a whole block; {@code data} is expected to be no longer than this.
+     * @param key The data cache key of the block.
+     * @param data The new value of the block.
+     */
+    private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
+        if (data.length < blockSize)
+            // Partial (incomplete) block: the processor only replaces a missing or shorter cached value.
+            dataCachePrj.invoke(key, new IgfsDataPutProcessor(data));
+        else {
+            // Whole block: nothing longer can be cached, so put unconditionally.
+            assert data.length == blockSize;
+
+            dataCachePrj.put(key, data);
+        }
+    }
+
+
+    /**
      * Registers write future in igfs data manager.
      *
      * @param fileId File ID.
@@ -680,7 +651,7 @@ public class IgfsDataManager extends IgfsManager {
                                 byte[] val = vals.get(colocatedKey);
 
                                 if (val != null) {
-                                    dataCachePrj.put(key, val);
+                                    putBlock(fileInfo.blockSize(), key, val);
 
                                     tx.commit();
                                 }
@@ -744,7 +715,6 @@ public class IgfsDataManager extends IgfsManager {
      */
     public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen)
         throws IgniteCheckedException {
-        assert validTxState(false);
         assert info.isFile() : "Failed to get affinity (not a file): " + info;
         assert start >= 0 : "Start position should not be negative: " + start;
         assert len >= 0 : "Part length should not be negative: " + len;
@@ -974,21 +944,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Check transaction is (not) started.
-     *
-     * @param inTx Expected transaction state.
-     * @return Transaction state is correct.
-     */
-    private boolean validTxState(boolean inTx) {
-        boolean txState = inTx == (dataCachePrj.tx() != null);
-
-        assert txState : (inTx ? "Method cannot be called outside transaction: " :
-            "Method cannot be called in transaction: ") + dataCachePrj.tx();
-
-        return txState;
-    }
-
-    /**
      * @param fileId File ID.
      * @param node Node to process blocks on.
      * @param blocks Blocks to put in cache.
@@ -1056,10 +1011,11 @@ public class IgfsDataManager extends IgfsManager {
      * @param colocatedKey Block key.
      * @param startOff Data start offset within block.
      * @param data Data to write.
+     * @param blockSize The block size.
      * @throws IgniteCheckedException If update failed.
      */
     private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
-        byte[] data) throws IgniteCheckedException {
+        byte[] data, int blockSize) throws IgniteCheckedException {
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
             final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
 
@@ -1090,7 +1046,7 @@ public class IgfsDataManager extends IgfsManager {
 
         // If writing from block beginning, just put and return.
         if (startOff == 0) {
-            dataCachePrj.put(colocatedKey, data);
+            putBlock(blockSize, colocatedKey, data);
 
             return;
         }
@@ -1151,67 +1107,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Put data block read from the secondary file system to the cache.
-     *
-     * @param key Key.
-     * @param data Data.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException {
-        assert key != null;
-        assert data != null;
-
-        if (maxPendingPuts > 0) {
-            pendingPutsLock.lock();
-
-            try {
-                while (curPendingPuts > maxPendingPuts)
-                    pendingPutsCond.await(2000, TimeUnit.MILLISECONDS);
-
-                curPendingPuts += data.length;
-            }
-            catch (InterruptedException ignore) {
-                throw new IgniteCheckedException("Failed to put IGFS data block into cache due to interruption: " + key);
-            }
-            finally {
-                pendingPutsLock.unlock();
-            }
-        }
-
-        Runnable task = new Runnable() {
-            @Override public void run() {
-                try {
-                    dataCachePrj.put(key, data);
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to put IGFS data block into cache [key=" + key + ", err=" + e + ']');
-                }
-                finally {
-                    if (maxPendingPuts > 0) {
-                        pendingPutsLock.lock();
-
-                        try {
-                            curPendingPuts -= data.length;
-
-                            pendingPutsCond.signalAll();
-                        }
-                        finally {
-                            pendingPutsLock.unlock();
-                        }
-                    }
-                }
-            }
-        };
-
-        try {
-            putExecSvc.submit(task);
-        }
-        catch (RejectedExecutionException ignore) {
-            task.run();
-        }
-    }
-
-    /**
      * @param blocks Blocks to write.
      * @return Future that will be completed after put is done.
      */
@@ -1261,6 +1156,7 @@ public class IgfsDataManager extends IgfsManager {
      * @param nodeId Node ID.
      * @param ackMsg Write acknowledgement message.
      */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
         try {
             ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
@@ -1343,6 +1239,7 @@ public class IgfsDataManager extends IgfsManager {
          * @throws IgniteCheckedException If failed.
          * @return Data remainder if {@code flush} flag is {@code false}.
          */
+        @SuppressWarnings("ConstantConditions")
         @Nullable public byte[] storeDataBlocks(
             IgfsEntryInfo fileInfo,
             long reservedLen,
@@ -1456,7 +1353,7 @@ public class IgfsDataManager extends IgfsManager {
 
                 if (size != blockSize) {
                     // Partial writes must be always synchronous.
-                    processPartialBlockWrite(id, key, block == first ? off : 0, portion);
+                    processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize);
 
                     writtenTotal++;
                 }
@@ -1617,8 +1514,6 @@ public class IgfsDataManager extends IgfsManager {
         protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) {
             super(gridName, name, log);
 
-            long time = System.currentTimeMillis();
-
             stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid());
         }
 
@@ -1642,6 +1537,7 @@ public class IgfsDataManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("ConstantConditions")
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             try {
                 while (!isCancelled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e733e912/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
new file mode 100644
index 0000000..2029d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.data;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Entry processor that sets a block value only if the entry is empty or the new value is longer.
+ */
+public class IgfsDataPutProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>, Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The new value. */
+    private byte[] newVal;
+
+    /**
+     * No-arg constructor required by {@link Externalizable}.
+     */
+    public IgfsDataPutProcessor() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param newVal The new value, must not be {@code null}.
+     */
+    public IgfsDataPutProcessor(byte[] newVal) {
+        assert newVal != null;
+
+        this.newVal = newVal;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args)
+        throws EntryProcessorException {
+        byte[] curVal = entry.getValue();
+
+        if (curVal == null || newVal.length > curVal.length)
+            entry.setValue(newVal); // An equal-length or longer cached value is left untouched.
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        newVal = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeByteArray(out, newVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        newVal = reader.rawReader().readByteArray();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        writer.rawWriter().writeByteArray(newVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsDataPutProcessor.class, this);
+    }
+}


[04/13] ignite git commit: IGNITE-3294: IGFS: Created separate processor optimized for entry rename.

Posted by av...@apache.org.
IGNITE-3294: IGFS: Created separate processor optimized for entry rename.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/95513d32
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/95513d32
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/95513d32

Branch: refs/heads/master
Commit: 95513d3214dd1ea7a1a68d6f1709af004c2efccd
Parents: c2f8df9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 11:36:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:33:30 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsMetaManager.java        |   7 +-
 ...IgfsMetaDirectoryListingRenameProcessor.java | 133 +++++++++++++++++++
 2 files changed, 136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/95513d32/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 5f7dd3d..fd0b07e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCalla
 import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaIdsForPathCallable;
 import org.apache.ignite.internal.processors.igfs.client.meta.IgfsClientMetaInfoForPathCallable;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor;
+import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryListingRenameProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor;
 import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileReserveSpaceProcessor;
@@ -1742,10 +1743,8 @@ public class IgfsMetaManager extends IgfsManager {
         IgniteUuid destId, String destName) throws IgniteCheckedException {
         validTxState(true);
 
-        if (F.eq(srcId, destId)) {
-            id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
-            id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
-        }
+        if (F.eq(srcId, destId))
+            id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRenameProcessor(srcName, destName));
         else {
 
             Map<IgniteUuid, EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>> procMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/95513d32/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
new file mode 100644
index 0000000..6460adf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryListingRenameProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.meta;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsListingEntry;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Processor to rename a single entry in a directory listing.
+ */
+public class IgfsMetaDirectoryListingRenameProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
+    Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Old name. */
+    private String oldName;
+
+    /** New name. */
+    private String newName;
+
+    /**
+     * Constructor.
+     */
+    public IgfsMetaDirectoryListingRenameProcessor() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param oldName Old name.
+     * @param newName New name.
+     */
+    public IgfsMetaDirectoryListingRenameProcessor(String oldName, String newName) {
+        this.oldName = oldName;
+        this.newName = newName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
+        throws EntryProcessorException {
+        IgfsEntryInfo fileInfo = e.getValue();
+
+        assert fileInfo.isDirectory();
+
+        Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+        // Rename the entry within the copied listing.
+        IgfsListingEntry entry = listing.remove(oldName);
+
+        if (entry == null)
+            throw new IgniteException("Directory listing doesn't contain expected entry: " + oldName);
+
+        IgfsListingEntry replacedEntry = listing.put(newName, entry);
+
+        if (replacedEntry != null)
+            throw new IgniteException("Entry with new name already exists [name=" + newName +
+                ", entry=" + replacedEntry + ']');
+
+        e.setValue(fileInfo.listing(listing));
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeString(out, oldName);
+        U.writeString(out, newName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        oldName = U.readString(in);
+        newName = U.readString(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter out = writer.rawWriter();
+
+        out.writeString(oldName);
+        out.writeString(newName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader in = reader.rawReader();
+
+        oldName = in.readString();
+        newName = in.readString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsMetaDirectoryListingRenameProcessor.class, this);
+    }
+}


[03/13] ignite git commit: IGNITE-3294: IGFS: Improved "create" performance in DUAL mode.

Posted by av...@apache.org.
IGNITE-3294: IGFS: Improved "create" performance in DUAL mode.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c2f8df96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c2f8df96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c2f8df96

Branch: refs/heads/master
Commit: c2f8df96907e096fdeaa8d040463ff5ec1662395
Parents: 51814fd
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 11:09:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:32:47 2016 +0300

----------------------------------------------------------------------
 .../processors/igfs/IgfsCreateResult.java       |  66 +++
 .../internal/processors/igfs/IgfsImpl.java      |  53 +-
 .../processors/igfs/IgfsMetaManager.java        | 524 ++++++++++---------
 .../IgfsSecondaryFileSystemCreateContext.java   | 111 ++++
 .../IgfsSecondaryOutputStreamDescriptor.java    |  59 ---
 .../meta/IgfsMetaDirectoryCreateProcessor.java  |  40 +-
 .../igfs/meta/IgfsMetaFileCreateProcessor.java  |  46 +-
 .../igfs/IgfsMetaManagerSelfTest.java           |   4 +-
 8 files changed, 540 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
new file mode 100644
index 0000000..0b09e02
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsCreateResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+
+/**
+ * IGFS file create result.
+ */
+public class IgfsCreateResult {
+    /** File info in the primary file system. */
+    private final IgfsEntryInfo info;
+
+    /** Output stream to the secondary file system. */
+    private final OutputStream secondaryOut;
+
+    /**
+     * Constructor.
+     *
+     * @param info File info in the primary file system.
+     * @param secondaryOut Output stream to the secondary file system.
+     */
+    public IgfsCreateResult(IgfsEntryInfo info, @Nullable OutputStream secondaryOut) {
+        assert info != null;
+
+        this.info = info;
+        this.secondaryOut = secondaryOut;
+    }
+
+    /**
+     * @return File info in the primary file system.
+     */
+    public IgfsEntryInfo info() {
+        return info;
+    }
+
+    /**
+     * @return Output stream to the secondary file system.
+     */
+    @Nullable public OutputStream secondaryOutputStream() {
+        return secondaryOut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsCreateResult.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index e6f2ff6..efb347f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -106,7 +106,6 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
 import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
 import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
 import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
@@ -1019,28 +1018,10 @@ public final class IgfsImpl implements IgfsEx {
                     log.debug("Open file for writing [path=" + path + ", bufSize=" + bufSize + ", overwrite=" +
                         overwrite + ", props=" + props + ']');
 
+                // Resolve mode.
                 final IgfsMode mode = resolveMode(path);
 
-                IgfsFileWorkerBatch batch;
-
-                if (mode != PRIMARY) {
-                    assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
-
-                    await(path);
-
-                    IgfsSecondaryOutputStreamDescriptor desc = meta.createDual(secondaryFs, path, simpleCreate,
-                        props, overwrite, bufSize, (short) replication, groupBlockSize(), affKey);
-
-                    batch = newBatch(path, desc.out());
-
-                    IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
-                        bufferSize(bufSize), mode, batch);
-
-                    IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
-
-                    return os;
-                }
-
+                // Prepare properties.
                 final Map<String, String> dirProps, fileProps;
 
                 if (props == null) {
@@ -1051,19 +1032,37 @@ public final class IgfsImpl implements IgfsEx {
                 else
                     dirProps = fileProps = new HashMap<>(props);
 
-                IgfsEntryInfo res = meta.create(
+                // Prepare context for DUAL mode.
+                IgfsSecondaryFileSystemCreateContext secondaryCtx = null;
+
+                if (mode != PRIMARY)
+                    secondaryCtx = new IgfsSecondaryFileSystemCreateContext(secondaryFs, path, overwrite, simpleCreate,
+                        fileProps, (short)replication, groupBlockSize(), bufSize);
+
+                // Wait for async ops completion if in DUAL mode.
+                if (mode != PRIMARY)
+                    await(path);
+
+                // Perform create.
+                IgfsCreateResult res = meta.create(
                     path,
                     dirProps,
                     overwrite,
                     cfg.getBlockSize(),
                     affKey,
-                    evictExclude(path, true),
-                    fileProps
+                    evictExclude(path, mode == PRIMARY),
+                    fileProps,
+                    secondaryCtx
                 );
 
                 assert res != null;
 
-                return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
+                // Create secondary file system batch.
+                OutputStream secondaryStream = res.secondaryOutputStream();
+
+                IgfsFileWorkerBatch batch = secondaryStream != null ? newBatch(path, secondaryStream) : null;
+
+                return new IgfsOutputStreamImpl(igfsCtx, path, res.info(), bufferSize(bufSize), mode, batch);
             }
         });
     }
@@ -1094,9 +1093,9 @@ public final class IgfsImpl implements IgfsEx {
 
                     await(path);
 
-                    IgfsSecondaryOutputStreamDescriptor desc = meta.appendDual(secondaryFs, path, bufSize, create);
+                    IgfsCreateResult desc = meta.appendDual(secondaryFs, path, bufSize, create);
 
-                    batch = newBatch(path, desc.out());
+                    batch = newBatch(path, desc.secondaryOutputStream());
 
                     return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 4cdf6a9..5f7dd3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -394,40 +394,47 @@ public class IgfsMetaManager extends IgfsManager {
      * @throws IgniteCheckedException If failed.
      */
     public IgfsPathIds pathIds(IgfsPath path) throws IgniteCheckedException {
-        if (busyLock.enterBusy()) {
-            try {
-                validTxState(false);
+        // Prepare parts.
+        String[] components = path.componentsArray();
 
-                // Prepare parts.
-                String[] components = path.componentsArray();
+        String[] parts = new String[components.length + 1];
 
-                String[] parts = new String[components.length + 1];
+        System.arraycopy(components, 0, parts, 1, components.length);
 
-                System.arraycopy(components, 0, parts, 1, components.length);
+        // Get IDs.
+        if (client) {
+            List<IgniteUuid> ids = runClientTask(new IgfsClientMetaIdsForPathCallable(cfg.getName(), path));
 
-                // Prepare IDs.
-                IgniteUuid[] ids = new IgniteUuid[parts.length];
+            return new IgfsPathIds(path, parts, ids.toArray(new IgniteUuid[ids.size()]));
+        }
+        else {
+            if (busyLock.enterBusy()) {
+                try {
+                    validTxState(false);
 
-                ids[0] = IgfsUtils.ROOT_ID;
+                    IgniteUuid[] ids = new IgniteUuid[parts.length];
 
-                for (int i = 1; i < ids.length; i++) {
-                    IgniteUuid id = fileId(ids[i - 1], parts[i], false);
+                    ids[0] = IgfsUtils.ROOT_ID;
 
-                    if (id != null)
-                        ids[i] = id;
-                    else
-                        break;
-                }
+                    for (int i = 1; i < ids.length; i++) {
+                        IgniteUuid id = fileId(ids[i - 1], parts[i], false);
 
-                // Return.
-                return new IgfsPathIds(path, parts, ids);
-            }
-            finally {
-                busyLock.leaveBusy();
+                        if (id != null)
+                            ids[i] = id;
+                        else
+                            break;
+                    }
+
+                    // Return.
+                    return new IgfsPathIds(path, parts, ids);
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
             }
+            else
+                throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
         }
-        else
-            throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
     }
 
     /**
@@ -1147,6 +1154,46 @@ public class IgfsMetaManager extends IgfsManager {
     }
 
     /**
+     * Whether operation must be re-tried because we have suspicious links which may break secondary file system
+     * consistency.
+     *
+     * @param pathIds Path IDs.
+     * @param lockInfos Lock infos.
+     * @return Whether to re-try.
+     */
+    private boolean isRetryForSecondary(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos) {
+        // We need to ensure that the last locked info is not linked with expected child.
+        // Otherwise there was some concurrent file system update and we have to re-try.
+        if (!pathIds.allExists()) {
+            // Find the last locked index
+            IgfsEntryInfo lastLockedInfo = null;
+            int lastLockedIdx = -1;
+
+            while (lastLockedIdx < pathIds.lastExistingIndex()) {
+                IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
+
+                if (nextInfo != null) {
+                    lastLockedInfo = nextInfo;
+                    lastLockedIdx++;
+                }
+                else
+                    break;
+            }
+
+            assert lastLockedIdx < pathIds.count();
+
+            if (lastLockedInfo != null) {
+                String part = pathIds.part(lastLockedIdx + 1);
+
+                if (lastLockedInfo.listing().containsKey(part))
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
      * Move path to the trash directory.
      *
      * @param path Path.
@@ -1155,8 +1202,8 @@ public class IgfsMetaManager extends IgfsManager {
      * @return ID of an entry located directly under the trash directory.
      * @throws IgniteCheckedException If failed.
      */
-    IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive, @Nullable IgfsSecondaryFileSystem secondaryFs)
-        throws IgniteCheckedException {
+    IgfsDeleteResult softDelete(final IgfsPath path, final boolean recursive,
+        @Nullable IgfsSecondaryFileSystem secondaryFs) throws IgniteCheckedException {
         while (true) {
             if (busyLock.enterBusy()) {
                 try {
@@ -1164,16 +1211,8 @@ public class IgfsMetaManager extends IgfsManager {
 
                     IgfsPathIds pathIds = pathIds(path);
 
-                    boolean relaxed0 = relaxed;
-
-                    if (!pathIds.allExists()) {
-                        if (secondaryFs == null)
-                            // Return early if target path doesn't exist and we do not have secondary file system.
-                            return new IgfsDeleteResult(false, null);
-                        else
-                            // If path is missing partially, we need more serious locking for DUAL mode.
-                            relaxed0 = false;
-                    }
+                    if (!pathIds.allExists() && secondaryFs == null)
+                        return new IgfsDeleteResult(false, null);
 
                     IgniteUuid victimId = pathIds.lastId();
                     String victimName = pathIds.lastPart();
@@ -1184,7 +1223,7 @@ public class IgfsMetaManager extends IgfsManager {
                     // Prepare IDs to lock.
                     SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
 
-                    pathIds.addExistingIds(allIds, relaxed0);
+                    pathIds.addExistingIds(allIds, relaxed);
 
                     IgniteUuid trashId = IgfsUtils.randomTrashId();
 
@@ -1194,38 +1233,11 @@ public class IgfsMetaManager extends IgfsManager {
                         // Lock participants.
                         Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
 
-                        if (!pathIds.allExists()) {
-                            // We need to ensure that the last locked info is not linked with expected child.
-                            // Otherwise there was some ocncurrent file system update and we have to re-try.
-                            assert secondaryFs != null;
-
-                            // Find the last locked index
-                            IgfsEntryInfo lastLockedInfo = null;
-                            int lastLockedIdx = -1;
-
-                            while (lastLockedIdx < pathIds.lastExistingIndex()) {
-                                IgfsEntryInfo nextInfo = lockInfos.get(pathIds.id(lastLockedIdx + 1));
-
-                                if (nextInfo != null) {
-                                    lastLockedInfo = nextInfo;
-                                    lastLockedIdx++;
-                                }
-                                else
-                                    break;
-                            }
-
-                            assert lastLockedIdx < pathIds.count();
-
-                            if (lastLockedInfo != null) {
-                                String part = pathIds.part(lastLockedIdx + 1);
-
-                                if (lastLockedInfo.listing().containsKey(part))
-                                    continue;
-                            }
-                        }
+                        if (secondaryFs != null && isRetryForSecondary(pathIds, lockInfos))
+                            continue;
 
                         // Ensure that all participants are still in place.
-                        if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed0)) {
+                        if (!pathIds.allExists() || !pathIds.verifyIntegrity(lockInfos, relaxed)) {
                             // For DUAL mode we will try to update the underlying FS still. Note we do that inside TX.
                             if (secondaryFs != null) {
                                 boolean res = secondaryFs.delete(path, recursive);
@@ -1719,7 +1731,7 @@ public class IgfsMetaManager extends IgfsManager {
     /**
      * Transfer entry from one directory to another.
      *
-     * @param entry Entry to be transfered.
+     * @param entry Entry to be transferred.
      * @param srcId Source ID.
      * @param srcName Source name.
      * @param destId Destination ID.
@@ -1730,8 +1742,19 @@ public class IgfsMetaManager extends IgfsManager {
         IgniteUuid destId, String destName) throws IgniteCheckedException {
         validTxState(true);
 
-        id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
-        id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+        if (F.eq(srcId, destId)) {
+            id2InfoPrj.invoke(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
+            id2InfoPrj.invoke(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+        }
+        else {
+
+            Map<IgniteUuid, EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>> procMap = new HashMap<>();
+
+            procMap.put(srcId, new IgfsMetaDirectoryListingRemoveProcessor(srcName, entry.fileId()));
+            procMap.put(destId, new IgfsMetaDirectoryListingAddProcessor(destName, entry));
+
+            id2InfoPrj.invokeAll(procMap);
+        }
     }
 
     /**
@@ -1805,7 +1828,7 @@ public class IgfsMetaManager extends IgfsManager {
      * @return Output stream descriptor.
      * @throws Exception On error.
      */
-    IgfsSecondaryOutputStreamDescriptor onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
+    IgfsCreateResult onSuccessCreate(IgfsSecondaryFileSystem fs, IgfsPath path,
         boolean simpleCreate, @Nullable final Map<String, String> props, boolean overwrite,
         int bufSize, short replication, long blockSize, IgniteUuid affKey, Map<IgfsPath, IgfsEntryInfo> infos,
         final Deque<IgfsEvent> pendingEvts, final T1<OutputStream> t1) throws Exception {
@@ -1904,60 +1927,7 @@ public class IgfsMetaManager extends IgfsManager {
         if (oldId == null && evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
             pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
 
-        return new IgfsSecondaryOutputStreamDescriptor(newInfo, out);
-    }
-
-    /**
-     * A delegate method that performs file creation in the synchronization task.
-     *
-     * @param fs File system.
-     * @param path Path.
-     * @param simpleCreate "Simple create" flag.
-     * @param props Properties..
-     * @param overwrite Overwrite flag.
-     * @param bufSize Buffer size.
-     * @param replication Replication factor.
-     * @param blockSize Block size.
-     * @param affKey Affinity key.
-     * @return Output stream descriptor.
-     * @throws IgniteCheckedException If file creation failed.
-     */
-    public IgfsSecondaryOutputStreamDescriptor createDual(final IgfsSecondaryFileSystem fs,
-        final IgfsPath path,
-        final boolean simpleCreate,
-        @Nullable final Map<String, String> props,
-        final boolean overwrite,
-        final int bufSize,
-        final short replication,
-        final long blockSize,
-        final IgniteUuid affKey)
-        throws IgniteCheckedException
-    {
-        if (busyLock.enterBusy()) {
-            try {
-                assert fs != null;
-                assert path != null;
-
-                // Events to fire (can be done outside of a transaction).
-                final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
-
-                CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
-                    overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
-
-                try {
-                    return synchronizeAndExecute(task, fs, false, path.parent());
-                }
-                finally {
-                    for (IgfsEvent evt : pendingEvts)
-                        evts.record(evt);
-                }
-            }
-            finally {
-                busyLock.leaveBusy();
-            }
-        }
-        else
-            throw new IllegalStateException("Failed to create file in DUAL mode because Grid is stopping: " + path);
+        return new IgfsCreateResult(newInfo, out);
     }
 
     /**
@@ -1970,7 +1940,7 @@ public class IgfsMetaManager extends IgfsManager {
      * @return Output stream descriptor.
      * @throws IgniteCheckedException If output stream open for append has failed.
      */
-    IgfsSecondaryOutputStreamDescriptor appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
+    public IgfsCreateResult appendDual(final IgfsSecondaryFileSystem fs, final IgfsPath path,
         final int bufSize, final boolean create) throws IgniteCheckedException {
         if (busyLock.enterBusy()) {
             try {
@@ -1980,12 +1950,12 @@ public class IgfsMetaManager extends IgfsManager {
                 // Events to fire (can be done outside of a transaction).
                 final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
 
-                SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
-                    new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
+                SynchronizationTask<IgfsCreateResult> task =
+                    new SynchronizationTask<IgfsCreateResult>() {
                         /** Container for the secondary file system output stream. */
                         private final T1<OutputStream> outT1 = new T1<>(null);
 
-                        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
+                        @Override public IgfsCreateResult onSuccess(Map<IgfsPath,
                             IgfsEntryInfo> infos) throws Exception {
                             validTxState(true);
 
@@ -2034,10 +2004,10 @@ public class IgfsMetaManager extends IgfsManager {
                             if (evts.isRecordable(EventType.EVT_IGFS_FILE_OPENED_WRITE))
                                 pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_OPENED_WRITE));
 
-                            return new IgfsSecondaryOutputStreamDescriptor(lockedInfo, outT1.get());
+                            return new IgfsCreateResult(lockedInfo, outT1.get());
                         }
 
-                        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(@Nullable Exception err)
+                        @Override public IgfsCreateResult onFailure(@Nullable Exception err)
                             throws IgniteCheckedException {
                             U.closeQuiet(outT1.get());
 
@@ -2926,8 +2896,8 @@ public class IgfsMetaManager extends IgfsManager {
                         }
                         else {
                             // Create file and parent folders.
-                            IgfsPathsCreateResult res =
-                                createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+                            IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+                                affKey, evictExclude, null);
 
                             if (res == null)
                                 continue;
@@ -2961,21 +2931,25 @@ public class IgfsMetaManager extends IgfsManager {
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
      * @param fileProps File properties.
-     * @return @return Resulting info.
+     * @param secondaryCtx Secondary file system create context.
+     * @return Operation result.
      * @throws IgniteCheckedException If failed.
      */
-    IgfsEntryInfo create(
+    IgfsCreateResult create(
         final IgfsPath path,
         Map<String, String> dirProps,
         final boolean overwrite,
         final int blockSize,
         final @Nullable IgniteUuid affKey,
         final boolean evictExclude,
-        @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+        @Nullable Map<String, String> fileProps,
+        @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
         validTxState(false);
 
         while (true) {
             if (busyLock.enterBusy()) {
+                OutputStream secondaryOut = null;
+
                 try {
                     // Prepare path IDs.
                     IgfsPathIds pathIds = pathIds(path);
@@ -3002,6 +2976,9 @@ public class IgfsMetaManager extends IgfsManager {
                     try (IgniteInternalTx tx = startTx()) {
                         Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
 
+                        if (secondaryCtx != null && isRetryForSecondary(pathIds, lockInfos))
+                            continue;
+
                         if (!pathIds.verifyIntegrity(lockInfos, relaxed))
                             // Directory structure changed concurrently. So we simply re-try.
                             continue;
@@ -3024,35 +3001,71 @@ public class IgfsMetaManager extends IgfsManager {
 
                             // At this point file can be re-created safely.
 
-                            // First step: add existing to trash listing.
+                            // Add existing to trash listing.
                             IgniteUuid oldId = pathIds.lastId();
 
                             id2InfoPrj.invoke(trashId, new IgfsMetaDirectoryListingAddProcessor(
                                 IgfsUtils.composeNameForTrash(path, oldId), new IgfsListingEntry(oldInfo)));
 
-                            // Second step: replace ID in parent directory.
+                            // Replace ID in parent directory.
                             String name = pathIds.lastPart();
                             IgniteUuid parentId = pathIds.lastParentId();
 
                             id2InfoPrj.invoke(parentId, new IgfsMetaDirectoryListingReplaceProcessor(name, overwriteId));
 
-                            // Third step: create the file.
-                            long createTime = System.currentTimeMillis();
+                            // Create the file.
+                            IgniteUuid newLockId = createFileLockId(false);
+
+                            long newAccessTime;
+                            long newModificationTime;
+                            Map<String, String> newProps;
+                            long newLen;
+                            int newBlockSize;
+
+                            if (secondaryCtx != null) {
+                                secondaryOut = secondaryCtx.create();
+
+                                IgfsFile secondaryFile = secondaryCtx.info();
 
-                            IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new IgfsMetaFileCreateProcessor(createTime,
-                                fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
+                                if (secondaryFile == null)
+                                    throw fsException("Failed to open output stream to the file created in " +
+                                        "the secondary file system because it no longer exists: " + path);
+                                else if (secondaryFile.isDirectory())
+                                    throw fsException("Failed to open output stream to the file created in " +
+                                        "the secondary file system because the path points to a directory: " + path);
+
+                                newAccessTime = secondaryFile.accessTime();
+                                newModificationTime = secondaryFile.modificationTime();
+                                newProps = secondaryFile.properties();
+                                newLen = secondaryFile.length();
+                                newBlockSize = secondaryFile.blockSize();
+                            }
+                            else {
+                                newAccessTime = System.currentTimeMillis();
+                                newModificationTime = newAccessTime;
+                                newProps = fileProps;
+                                newLen = 0L;
+                                newBlockSize = blockSize;
+                            }
+
+                            IgfsEntryInfo newInfo = invokeAndGet(overwriteId,
+                                new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
+                                    newBlockSize, affKey, newLockId, evictExclude, newLen));
 
                             // Prepare result and commit.
                             tx.commit();
 
                             IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
 
-                            return newInfo;
+                            return new IgfsCreateResult(newInfo, secondaryOut);
                         }
                         else {
                             // Create file and parent folders.
-                            IgfsPathsCreateResult res =
-                                createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+                            if (secondaryCtx != null)
+                                secondaryOut = secondaryCtx.create();
+
+                            IgfsPathsCreateResult res = createFile(pathIds, lockInfos, dirProps, fileProps, blockSize,
+                                affKey, evictExclude, secondaryCtx);
 
                             if (res == null)
                                 continue;
@@ -3063,10 +3076,20 @@ public class IgfsMetaManager extends IgfsManager {
                             // Generate events.
                             generateCreateEvents(res.createdPaths(), true);
 
-                            return res.info();
+                            return new IgfsCreateResult(res.info(), secondaryOut);
                         }
                     }
                 }
+                catch (IgniteException | IgniteCheckedException e) {
+                    U.closeQuiet(secondaryOut);
+
+                    throw e;
+                }
+                catch (Exception e) {
+                    U.closeQuiet(secondaryOut);
+
+                    throw new IgniteCheckedException("Create failed due to unexpected exception: " + path, e);
+                }
                 finally {
                     busyLock.leaveBusy();
                 }
@@ -3092,7 +3115,7 @@ public class IgfsMetaManager extends IgfsManager {
             throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
                 "element is not a directory)");
 
-        return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false);
+        return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false, null);
     }
 
     /**
@@ -3105,22 +3128,25 @@ public class IgfsMetaManager extends IgfsManager {
      * @param blockSize Block size.
      * @param affKey Affinity key (optional)
      * @param evictExclude Evict exclude flag.
+     * @param secondaryCtx Secondary file system create context.
      * @return Result or {@code} if the first parent already contained child with the same name.
      * @throws IgniteCheckedException If failed.
      */
     @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
         Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
-        boolean evictExclude) throws IgniteCheckedException{
+        boolean evictExclude, @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx)
+        throws IgniteCheckedException{
         // Check if entry we are going to write to is directory.
         if (lockInfos.get(pathIds.lastExistingId()).isFile())
             throw new IgfsParentNotDirectoryException("Failed to open file for write " +
                 "(parent element is not a directory): " + pathIds.path());
 
-        return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+        return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude,
+            secondaryCtx);
     }
 
     /**
-     * Ceate file or directory.
+     * Create file or directory.
      *
      * @param dir Directory flag.
      * @param pathIds Path IDs.
@@ -3130,12 +3156,15 @@ public class IgfsMetaManager extends IgfsManager {
      * @param blockSize Block size.
      * @param affKey Affinity key.
      * @param evictExclude Evict exclude flag.
+     * @param secondaryCtx Secondary file system create context.
      * @return Result.
      * @throws IgniteCheckedException If failed.
      */
+    @SuppressWarnings("unchecked")
     private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
         Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
-        int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException {
+        int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude,
+        @Nullable IgfsSecondaryFileSystemCreateContext secondaryCtx) throws IgniteCheckedException {
         // This is our starting point.
         int lastExistingIdx = pathIds.lastExistingIndex();
         IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
@@ -3151,8 +3180,10 @@ public class IgfsMetaManager extends IgfsManager {
         if (lastExistingInfo.hasChild(curPart))
             return null;
 
+        Map<IgniteUuid, EntryProcessor> procMap = new HashMap<>();
+
         // First step: add new entry to the last existing element.
-        id2InfoPrj.invoke(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
+        procMap.put(lastExistingInfo.id(), new IgfsMetaDirectoryListingAddProcessor(curPart,
             new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
 
         // Events support.
@@ -3161,20 +3192,44 @@ public class IgfsMetaManager extends IgfsManager {
         List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx);
 
         // Second step: create middle directories.
-        long createTime = System.currentTimeMillis();
+        long curTime = System.currentTimeMillis();
 
         while (curIdx < pathIds.count() - 1) {
+            lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
+
             int nextIdx = curIdx + 1;
 
             String nextPart = pathIds.part(nextIdx);
             IgniteUuid nextId = pathIds.surrogateId(nextIdx);
 
-            id2InfoPrj.invoke(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps,
+            long accessTime;
+            long modificationTime;
+            Map<String, String> props;
+
+            if (secondaryCtx != null) {
+                IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(lastCreatedPath);
+
+                if (secondaryInfo == null)
+                    throw new IgfsException("Failed to perform operation because secondary file system path was " +
+                        "modified concurrnetly: " + lastCreatedPath);
+                else if (secondaryInfo.isFile())
+                    throw new IgfsException("Failed to perform operation because secondary file system entity is " +
+                        "not directory: " + lastCreatedPath);
+
+                accessTime = secondaryInfo.accessTime();
+                modificationTime = secondaryInfo.modificationTime();
+                props = secondaryInfo.properties();
+            }
+            else {
+                accessTime = curTime;
+                modificationTime = curTime;
+                props = dirProps;
+            }
+
+            procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props,
                 nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
 
             // Save event.
-            lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
-
             createdPaths.add(lastCreatedPath);
 
             // Advance things further.
@@ -3185,16 +3240,75 @@ public class IgfsMetaManager extends IgfsManager {
         }
 
         // Third step: create leaf.
-        IgfsEntryInfo info;
+        if (dir) {
+            long accessTime;
+            long modificationTime;
+            Map<String, String> props;
 
-        if (dir)
-            info = invokeAndGet(curId, new IgfsMetaDirectoryCreateProcessor(createTime, dirProps));
-        else
-            info = invokeAndGet(curId, new IgfsMetaFileCreateProcessor(createTime, fileProps,
-                blockSize, affKey, createFileLockId(false), evictExclude));
+            if (secondaryCtx != null) {
+                IgfsFile secondaryInfo = secondaryCtx.fileSystem().info(pathIds.path());
+
+                if (secondaryInfo == null)
+                    throw new IgfsException("Failed to perform operation because secondary file system path was " +
+                        "modified concurrnetly: " + pathIds.path());
+                else if (secondaryInfo.isFile())
+                    throw new IgfsException("Failed to perform operation because secondary file system entity is " +
+                        "not directory: " + lastCreatedPath);
+
+                accessTime = secondaryInfo.accessTime();
+                modificationTime = secondaryInfo.modificationTime();
+                props = secondaryInfo.properties();
+            }
+            else {
+                accessTime = curTime;
+                modificationTime = curTime;
+                props = dirProps;
+            }
+
+            procMap.put(curId, new IgfsMetaDirectoryCreateProcessor(accessTime, modificationTime, props));
+        }
+        else {
+            long newAccessTime;
+            long newModificationTime;
+            Map<String, String> newProps;
+            long newLen;
+            int newBlockSize;
+
+            if (secondaryCtx != null) {
+                IgfsFile secondaryFile = secondaryCtx.info();
+
+                if (secondaryFile == null)
+                    throw fsException("Failed to open output stream to the file created in " +
+                        "the secondary file system because it no longer exists: " + pathIds.path());
+                else if (secondaryFile.isDirectory())
+                    throw fsException("Failed to open output stream to the file created in " +
+                        "the secondary file system because the path points to a directory: " + pathIds.path());
+
+                newAccessTime = secondaryFile.accessTime();
+                newModificationTime = secondaryFile.modificationTime();
+                newProps = secondaryFile.properties();
+                newLen = secondaryFile.length();
+                newBlockSize = secondaryFile.blockSize();
+            }
+            else {
+                newAccessTime = curTime;
+                newModificationTime = curTime;
+                newProps = fileProps;
+                newLen = 0L;
+                newBlockSize = blockSize;
+            }
+
+            procMap.put(curId, new IgfsMetaFileCreateProcessor(newAccessTime, newModificationTime, newProps,
+                newBlockSize, affKey, createFileLockId(false), evictExclude, newLen));
+        }
 
         createdPaths.add(pathIds.path());
 
+        // Execute cache operations.
+        Map<Object, EntryProcessorResult> invokeRes = ((IgniteInternalCache)id2InfoPrj).invokeAll(procMap);
+
+        IgfsEntryInfo info = (IgfsEntryInfo)invokeRes.get(curId).get();
+
         return new IgfsPathsCreateResult(createdPaths, info);
     }
 
@@ -3253,90 +3367,4 @@ public class IgfsMetaManager extends IgfsManager {
          */
         public T onFailure(Exception err) throws IgniteCheckedException;
     }
-
-    /**
-     * Synchronization task to create a file.
-     */
-    private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
-        /** Secondary file system. */
-        private IgfsSecondaryFileSystem fs;
-
-        /** Path. */
-        private IgfsPath path;
-
-        /** Simple create flag. */
-        private boolean simpleCreate;
-
-        /** Properties. */
-        private Map<String, String> props;
-
-        /** Overwrite flag. */
-        private boolean overwrite;
-
-        /** Buffer size. */
-        private int bufSize;
-
-        /** Replication factor. */
-        private short replication;
-
-        /** Block size. */
-        private long blockSize;
-
-        /** Affinity key. */
-        private IgniteUuid affKey;
-
-        /** Pending events. */
-        private Deque<IgfsEvent> pendingEvts;
-
-        /** Output stream to the secondary file system. */
-        private final T1<OutputStream> outT1 = new T1<>(null);
-
-        /**
-         * Constructor.
-         *
-         * @param fs Secondary file system.
-         * @param path Path.
-         * @param simpleCreate Simple create flag.
-         * @param props Properties.
-         * @param overwrite Overwrite flag.
-         * @param bufSize Buffer size.
-         * @param replication Replication factor.
-         * @param blockSize Block size.
-         * @param affKey Affinity key.
-         * @param pendingEvts Pending events.
-         */
-        public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
-            @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
-            IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
-            this.fs = fs;
-            this.path = path;
-            this.simpleCreate = simpleCreate;
-            this.props = props;
-            this.overwrite = overwrite;
-            this.bufSize = bufSize;
-            this.replication = replication;
-            this.blockSize = blockSize;
-            this.affKey = affKey;
-            this.pendingEvts = pendingEvts;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
-            throws Exception {
-            return onSuccessCreate(fs, path, simpleCreate, props,
-                overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1);
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
-            U.closeQuiet(outT1.get());
-
-            U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
-                simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
-                bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
-            throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
-                "exception: " + path, err);
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
new file mode 100644
index 0000000..1c0efd6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemCreateContext.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Context for secondary file system create request: holds the file system, the path and the create options.
+ */
+public class IgfsSecondaryFileSystemCreateContext {
+    /** Secondary file system the file is created in. */
+    private final IgfsSecondaryFileSystem fs;
+
+    /** Path of the file to create. */
+    private final IgfsPath path;
+
+    /** Overwrite flag, passed to the file system on create. */
+    private final boolean overwrite;
+
+    /** Simple create flag; when set, only path and overwrite flag are passed on create. */
+    private final boolean simpleCreate;
+
+    /** Properties (passed on create only when simple create flag is not set). */
+    private final Map<String, String> props;
+
+    /** Replication (passed on create only when simple create flag is not set). */
+    private final short replication;
+
+    /** Block size (passed on create only when simple create flag is not set). */
+    private final long blockSize;
+
+    /** Buffer size (passed on create only when simple create flag is not set). */
+    private final int bufSize;
+
+    /**
+     * Constructor. Stores the arguments as is; nothing is created until {@link #create()} is called.
+     *
+     * @param fs File system.
+     * @param path Path.
+     * @param overwrite Overwrite flag.
+     * @param simpleCreate Simple create flag.
+     * @param props Properties.
+     * @param replication Replication.
+     * @param blockSize Block size.
+     * @param bufSize Buffer size.
+     */
+    public IgfsSecondaryFileSystemCreateContext(IgfsSecondaryFileSystem fs, IgfsPath path, boolean overwrite,
+        boolean simpleCreate, @Nullable Map<String, String> props, short replication, long blockSize, int bufSize) {
+        this.fs = fs;
+        this.path = path;
+        this.overwrite = overwrite;
+        this.simpleCreate = simpleCreate;
+        this.props = props;
+        this.replication = replication;
+        this.blockSize = blockSize;
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * Create file in the secondary file system, using either the simple or the extended create call.
+     *
+     * @return Output stream returned by the secondary file system.
+     */
+    public OutputStream create() {
+        return simpleCreate ? fs.create(path, overwrite) :
+            fs.create(path, bufSize, overwrite, replication, blockSize, props);
+    }
+
+    /**
+     * Get info of the file at the context path from the secondary file system.
+     *
+     * @return File info as returned by {@code fs.info(path)}.
+     */
+    public IgfsFile info() {
+        return fs.info(path);
+    }
+
+    /**
+     * @return Secondary file system.
+     */
+    public IgfsSecondaryFileSystem fileSystem() {
+        return fs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsSecondaryFileSystemCreateContext.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
deleted file mode 100644
index 6bbc2c0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryOutputStreamDescriptor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.OutputStream;
-
-/**
- * Descriptor of an output stream opened to the secondary file system.
- */
-public class IgfsSecondaryOutputStreamDescriptor {
-    /** File info in the primary file system. */
-    private final IgfsEntryInfo info;
-
-    /** Output stream to the secondary file system. */
-    private final OutputStream out;
-
-    /**
-     * Constructor.
-     *
-     * @param info File info in the primary file system.
-     * @param out Output stream to the secondary file system.
-     */
-    IgfsSecondaryOutputStreamDescriptor(IgfsEntryInfo info, OutputStream out) {
-        assert info != null;
-        assert out != null;
-
-        this.info = info;
-        this.out = out;
-    }
-
-    /**
-     * @return File info in the primary file system.
-     */
-    IgfsEntryInfo info() {
-        return info;
-    }
-
-    /**
-     * @return Output stream to the secondary file system.
-     */
-    OutputStream out() {
-        return out;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
index eee9300..b016633 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaDirectoryCreateProcessor.java
@@ -48,8 +48,11 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Create time. */
-    private long createTime;
+    /** Access time. */
+    private long accessTime;
+
+    /** Modification time. */
+    private long modificationTime;
 
     /** Properties. */
     private Map<String, String> props;
@@ -70,24 +73,27 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
     /**
      * Constructor.
      *
-     * @param createTime Create time.
+     * @param accessTime Access time.
+     * @param modificationTime Modification time.
      * @param props Properties.
      */
-    public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props) {
-        this(createTime, props, null, null);
+    public IgfsMetaDirectoryCreateProcessor(long accessTime, long modificationTime, Map<String, String> props) {
+        this(accessTime, modificationTime, props, null, null);
     }
 
     /**
      * Constructor.
      *
-     * @param createTime Create time.
+     * @param accessTime Access time.
+     * @param modificationTime Modification time.
      * @param props Properties.
      * @param childName Child name.
      * @param childEntry Child entry.
      */
-    public IgfsMetaDirectoryCreateProcessor(long createTime, Map<String, String> props, String childName,
-        IgfsListingEntry childEntry) {
-        this.createTime = createTime;
+    public IgfsMetaDirectoryCreateProcessor(long accessTime, long modificationTime, Map<String, String> props,
+        String childName, IgfsListingEntry childEntry) {
+        this.accessTime = accessTime;
+        this.modificationTime = modificationTime;
         this.props = props;
         this.childName = childName;
         this.childEntry = childEntry;
@@ -101,8 +107,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
             entry.getKey(),
             null,
             props,
-            createTime,
-            createTime
+            accessTime,
+            modificationTime
         );
 
         if (childName != null)
@@ -115,7 +121,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(createTime);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
 
         IgfsUtils.writeProperties(out, props);
 
@@ -127,7 +134,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        createTime = in.readLong();
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
 
         props = IgfsUtils.readProperties(in);
 
@@ -141,7 +149,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
     @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
         BinaryRawWriter out = writer.rawWriter();
 
-        out.writeLong(createTime);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
 
         IgfsUtils.writeProperties(out, props);
 
@@ -155,7 +164,8 @@ public class IgfsMetaDirectoryCreateProcessor implements EntryProcessor<IgniteUu
     @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
         BinaryRawReader in = reader.rawReader();
 
-        createTime = in.readLong();
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
 
         props = IgfsUtils.readProperties(in);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
index 8c4c296..a3e9d48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileCreateProcessor.java
@@ -49,7 +49,10 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
     private static final long serialVersionUID = 0L;
 
     /** Create time. */
-    private long createTime;
+    private long accessTime;
+
+    /** Modification time. */
+    private long modificationTime;
 
     /** Properties. */
     private Map<String, String> props;
@@ -66,6 +69,9 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
     /** Evict exclude flag. */
     private boolean evictExclude;
 
+    /** File length. */
+    private long len;
+
     /**
      * Constructor.
      */
@@ -76,21 +82,25 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
     /**
      * Constructor.
      *
-     * @param createTime Create time.
+     * @param accessTime Access time.
+     * @param modificationTime Modification time.
      * @param props Properties.
      * @param blockSize Block size.
      * @param affKey Affinity key.
      * @param lockId Lock ID.
      * @param evictExclude Evict exclude flag.
+     * @param len File length.
      */
-    public IgfsMetaFileCreateProcessor(long createTime, Map<String, String> props, int blockSize,
-        @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude) {
-        this.createTime = createTime;
+    public IgfsMetaFileCreateProcessor(long accessTime, long modificationTime, Map<String, String> props,
+        int blockSize, @Nullable IgniteUuid affKey, IgniteUuid lockId, boolean evictExclude, long len) {
+        this.accessTime = accessTime;
+        this.modificationTime = modificationTime;
         this.props = props;
         this.blockSize = blockSize;
         this.affKey = affKey;
         this.lockId = lockId;
         this.evictExclude = evictExclude;
+        this.len = len;
     }
 
     /** {@inheritDoc} */
@@ -99,13 +109,13 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         IgfsEntryInfo info = IgfsUtils.createFile(
             entry.getKey(),
             blockSize,
-            0L,
+            len,
             affKey,
             lockId,
             evictExclude,
             props,
-            createTime,
-            createTime
+            accessTime,
+            modificationTime
         );
 
         entry.setValue(info);
@@ -115,7 +125,8 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(createTime);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
 
         IgfsUtils.writeProperties(out, props);
 
@@ -123,11 +134,14 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         U.writeGridUuid(out, affKey);
         U.writeGridUuid(out, lockId);
         out.writeBoolean(evictExclude);
+
+        out.writeLong(len);
     }
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        createTime = in.readLong();
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
 
         props = IgfsUtils.readProperties(in);
 
@@ -135,13 +149,16 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         affKey = U.readGridUuid(in);
         lockId = U.readGridUuid(in);
         evictExclude = in.readBoolean();
+
+        len = in.readLong();
     }
 
     /** {@inheritDoc} */
     @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
         BinaryRawWriter out = writer.rawWriter();
 
-        out.writeLong(createTime);
+        out.writeLong(accessTime);
+        out.writeLong(modificationTime);
 
         IgfsUtils.writeProperties(out, props);
 
@@ -149,13 +166,16 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         BinaryUtils.writeIgniteUuid(out, affKey);
         BinaryUtils.writeIgniteUuid(out, lockId);
         out.writeBoolean(evictExclude);
+
+        out.writeLong(len);
     }
 
     /** {@inheritDoc} */
     @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
         BinaryRawReader in = reader.rawReader();
 
-        createTime = in.readLong();
+        accessTime = in.readLong();
+        modificationTime = in.readLong();
 
         props = IgfsUtils.readProperties(in);
 
@@ -163,6 +183,8 @@ public class IgfsMetaFileCreateProcessor implements EntryProcessor<IgniteUuid, I
         affKey = BinaryUtils.readIgniteUuid(in);
         lockId = BinaryUtils.readIgniteUuid(in);
         evictExclude = in.readBoolean();
+
+        len = in.readLong();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c2f8df96/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 8b88157..6053d3b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
         assertEmpty(mgr.directoryListing(ROOT_ID));
 
         assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META));
-        assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null));
+        assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null, null));
 
         IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir");
         assertNotNull(dirEntry);
@@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
     private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
         IgfsPath p = path(path);
 
-        IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null);
+        IgfsEntryInfo res = mgr.create(p, null, false, 400, null, false, null, null).info();
 
         assert res != null;
         assert !res.isDirectory();


[02/13] ignite git commit: Merge remote-tracking branch 'upstream/gridgain-7.6.1' into gridgain-7.6.1

Posted by av...@apache.org.
Merge remote-tracking branch 'upstream/gridgain-7.6.1' into gridgain-7.6.1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51814fd8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51814fd8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51814fd8

Branch: refs/heads/master
Commit: 51814fd816f1f3c4bf72c91399c03db7b1358ed1
Parents: dc1d088 5c14928
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 15 16:25:19 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 15 16:25:19 2016 +0300

----------------------------------------------------------------------
 modules/cassandra/pom.xml                       |   2 +-
 .../cassandra/bean/CassandraLifeCycleBean.java  | 149 --------
 .../ignite/tests/utils/CassandraHelper.java     |   2 +-
 .../tests/utils/CassandraLifeCycleBean.java     | 149 ++++++++
 .../affinity/fair/FairAffinityFunction.java     |   2 +
 .../rendezvous/RendezvousAffinityFunction.java  |   2 +
 .../configuration/CacheConfiguration.java       |   3 +
 .../ignite/internal/IgniteServicesImpl.java     |   2 +-
 .../ignite/internal/MarshallerContextImpl.java  |  51 ++-
 .../ignite/internal/binary/BinaryContext.java   |  43 ++-
 .../GridClientConnectionManagerAdapter.java     |  25 +-
 .../connection/GridClientNioTcpConnection.java  |   3 +
 .../GridClientOptimizedMarshaller.java          |   4 +-
 .../GridClientZipOptimizedMarshaller.java       | 167 ++++++++
 .../impl/GridTcpRouterNioListenerAdapter.java   |  11 +-
 .../discovery/GridDiscoveryManager.java         |  23 +-
 .../processors/cache/GridCacheAdapter.java      |  46 ++-
 .../processors/cache/GridCacheEntryEx.java      |   9 +-
 .../processors/cache/GridCacheMapEntry.java     | 100 +++--
 .../processors/cache/GridCacheProcessor.java    |  28 +-
 .../processors/cache/GridCacheUtils.java        |   3 +
 .../binary/CacheObjectBinaryProcessorImpl.java  |  29 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  10 +-
 .../distributed/dht/GridDhtLockFuture.java      |  18 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |  19 +-
 .../dht/GridPartitionedGetFuture.java           |   2 -
 .../dht/GridPartitionedSingleGetFuture.java     |   2 -
 .../dht/atomic/GridDhtAtomicCache.java          |   8 -
 .../dht/colocated/GridDhtColocatedCache.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   3 +-
 .../dht/preloader/GridDhtPartitionDemander.java |   3 +-
 .../dht/preloader/GridDhtPartitionMap2.java     |   7 +-
 .../distributed/near/GridNearGetFuture.java     |   4 -
 .../cache/distributed/near/GridNearTxLocal.java |   7 +-
 .../local/atomic/GridLocalAtomicCache.java      |   8 -
 .../continuous/CacheContinuousQueryEntry.java   |  16 +
 .../continuous/CacheContinuousQueryHandler.java |   2 +-
 .../continuous/CacheContinuousQueryManager.java | 135 +++++++
 .../cache/transactions/IgniteTxAdapter.java     |   2 -
 .../cache/transactions/IgniteTxEntry.java       |  24 +-
 .../cache/transactions/IgniteTxHandler.java     |  46 +++
 .../transactions/IgniteTxLocalAdapter.java      |  34 +-
 .../cacheobject/IgniteCacheObjectProcessor.java |   7 +
 .../IgniteCacheObjectProcessorImpl.java         |   5 +
 .../continuous/GridContinuousProcessor.java     | 216 ++++++++---
 .../datastreamer/DataStreamerImpl.java          |   3 +-
 .../message/GridClientHandshakeRequest.java     |   4 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |  19 +-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |  12 +-
 .../service/GridServiceProcessor.java           | 307 ++++++++-------
 .../ignite/internal/util/nio/GridNioServer.java |  10 +-
 .../ignite/internal/visor/cache/VisorCache.java |  56 +--
 .../visor/cache/VisorCachePartition.java        |  89 +++++
 .../visor/cache/VisorCachePartitions.java       |  88 +++++
 .../visor/cache/VisorCachePartitionsTask.java   | 152 ++++++++
 .../internal/visor/cache/VisorCacheV3.java      |  68 +---
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   4 +-
 .../ipfinder/jdbc/BasicJdbcIpFinderDialect.java |  28 ++
 .../tcp/ipfinder/jdbc/JdbcIpFinderDialect.java  |  28 ++
 .../jdbc/OracleJdbcIpFinderDialect.java         |  28 ++
 .../ipfinder/jdbc/TcpDiscoveryJdbcIpFinder.java |  72 ++--
 .../internal/ClusterNodeMetricsSelfTest.java    | 101 ++++-
 ...eClientReconnectContinuousProcessorTest.java |  60 ++-
 .../ignite/internal/binary/AffinityKey.java     |  69 ++++
 .../binary/GridBinaryAffinityKeySelfTest.java   |  15 +
 ...aultBinaryMappersBinaryMetaDataSelfTest.java |  17 +
 .../processors/cache/GridCacheTestEntryEx.java  |   5 +-
 .../cache/IgniteCacheAbstractTest.java          |   2 +-
 ...gniteCacheInvokeReadThroughAbstractTest.java | 382 +++++++++++++++++++
 ...iteCacheInvokeReadThroughSingleNodeTest.java | 106 +++++
 .../cache/IgniteCacheInvokeReadThroughTest.java | 182 +++++----
 .../IgniteCacheReadThroughStoreCallTest.java    | 288 ++++++++++++++
 .../IgniteCacheSyncRebalanceModeSelfTest.java   | 114 ++++++
 .../IgniteCacheLoaderWriterAbstractTest.java    |  10 +
 ...ridCacheContinuousQueryAbstractSelfTest.java |   2 +-
 ...niteCacheContinuousQueryBackupQueueTest.java | 135 +++++++
 .../IgniteNoCustomEventsOnNodeStart.java        |  85 +++++
 .../service/GridServiceClientNodeTest.java      | 102 ++++-
 .../unsafe/GridOffheapSnapTreeSelfTest.java     |   2 +-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../testsuites/IgniteCacheTestSuite4.java       |   4 +
 .../testsuites/IgniteCacheTestSuite5.java       |   2 +
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  23 +-
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  17 +-
 .../processors/query/h2/sql/GridSqlConst.java   |   5 +
 .../processors/query/h2/sql/GridSqlJoin.java    |  17 +-
 .../processors/query/h2/sql/GridSqlType.java    |   5 +
 .../cache/IgniteCacheOffheapIndexScanTest.java  | 195 ++++++++++
 .../query/IgniteSqlSplitterSelfTest.java        |  75 ++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 .../IgniteCacheQuerySelfTestSuite3.java         |   2 +
 .../org/apache/ignite/spark/IgniteContext.scala |   4 +-
 .../org/apache/ignite/spark/IgniteRDD.scala     |  25 +-
 .../apache/ignite/spark/JavaIgniteContext.scala |   4 +-
 .../org/apache/ignite/spark/JavaIgniteRDD.scala |   2 +
 .../ignite/spark/impl/IgniteAbstractRDD.scala   |  15 +-
 .../apache/ignite/spark/impl/IgniteSqlRDD.scala |   5 +-
 .../spark/impl/JavaIgniteAbstractRDD.scala      |  34 --
 .../org/apache/ignite/spark/IgniteRDDSpec.scala |  25 +-
 .../cache/websession/WebSessionFilter.java      |  39 +-
 .../internal/websession/WebSessionSelfTest.java | 330 +++++++++++++++-
 102 files changed, 4030 insertions(+), 885 deletions(-)
----------------------------------------------------------------------



[11/13] ignite git commit: IGNITE-3216 Need to deduplicate addresses registered in the IP finder

Posted by av...@apache.org.
IGNITE-3216 Need to deduplicate addresses registered in the IP finder


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40d41c10
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40d41c10
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40d41c10

Branch: refs/heads/master
Commit: 40d41c10e3179436bc1ae1e12bdc2d6e3a7208f0
Parents: b5f4aac
Author: Anton Vinogradov <av...@apache.org>
Authored: Thu Jun 16 10:25:38 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Thu Jun 16 14:23:05 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |  8 +--
 .../ignite/internal/util/lang/GridFunc.java     | 66 --------------------
 .../internal/util/IgniteUtilsSelfTest.java      | 15 +++++
 3 files changed, 19 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1147124..2a83ad4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -8581,7 +8581,7 @@ public abstract class IgniteUtils {
      */
     public static Collection<InetAddress> toInetAddresses(Collection<String> addrs,
         Collection<String> hostNames) throws IgniteCheckedException {
-        List<InetAddress> res = new ArrayList<>(addrs.size());
+        Set<InetAddress> res = new HashSet<>(addrs.size());
 
         Iterator<String> hostNamesIt = hostNames.iterator();
 
@@ -8614,7 +8614,7 @@ public abstract class IgniteUtils {
             throw new IgniteCheckedException("Addresses can not be resolved [addr=" + addrs +
                 ", hostNames=" + hostNames + ']');
 
-        return Collections.unmodifiableList(res);
+        return res;
     }
 
     /**
@@ -8640,7 +8640,7 @@ public abstract class IgniteUtils {
      */
     public static Collection<InetSocketAddress> toSocketAddresses(Collection<String> addrs,
         Collection<String> hostNames, int port) {
-        List<InetSocketAddress> res = new ArrayList<>(addrs.size());
+        Set<InetSocketAddress> res = new HashSet<>(addrs.size());
 
         Iterator<String> hostNamesIt = hostNames.iterator();
 
@@ -8661,7 +8661,7 @@ public abstract class IgniteUtils {
             res.add(new InetSocketAddress(addr, port));
         }
 
-        return Collections.unmodifiableList(res);
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index ab31625..6ad136c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1715,72 +1715,6 @@ public class GridFunc {
     }
 
     /**
-     * Creates read-only light-weight view on given list with provided transformation.
-     * Resulting list will only "have" {@code transformed} elements. Note that only wrapping
-     * list will be created and no duplication of data will occur.
-     *
-     * @param c Input list that serves as a base for the view.
-     * @param trans Transformation closure.
-     * @param <T1> Type of the list.
-     * @return Light-weight view on given list with provided transformation.
-     */
-    @SuppressWarnings("RedundantTypeArguments")
-    @Deprecated
-    public static <T1, T2> List<T2> viewListReadOnly(@Nullable final List<? extends T1> c,
-        final IgniteClosure<? super T1, T2> trans) {
-        A.notNull(trans, "trans");
-
-        if (isEmpty(c))
-            return Collections.emptyList();
-
-        assert c != null;
-
-        return new GridSerializableList<T2>() {
-            /** */
-            private static final long serialVersionUID = 3126625219739967068L;
-
-            @Override public T2 get(int idx) {
-                return trans.apply(c.get(idx));
-            }
-
-            @NotNull
-            @Override public Iterator<T2> iterator() {
-                return F.<T1, T2>iterator(c, trans, true);
-            }
-
-            @Override public int size() {
-                return c.size();
-            }
-
-            @Override public boolean isEmpty() {
-                return c.isEmpty();
-            }
-        };
-    }
-
-    /**
-     * Creates a view on given list with provided transformer and predicates.
-     * Resulting list will only "have" elements for which all provided predicates, if any,
-     * evaluate to {@code true}. Note that a new collection will be created and data will
-     * be copied.
-     *
-     * @param c Input list that serves as a base for the view.
-     * @param trans Transforming closure from T1 to T2.
-     * @param p Optional predicates. If predicates are not provided - all elements will be in the view.
-     * @return View on given list with provided predicate.
-     */
-    @Deprecated
-    public static <T1, T2> List<T2> transformList(Collection<? extends T1> c,
-        IgniteClosure<? super T1, T2> trans, @Nullable IgnitePredicate<? super T1>... p) {
-        A.notNull(c, "c", trans, "trans");
-
-        if (isAlwaysFalse(p))
-            return Collections.emptyList();
-
-        return new ArrayList<>(transform(retain(c, true, p), trans));
-    }
-
-    /**
      * Creates light-weight view on given map with provided predicates. Resulting map will
      * only "have" keys for which all provided predicates, if any, evaluates to {@code true}.
      * Note that only wrapping map will be created and no duplication of data will occur.

http://git-wip-us.apache.org/repos/asf/ignite/blob/40d41c10/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 520fa76..d774065 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -742,6 +742,21 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that {@code U.toSocketAddresses} returns one entry for "127.0.0.1" and "localhost".
+     */
+    public void testToSocketAddressesNoDuplicates() {
+        Collection<String> addrs = new ArrayList<>();
+
+        addrs.add("127.0.0.1");
+        addrs.add("localhost");
+
+        Collection<String> hostNames = new ArrayList<>();
+        int port = 1234;
+
+        assertEquals(1, U.toSocketAddresses(addrs, hostNames, port).size());
+    }
+
+    /**
      * Test enum.
      */
     private enum TestEnum {