You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/30 03:02:04 UTC

[1/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-104 e6f0ac3c4 -> aa11f6446


IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/03e48ba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/03e48ba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/03e48ba8

Branch: refs/heads/ignite-104
Commit: 03e48ba8ccb417cfd1f512f2bd1efe6ceb6c86fa
Parents: e6f0ac3
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Jul 28 22:19:49 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Jul 28 22:19:49 2015 -0700

----------------------------------------------------------------------
 .../cache/distributed/dht/atomic/GridDhtAtomicCache.java           | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/03e48ba8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bb036da..7a8cc06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1764,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                null,
+                                op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                     }


[7/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/112c567c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/112c567c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/112c567c

Branch: refs/heads/ignite-104
Commit: 112c567cb780fc04a0d81c9b8b1e1f60cd2fbabf
Parents: 9781ea4
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:58:33 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:58:33 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 48 +++++++-------------
 .../atomic/GridNearAtomicUpdateResponse.java    | 20 ++------
 2 files changed, 20 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/112c567c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 734cf6d..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -666,60 +666,54 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 16:
-//                if (!writer.writeInt("part", part))
-//                    return false;
-
-                writer.incrementState();
-
-            case 17:
                 if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
-            case 18:
+            case 17:
                 if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
-            case 19:
+            case 18:
                 if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
-            case 20:
+            case 19:
                 if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
-            case 21:
+            case 20:
                 if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
-            case 22:
+            case 21:
                 if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
-            case 23:
+            case 22:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
-            case 24:
+            case 23:
                 if (!writer.writeMessage("updateVer", updateVer))
                     return false;
 
                 writer.incrementState();
 
-            case 25:
+            case 24:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -850,14 +844,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 16:
-//                part = reader.readInt("part");
-//
-//                if (!reader.isLastRead())
-//                    return false;
-
-                reader.incrementState();
-
-            case 17:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -865,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 18:
+            case 17:
                 skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
@@ -873,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 19:
+            case 18:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -881,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -893,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -901,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 21:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -909,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 22:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -917,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 24:
+            case 23:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -925,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 25:
+            case 24:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -945,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 26;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/112c567c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2b30536..8e1bee2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -485,18 +485,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 12:
-//                if (!writer.writeInt("part", part))
-//                    return false;
-
-                writer.incrementState();
-
-            case 13:
                 if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 14:
+            case 13:
                 if (!writer.writeMessage("ret", ret))
                     return false;
 
@@ -591,14 +585,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 12:
-//                part = reader.readInt("part");
-//
-//                if (!reader.isLastRead())
-//                    return false;
-
-                reader.incrementState();
-
-            case 13:
                 remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -606,7 +592,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
                 reader.incrementState();
 
-            case 14:
+            case 13:
                 ret = reader.readMessage("ret");
 
                 if (!reader.isLastRead())
@@ -626,7 +612,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 15;
+        return 14;
     }
 
     /** {@inheritDoc} */


[5/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7c73fc5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7c73fc5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7c73fc5d

Branch: refs/heads/ignite-104
Commit: 7c73fc5d8d81f3cded6bffd4dcf3d1e48ad84d64
Parents: e5c69b8
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:26:44 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:26:44 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java           |  9 ++-------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java    |  3 ---
 .../dht/atomic/GridDhtAtomicUpdateRequest.java   | 19 ++-----------------
 .../distributed/near/GridNearAtomicCache.java    |  4 ++--
 4 files changed, 6 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 02e48df..31606b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1767,7 +1767,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 updRes.newValue(),
-                                op == TRANSFORM ? req.entryProcessor(i) : null,
                                 updRes.newTtl(),
                                 updRes.conflictExpireTime());
                     }
@@ -2034,13 +2033,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     }
 
                     if (dhtFut != null) {
-                        EntryProcessor<Object, Object, Object> entryProcessor =
-                            entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
-
                         if (!batchRes.readersOnly())
                             dhtFut.addWriteEntry(entry,
                                 writeVal,
-                                entryProcessor,
+                                entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()),
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE,
                                 null);
@@ -2049,7 +2045,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             dhtFut.addNearWriteEntries(filteredReaders,
                                 entry,
                                 writeVal,
-                                entryProcessor,
                                 updRes.newTtl(),
                                 CU.EXPIRE_TIME_CALCULATE);
                     }
@@ -2465,7 +2460,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/!req.forceTransformBackups(),
+                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             replicate ? DR_BACKUP : DR_NONE,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 15ec121..ab0c2e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -270,14 +270,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * @param readers Entry readers.
      * @param entry Entry.
      * @param val Value.
-     * @param entryProcessor Entry processor..
      * @param ttl TTL for near cache update (optional).
      * @param expireTime Expire time for near cache update (optional).
      */
     public void addNearWriteEntries(Iterable<UUID> readers,
         GridDhtCacheEntry entry,
         @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime) {
         CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
@@ -323,7 +321,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
             updateReq.addNearWriteValue(entry.key(),
                 val,
-                entryProcessor,
                 ttl,
                 expireTime);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7149dec..6340c93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -267,36 +267,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /**
      * @param key Key to add.
      * @param val Value, {@code null} if should be removed.
-     * @param entryProcessor Entry processor.
      * @param ttl TTL.
      * @param expireTime Expire time.
      */
     public void addNearWriteValue(KeyCacheObject key,
         @Nullable CacheObject val,
-        EntryProcessor<Object, Object, Object> entryProcessor,
         long ttl,
         long expireTime)
     {
         if (nearKeys == null) {
             nearKeys = new ArrayList<>();
-
-            if (forceTransformBackups) {
-                nearEntryProcessors = new ArrayList<>();
-                nearEntryProcessorsBytes = new ArrayList<>();
-            }
-            else
-                nearVals = new ArrayList<>();
+            nearVals = new ArrayList<>();
         }
 
         nearKeys.add(key);
-
-        if (forceTransformBackups) {
-            assert entryProcessor != null;
-
-            nearEntryProcessors.add(entryProcessor);
-        }
-        else
-            nearVals.add(val);
+        nearVals.add(val);
 
         if (ttl >= 0) {
             if (nearTtls == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7c73fc5d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 0aa1638..707facc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -225,7 +225,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                         /*write-through*/false,
                         /*read-through*/false,
                         /*retval*/false,
-                        /**expiry policy*/null,
+                        /*expiry policy*/null,
                         /*event*/true,
                         /*metrics*/true,
                         /*primary*/false,
@@ -336,7 +336,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
                             /*event*/true,
                             /*metrics*/true,
                             /*primary*/false,
-                            /*check version*/!req.forceTransformBackups(),
+                            /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
                             req.topologyVersion(),
                             CU.empty0(),
                             DR_NONE,


[8/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aa11f644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aa11f644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aa11f644

Branch: refs/heads/ignite-104
Commit: aa11f6446f14174e0ca4e67b85b1403ec6ed7016
Parents: 112c567
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 18:01:36 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 18:01:36 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridAtomicMappingKey.java        | 86 -------------------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 87 +++++++++++++++++---
 2 files changed, 75 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
deleted file mode 100644
index 52e3c7f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Mapping Key.
- */
-class GridAtomicMappingKey {
-    /** Node ID. */
-    private final UUID nodeId;
-
-    /** Partition. */
-    private final int part;
-
-    /**
-     * @param nodeId Node ID.
-     * @param part Partition.
-     */
-    GridAtomicMappingKey(UUID nodeId, int part) {
-        assert nodeId != null;
-        assert part >= -1 : part;
-
-        this.nodeId = nodeId;
-        this.part = part;
-    }
-
-    /**
-     * @return Node ID.
-     */
-    UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Partition.
-     */
-    int partition() {
-        return part;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        GridAtomicMappingKey key = (GridAtomicMappingKey)o;
-
-        return nodeId.equals(key.nodeId) && part == key.part;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        int res = nodeId.hashCode();
-
-        res = 31 * res + part;
-
-        return res;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridAtomicMappingKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa11f644/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 8595dc7..93c20da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+    private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
 
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+        return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+            @Override public ClusterNode apply(MappingKey mappingKey) {
                 return cctx.kernalContext().discovery().node(mappingKey.nodeId());
             }
         }), F.notNull());
@@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         if (log.isDebugEnabled())
             log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
 
-        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
+        Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
 
-        for (GridAtomicMappingKey mappingKey : mappings.keySet()) {
+        for (MappingKey mappingKey : mappings.keySet()) {
             if (mappingKey.nodeId().equals(nodeId))
                 mappingKeys.add(mappingKey);
         }
 
         if (!mappingKeys.isEmpty()) {
-            for (GridAtomicMappingKey mappingKey : mappingKeys)
+            for (MappingKey mappingKey : mappingKeys)
                 mappings.remove(mappingKey);
 
             checkComplete();
@@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         for (ClusterNode node : dhtNodes) {
             UUID nodeId = node.id();
 
-            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+            MappingKey mappingKey = new MappingKey(nodeId, part);
 
             if (!nodeId.equals(cctx.localNodeId())) {
                 GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
 
         for (UUID nodeId : readers) {
-            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+            MappingKey mappingKey = new MappingKey(nodeId, part);
 
             GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
 
@@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      */
     public void map() {
         if (!mappings.isEmpty()) {
-            for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
-                GridAtomicMappingKey mappingKey = e.getKey();
+            for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+                MappingKey mappingKey = e.getKey();
                 GridDhtAtomicUpdateRequest req = e.getValue();
 
                 UUID nodeId = mappingKey.nodeId();
@@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             }
         }
 
-        mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition()));
+        mappings.remove(new MappingKey(nodeId, updateRes.partition()));
 
         checkComplete();
     }
@@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
         for (Integer part : res.partitions())
-            mappings.remove(new GridAtomicMappingKey(nodeId, part));
+            mappings.remove(new MappingKey(nodeId, part));
 
         checkComplete();
     }
@@ -468,4 +468,67 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         return S.toString(GridDhtAtomicUpdateFuture.class, this);
     }
 
+    /**
+     * Mapping Key.
+     */
+    private static class MappingKey {
+        /** Node ID. */
+        private final UUID nodeId;
+
+        /** Partition. */
+        private final int part;
+
+        /**
+         * @param nodeId Node ID.
+         * @param part Partition.
+         */
+        MappingKey(UUID nodeId, int part) {
+            assert nodeId != null;
+            assert part >= -1 : part;
+
+            this.nodeId = nodeId;
+            this.part = part;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Partition.
+         */
+        int partition() {
+            return part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            MappingKey key = (MappingKey)o;
+
+            return nodeId.equals(key.nodeId) && part == key.part;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+
+            res = 31 * res + part;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MappingKey.class, this);
+        }
+    }
 }


[2/8] incubator-ignite git commit: #ignite-gg-10610: Security hole if DataStreamer is used for populating the cache

Posted by vk...@apache.org.
#ignite-gg-10610: Security hole if DataStreamer is used for populating the cache


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5288b2d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5288b2d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5288b2d8

Branch: refs/heads/ignite-104
Commit: 5288b2d8b882bbb86d69e1019821d51803685861
Parents: a127756
Author: ivasilinets <iv...@gridgain.com>
Authored: Wed Jul 29 15:27:31 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Wed Jul 29 15:27:31 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 22 ++++++++++++++++++++
 .../datastreamer/DataStreamerUpdateJob.java     | 20 +++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5288b2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 605f478..5fae676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
@@ -413,6 +414,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
         A.notEmpty(entries, "entries");
 
+        checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
         enterBusy();
 
         try {
@@ -520,6 +523,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     @Override public IgniteFuture<?> addData(K key, V val) {
         A.notNull(key, "key");
 
+        if (val == null)
+            checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
+        else
+            checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
         KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, true);
         CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
 
@@ -980,6 +988,20 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     }
 
     /**
+     * Check permissions for streaming.
+     *
+     * @param perm Security permission.
+     * @throws org.apache.ignite.plugin.security.SecurityException If permissions are not enough for streaming.
+     */
+    private void checkSecurityPermission(SecurityPermission perm)
+        throws org.apache.ignite.plugin.security.SecurityException{
+        if (!ctx.security().enabled())
+            return;
+
+        ctx.security().authorize(cacheName, perm, null);
+    }
+
+    /**
      *
      */
     private class Buffer {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5288b2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
index 21ba3ac..9e0703a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.stream.*;
 import org.jetbrains.annotations.*;
 
@@ -106,8 +107,13 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
 
                 CacheObject val = e.getValue();
 
-                if (val != null)
+                if (val != null) {
+                    checkSecurityPermission(SecurityPermission.CACHE_PUT);
+
                     val.finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader());
+                }
+                else
+                    checkSecurityPermission(SecurityPermission.CACHE_REMOVE);
             }
 
             if (unwrapEntries()) {
@@ -139,4 +145,16 @@ class DataStreamerUpdateJob implements GridPlainCallable<Object> {
     private boolean unwrapEntries() {
         return !(rcvr instanceof DataStreamerCacheUpdaters.InternalUpdater);
     }
+
+    /**
+     * @param perm Security permission.
+     * @throws org.apache.ignite.plugin.security.SecurityException If permission is not enough.
+     */
+    private void checkSecurityPermission(SecurityPermission perm)
+        throws org.apache.ignite.plugin.security.SecurityException {
+        if (!ctx.security().enabled())
+            return;
+
+        ctx.security().authorize(cacheName, perm, null);
+    }
 }


[3/8] incubator-ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Posted by vk...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/424ab07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/424ab07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/424ab07c

Branch: refs/heads/ignite-104
Commit: 424ab07cb3dff8f7aeafeb1de5af1af8045145e3
Parents: 03e48ba 5288b2d
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 12:39:02 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 12:39:02 2015 -0700

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 22 ++++++++++++++++++++
 .../datastreamer/DataStreamerUpdateJob.java     | 20 +++++++++++++++++-
 2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[4/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e5c69b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e5c69b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e5c69b83

Branch: refs/heads/ignite-104
Commit: e5c69b831a8f564440bd0960cc2a865cd907525a
Parents: 424ab07
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 14:19:24 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 14:19:24 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  9 ++++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 33 +++++++++++++++-----
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  7 ++++-
 3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7a8cc06..02e48df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1178,6 +1178,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 e.printStackTrace();
             }
             finally {
+                if (dhtFut != null && !remap)
+                    dhtFut.map();
+
                 if (locked != null)
                     unlockEntries(locked, req.topologyVersion());
 
@@ -1221,8 +1224,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         else {
             // If there are backups, map backup update future.
             if (dhtFut != null)
-                dhtFut.map();
-                // Otherwise, complete the call.
+                dhtFut.onMapped();
+            // Otherwise, complete the call.
             else
                 completionCb.apply(req, res);
         }
@@ -2523,7 +2526,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
         catch (ClusterTopologyCheckedException ignored) {
             U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
-                req.nodeId());
+                nodeId);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a68263..15ec121 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -90,6 +90,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** */
     private boolean waitForExchange;
 
+    /** */
+    private boolean mapped;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -349,37 +352,51 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                 GridAtomicMappingKey mappingKey = e.getKey();
                 GridDhtAtomicUpdateRequest req = e.getValue();
 
+                UUID nodeId = mappingKey.nodeId();
+                int part = mappingKey.partition();
+
+                assert !nodeId.equals(cctx.localNodeId());
+
                 try {
                     if (log.isDebugEnabled())
-                        log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+                        log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 
-                    if (mappingKey.partition() >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), false);
+                    if (part >= 0) {
+                        Object topic = CU.partitionMessageTopic(cctx, part, false);
 
-                        cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+                        cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());
                     }
                     else {
-                        assert mappingKey.partition() == -1;
+                        assert part == -1;
 
-                        cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+                        cctx.io().send(nodeId, req, cctx.ioPolicy());
                     }
                 }
                 catch (ClusterTopologyCheckedException ignored) {
                     U.warn(log, "Failed to send update request to backup node because it left grid: " +
-                        req.nodeId());
+                        nodeId);
 
                     mappings.remove(mappingKey);
                 }
                 catch (IgniteCheckedException ex) {
                     U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
-                        + req.nodeId(), ex);
+                        + nodeId, ex);
 
                     mappings.remove(mappingKey);
                 }
             }
         }
 
+        mapped = true;
+    }
+
+    /**
+     * On mapped callback.
+     */
+    public void onMapped() {
+        assert mapped;
+
         checkComplete();
 
         // Send response right away if no ACKs from backup is required.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e5c69b83/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 35c6910..7149dec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -44,7 +44,12 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     /** Message index. */
     public static final int CACHE_MSG_IDX = nextIndexId();
 
-    /** Node ID. */
+    /**
+     * Node ID.
+     *
+     * @deprecated Not used anymore, but removal will break compatibility.
+     */
+    @Deprecated
     private UUID nodeId;
 
     /** Future version. */


[6/8] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

Posted by vk...@apache.org.
IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9781ea43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9781ea43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9781ea43

Branch: refs/heads/ignite-104
Commit: 9781ea4384a553e5126b8a7320f7070f6a340809
Parents: 7c73fc5
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Wed Jul 29 17:57:49 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Wed Jul 29 17:57:49 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |  17 +-
 .../processors/cache/GridCacheIoManager.java    |   3 +-
 .../processors/cache/GridCacheUtils.java        |   4 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  29 ++--
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   2 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 172 ++++++-------------
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  27 +--
 .../atomic/GridNearAtomicUpdateResponse.java    |  28 +--
 8 files changed, 87 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3cf92f8..e9da40c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -185,11 +185,10 @@ public enum GridTopic {
     /**
      * @param id1 ID1.
      * @param id2 ID2.
-     * @param id3 ID3.
      * @return Grid message topic with specified IDs.
      */
-    public Object topic(int id1, int id2, byte id3) {
-        return new T9(this, id1, id2, id3);
+    public Object topic(int id1, int id2) {
+        return new T9(this, id1, id2);
     }
 
     /**
@@ -782,9 +781,6 @@ public enum GridTopic {
         /** */
         private int id2;
 
-        /** */
-        private int id3;
-
         /**
          * No-arg constructor needed for {@link Serializable}.
          */
@@ -796,13 +792,11 @@ public enum GridTopic {
          * @param topic Topic.
          * @param id1 ID1.
          * @param id2 ID2.
-         * @param id3 ID3.
          */
-        private T9(GridTopic topic, int id1, int id2, byte id3) {
+        private T9(GridTopic topic, int id1, int id2) {
             this.topic = topic;
             this.id1 = id1;
             this.id2 = id2;
-            this.id3 = id3;
         }
 
         /** {@inheritDoc} */
@@ -811,7 +805,6 @@ public enum GridTopic {
 
             res += 31 * res + id1;
             res += 31 * res + id2;
-            res += 31 * res + id3;
 
             return res;
         }
@@ -821,7 +814,7 @@ public enum GridTopic {
             if (obj.getClass() == T9.class) {
                 T9 that = (T9)obj;
 
-                return topic == that.topic && id1 == that.id1 && id2 == that.id2 && id3 == that.id3;
+                return topic == that.topic && id1 == that.id1 && id2 == that.id2;
             }
 
             return false;
@@ -832,7 +825,6 @@ public enum GridTopic {
             out.writeByte(topic.ordinal());
             out.writeInt(id1);
             out.writeInt(id2);
-            out.writeByte(id3);
         }
 
         /** {@inheritDoc} */
@@ -840,7 +832,6 @@ public enum GridTopic {
             topic = fromOrdinal(in.readByte());
             id1 = in.readInt();
             id2 = in.readInt();
-            id3 = in.readByte();
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index dec6aef..5858424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -394,8 +394,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                 GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
                     ctx.cacheId(),
                     nodeId,
-                    req.futureVersion(),
-                    req.partition());
+                    req.futureVersion());
 
                 res.error(req.classError());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 96df7c5..d82acca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1739,9 +1739,9 @@ public class GridCacheUtils {
      * @param part Partition.
      * @return Per-partition message topic.
      */
-    public static Object partitionMessageTopic(GridCacheContext ctx, int part, boolean nearMsg) {
+    public static Object partitionMessageTopic(GridCacheContext ctx, int part) {
         assert part >= 0;
 
-        return TOPIC_CACHE.topic(ctx.cacheId(), part, (byte)(nearMsg ? 1 : 0));
+        return TOPIC_CACHE.topic(ctx.cacheId(), part);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 31606b2..3084e68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,15 +181,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                processNearAtomicUpdateRequest(nodeId, req);
+            }
+        });
+
         if (ctx.config().isAtomicOrderedUpdates()) {
             for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, true), new CI2<UUID, GridNearAtomicUpdateRequest>() {
-                    @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                        processNearAtomicUpdateRequest(nodeId, req);
-                    }
-                });
-
-                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+                ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                     @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                         processDhtAtomicUpdateRequest(nodeId, req);
                     }
@@ -197,12 +197,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
-            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-                @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                    processNearAtomicUpdateRequest(nodeId, req);
-                }
-            });
-
             ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
@@ -244,10 +238,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             buf.finish();
 
         if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true));
-                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false));
-            }
+            for (int part = 0; part < ctx.affinity().partitions(); part++)
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part));
         }
     }
 
@@ -1041,8 +1033,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     ) {
         GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
             nodeId,
-            req.futureVersion(),
-            req.partition());
+            req.futureVersion());
 
         List<KeyCacheObject> keys = req.keys();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ab0c2e1..8595dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
 
                     if (part >= 0) {
-                        Object topic = CU.partitionMessageTopic(cctx, part, false);
+                        Object topic = CU.partitionMessageTopic(cctx, part);
 
                         cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
                             2 * cctx.gridConfig().getNetworkTimeout());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9b2a5e2..4c8a161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** Mappings. */
     @GridToStringInclude
-    private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
+    private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
 
     /** Error. */
     private volatile CachePartialUpdateCheckedException err;
@@ -246,11 +246,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public Collection<? extends ClusterNode> nodes() {
-        return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
-            @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
-                return cctx.kernalContext().discovery().node(mappingKey.nodeId());
-            }
-        }), F.notNull());
+        return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
     }
 
     /**
@@ -287,24 +283,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             return false;
         }
 
-        Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
-        Collection<KeyCacheObject> failedKeys = new ArrayList<>();
-
-        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-            if (e.getKey().nodeId().equals(nodeId)) {
-                mappingKeys.add(e.getKey());
-
-                failedKeys.addAll(e.getValue().keys());
-            }
-        }
+        GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
-        if (!mappingKeys.isEmpty()) {
-            if (!failedKeys.isEmpty())
-                addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
-                    "response is received: " + nodeId));
+        if (req != null) {
+            addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
+                "received: " + nodeId));
 
-            for (GridAtomicMappingKey key : mappingKeys)
-                mappings.remove(key);
+            mappings.remove(nodeId);
 
             checkComplete();
 
@@ -544,9 +529,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
         }
         else {
-            GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition());
-
-            GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
+            GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
             if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
                 updateNear(req, res);
@@ -564,7 +547,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         opRes = ret;
                 }
 
-                mappings.remove(mappingKey);
+                mappings.remove(nodeId);
             }
 
             checkComplete();
@@ -780,11 +763,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             if (op != TRANSFORM)
                 val = cctx.toCacheObject(val);
 
-            int part = cctx.affinity().partition(cacheKey);
-            ClusterNode primary = cctx.affinity().primary(part, topVer);
-
-            if (!ccfg.isAtomicOrderedUpdates())
-                part = -1;
+            ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
 
             if (primary == null) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -810,8 +789,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 subjId,
                 taskNameHash,
                 skipStore,
-                cctx.kernalContext().clientNode(),
-                part);
+                cctx.kernalContext().clientNode());
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -827,7 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             }
 
             // Optimize mapping for single key.
-            mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
+            mapSingle(primary.id(), req);
 
             return;
         }
@@ -847,18 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         if (conflictRmvVals != null)
             conflictRmvValsIt = conflictRmvVals.iterator();
 
-        Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+        Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (oldNodeId != null) {
-                // TODO: IGNITE-104 - Try to avoid iteration.
-                for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-                    if (e.getKey().nodeId().equals(oldNodeId))
-                        mappings.remove(e.getKey());
-                }
-            }
+            if (oldNodeId != null)
+                removeMapping(oldNodeId);
 
             // For fastMap mode wait for all responses before remapping.
             if (remap && fastMap && !mappings.isEmpty()) {
@@ -928,10 +901,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (op != TRANSFORM)
                     val = cctx.toCacheObject(val);
 
-                T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
-
-                int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
-                Collection<ClusterNode> affNodes = t.get2();
+                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
 
                 if (affNodes.isEmpty()) {
                     onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -952,9 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                     UUID nodeId = affNode.id();
 
-                    GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
-
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
+                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
 
                     if (mapped == null) {
                         mapped = new GridNearAtomicUpdateRequest(
@@ -974,12 +942,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             subjId,
                             taskNameHash,
                             skipStore,
-                            cctx.kernalContext().clientNode(),
-                            part);
+                            cctx.kernalContext().clientNode());
 
-                        pendingMappings.put(mappingKey, mapped);
+                        pendingMappings.put(nodeId, mapped);
 
-                        GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
+                        GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
 
                         assert old == null || (old != null && remap) :
                             "Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -997,7 +964,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         }
 
         if ((single == null || single) && pendingMappings.size() == 1) {
-            Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+            Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
 
             single = true;
 
@@ -1020,35 +987,31 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
      * @return Collection of nodes to which key is mapped.
      */
-    private T2<Integer, Collection<ClusterNode>> mapKey(
+    private Collection<ClusterNode> mapKey(
         KeyCacheObject key,
         AffinityTopologyVersion topVer,
         boolean fastMap
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();
 
-        int part = affMgr.partition(key);
-
         // If we can send updates in parallel - do it.
-        Collection<ClusterNode> nodes = fastMap ?
-            cctx.topology().nodes(part, topVer) :
-            Collections.singletonList(affMgr.primary(part, topVer));
-
-        return new T2<>(part, nodes);
+        return fastMap ?
+            cctx.topology().nodes(affMgr.partition(key), topVer) :
+            Collections.singletonList(affMgr.primary(key, topVer));
     }
 
     /**
      * Maps future to single node.
      *
-     * @param mappingKey Mapping key.
+     * @param nodeId Node ID.
      * @param req Request.
      */
-    private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
-        singleNodeId = mappingKey.nodeId();
+    private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+        singleNodeId = nodeId;
         singleReq = req;
 
-        if (cctx.localNodeId().equals(mappingKey.nodeId())) {
-            cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
+        if (cctx.localNodeId().equals(nodeId)) {
+            cache.updateAllAsyncInternal(nodeId, req,
                 new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest req,
                         GridNearAtomicUpdateResponse res) {
@@ -1063,7 +1026,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                sendRequest(mappingKey, req);
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1079,37 +1042,34 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param mappings Mappings to send.
      */
-    private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
+    private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
         UUID locNodeId = cctx.localNodeId();
 
-        Collection<GridNearAtomicUpdateRequest> locUpdates = null;
+        GridNearAtomicUpdateRequest locUpdate = null;
 
         // Send messages to remote nodes first, then run local update.
-        for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-            GridAtomicMappingKey mappingKey = e.getKey();
-            GridNearAtomicUpdateRequest req = e.getValue();
-
+        for (GridNearAtomicUpdateRequest req : mappings.values()) {
             if (locNodeId.equals(req.nodeId())) {
-                if (locUpdates == null)
-                    locUpdates = new ArrayList<>(mappings.size());
+                assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+                    ", req=" + req + ']';
 
-                locUpdates.add(req);
+                locUpdate = req;
             }
             else {
                 try {
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
 
-                    sendRequest(mappingKey, req);
+                    cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
-                catch (IgniteCheckedException ex) {
-                    addFailedKeys(req.keys(), ex);
+                catch (IgniteCheckedException e) {
+                    addFailedKeys(req.keys(), e);
 
-                    removeMapping(mappingKey);
+                    removeMapping(req.nodeId());
                 }
 
                 if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
-                    removeMapping(mappingKey);
+                    removeMapping(req.nodeId());
             }
         }
 
@@ -1117,52 +1077,28 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             // In FULL_ASYNC mode always return (null, true).
             opRes = new GridCacheReturn(cctx, true, null, true);
 
-        if (locUpdates != null) {
-            for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
-                cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
-                    new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
-                        @Override public void apply(GridNearAtomicUpdateRequest req,
-                            GridNearAtomicUpdateResponse res) {
-                            assert res.futureVersion().equals(futVer) : futVer;
+        if (locUpdate != null) {
+            cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+                new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                    @Override public void apply(GridNearAtomicUpdateRequest req,
+                        GridNearAtomicUpdateResponse res) {
+                        assert res.futureVersion().equals(futVer) : futVer;
 
-                            onResult(res.nodeId(), res);
-                        }
-                    });
-            }
+                        onResult(res.nodeId(), res);
+                    }
+                });
         }
 
         checkComplete();
     }
 
     /**
-     * Sends request.
-     *
-     * @param mappingKey Mapping key.
-     * @param req Update request.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req)
-        throws IgniteCheckedException {
-        if (mappingKey.partition() >= 0) {
-            Object topic = CU.partitionMessageTopic(cctx, mappingKey.partition(), true);
-
-            cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
-                2 * cctx.gridConfig().getNetworkTimeout());
-        }
-        else {
-            assert mappingKey.partition() == -1;
-
-            cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-        }
-    }
-
-    /**
      * Removes mapping from future mappings map.
      *
-     * @param mappingKey Mapping key.
+     * @param nodeId Node ID to remove mapping for.
      */
-    private void removeMapping(GridAtomicMappingKey mappingKey) {
-        mappings.remove(mappingKey);
+    private void removeMapping(UUID nodeId) {
+        mappings.remove(nodeId);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index b3075c4..734cf6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,9 +135,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** */
     private boolean clientReq;
 
-    /** Partition. */
-    private int part;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -165,7 +162,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
      * @param clientReq Client node request flag.
-     * @param part Partition.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -184,8 +180,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable UUID subjId,
         int taskNameHash,
         boolean skipStore,
-        boolean clientReq,
-        int part
+        boolean clientReq
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -205,7 +200,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
         this.clientReq = clientReq;
-        this.part = part;
 
         keys = new ArrayList<>();
     }
@@ -321,13 +315,6 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
-     * @return Partition.
-     */
-    public int partition() {
-        return part;
-    }
-
-    /**
      * @param key Key to add.
      * @param val Optional update value.
      * @param conflictTtl Conflict TTL (optional).
@@ -679,8 +666,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeInt("part", part))
-                    return false;
+//                if (!writer.writeInt("part", part))
+//                    return false;
 
                 writer.incrementState();
 
@@ -863,10 +850,10 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 16:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
+//                part = reader.readInt("part");
+//
+//                if (!reader.isLastRead())
+//                    return false;
 
                 reader.incrementState();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9781ea43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index e2d33d5..2b30536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,9 +92,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     /** Near expire times. */
     private GridLongList nearExpireTimes;
 
-    /** Partition. */
-    private int part;
-
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -106,13 +103,11 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
      * @param cacheId Cache ID.
      * @param nodeId Node ID this reply should be sent to.
      * @param futVer Future version.
-     * @param part Partition.
      */
-    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
+    public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
         this.futVer = futVer;
-        this.part = part;
     }
 
     /** {@inheritDoc} */
@@ -143,7 +138,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
 
     /**
      * Sets update error.
-     * @param err
+     * @param err Exception.
      */
     public void error(IgniteCheckedException err){
         this.err = err;
@@ -193,13 +188,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     }
 
     /**
-     * @return Partition.
-     */
-    public int partition() {
-        return part;
-    }
-
-    /**
      * Adds value to be put in near cache on originating node.
      *
      * @param keyIdx Key index.
@@ -497,8 +485,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeInt("part", part))
-                    return false;
+//                if (!writer.writeInt("part", part))
+//                    return false;
 
                 writer.incrementState();
 
@@ -603,10 +591,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
                 reader.incrementState();
 
             case 12:
-                part = reader.readInt("part");
-
-                if (!reader.isLastRead())
-                    return false;
+//                part = reader.readInt("part");
+//
+//                if (!reader.isLastRead())
+//                    return false;
 
                 reader.incrementState();