You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/05 03:46:08 UTC

[08/19] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates

IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3ebcb96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3ebcb96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3ebcb96

Branch: refs/heads/ignite-104
Commit: e3ebcb96efd469dd162b5b0e8cd3904ab7003a67
Parents: 98ad892
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Aug 4 11:08:45 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Aug 4 11:08:45 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 22 ------------
 .../processors/cache/GridCacheContext.java      |  8 ++---
 .../dht/atomic/GridDhtAtomicCache.java          | 35 +++++---------------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  8 ++---
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 30 ++++++-----------
 5 files changed, 24 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index ff32551..3ad0f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -219,10 +219,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Write ordering mode. */
     private CacheAtomicWriteOrderMode atomicWriteOrderMode;
 
-    /** Ordered updates mode. */
-    // TODO: IGNITE-104 - Switch default to false
-    private boolean atomicOrderedUpdates = true;
-
     /** Number of backups for cache. */
     private int backups = DFLT_BACKUPS;
 
@@ -349,7 +345,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         aff = cc.getAffinity();
         affMapper = cc.getAffinityMapper();
         atomicityMode = cc.getAtomicityMode();
-        atomicOrderedUpdates = cc.isAtomicOrderedUpdates();
         atomicWriteOrderMode = cc.getAtomicWriteOrderMode();
         backups = cc.getBackups();
         cacheLoaderFactory = cc.getCacheLoaderFactory();
@@ -901,23 +896,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
-     * @return Ordered updates mode.
-     */
-    public boolean isAtomicOrderedUpdates() {
-        return atomicOrderedUpdates;
-    }
-
-    /**
-     * @param atomicOrderedUpdates Ordered updates mode.
-     * @return {@code this} for chaining.
-     */
-    public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) {
-        this.atomicOrderedUpdates = atomicOrderedUpdates;
-
-        return this;
-    }
-
-    /**
      * Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache.
      * <p>
      * If not set, default value is {@link #DFLT_BACKUPS}.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 05ce183..db62f20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -537,12 +537,8 @@ public class GridCacheContext<K, V> implements Externalizable {
      * @return {@code True} if entries should not be deleted from cache immediately.
      */
     public boolean deferredDelete(GridCacheAdapter<?, ?> cache) {
-        boolean nearAtomic = cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC;
-        boolean orderedUpdates = cache.configuration().isAtomicOrderedUpdates();
-
-        return cache.isDht() || cache.isColocated() ||
-            (cache.isDhtAtomic() && !orderedUpdates) ||
-            (nearAtomic && !orderedUpdates);
+        // Only TRANSACTIONAL caches.
+        return cache.isDht() || cache.isColocated();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f5119f6..01694d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -187,33 +187,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
+        for (int part = 0; part < ctx.affinity().partitions(); part++) {
+            Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
 
-                ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-                    @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                        processNearAtomicUpdateRequest(nodeId, req);
-                    }
-                });
-
-                Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
-
-                ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
-                    @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
-                        processDhtAtomicUpdateRequest(nodeId, req);
-                    }
-                });
-            }
-        }
-        else {
-            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+            ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
                     processNearAtomicUpdateRequest(nodeId, req);
                 }
             });
 
-            ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+            Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
+
+            ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
                 }
@@ -253,11 +238,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         for (DeferredResponseBuffer buf : pendingResponses.values())
             buf.finish();
 
-        if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
-                ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
-                ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
-            }
+        for (int part = 0; part < ctx.affinity().partitions(); part++) {
+            ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
+            ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5c22b3b..52d59ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,8 +123,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         waitForExchange = !topLocked;
 
         // We can send entry processor instead of value to backup if updates are ordered.
-        forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM &&
-            cctx.config().isAtomicOrderedUpdates();
+        forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM;
     }
 
     /** {@inheritDoc} */
@@ -218,9 +217,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
         Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
 
-        if (!cctx.config().isAtomicOrderedUpdates())
-            part = -1;
-
         if (log.isDebugEnabled())
             log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
 
@@ -281,7 +277,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
         AffinityTopologyVersion topVer = updateReq.topologyVersion();
 
-        int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
+        int part = entry.partition();
 
         for (UUID nodeId : readers) {
             GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 5150113..c4704cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -512,7 +512,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
             Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
 
-            mapOnTopology(remapKeys, true, nodeId);
+            mapOnTopology(remapKeys, true, new GridAtomicMappingKey(nodeId, res.partition()));
 
             return;
         }
@@ -591,9 +591,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      *
      * @param keys Keys to map.
      * @param remap Boolean flag indicating if this is partial future remap.
-     * @param oldNodeId Old node ID if remap.
+     * @param remapKey Mapping key (if remap).
      */
-    private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
+    private void mapOnTopology(final Collection<?> keys, final boolean remap, final GridAtomicMappingKey remapKey) {
         cache.topology().readLock();
 
         AffinityTopologyVersion topVer = null;
@@ -624,7 +624,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                         @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                             cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                 @Override public void run() {
-                                    mapOnTopology(keys, remap, oldNodeId);
+                                    mapOnTopology(keys, remap, remapKey);
                                 }
                             });
                         }
@@ -640,7 +640,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             cache.topology().readUnlock();
         }
 
-        map0(topVer, keys, remap, oldNodeId);
+        map0(topVer, keys, remap, remapKey);
     }
 
     /**
@@ -683,14 +683,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
      * @param topVer Topology version.
      * @param remapKeys Keys to remap or {@code null} to map all keys.
      * @param remap Flag indicating if this is partial remap for this future.
-     * @param oldNodeId Old node ID if was remap.
+     * @param remapKey Mapping key (if remap).
      */
     private void map0(
         AffinityTopologyVersion topVer,
         @Nullable Collection<?> remapKeys,
         boolean remap,
-        @Nullable UUID oldNodeId) {
-        assert oldNodeId == null || remap || fastMapRemap;
+        @Nullable GridAtomicMappingKey remapKey) {
+        assert remapKey == null || remap || fastMapRemap;
 
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
@@ -783,9 +783,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
             int part = cctx.affinity().partition(cacheKey);
             ClusterNode primary = cctx.affinity().primary(part, topVer);
 
-            if (!ccfg.isAtomicOrderedUpdates())
-                part = -1;
-
             if (primary == null) {
                 onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                     "left the grid)."));
@@ -852,13 +849,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
         // Must do this in synchronized block because we need to atomically remove and add mapping.
         // Otherwise checkComplete() may see empty intermediate state.
         synchronized (this) {
-            if (oldNodeId != null) {
-                // TODO: IGNITE-104 - Try to avoid iteration.
-                for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
-                    if (e.getKey().nodeId().equals(oldNodeId))
-                        mappings.remove(e.getKey());
-                }
-            }
+            if (remapKey != null)
+                mappings.remove(remapKey);
 
             // For fastMap mode wait for all responses before remapping.
             if (remap && fastMap && !mappings.isEmpty()) {
@@ -930,7 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
 
                 T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
 
-                int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+                int part = t.get1();
                 Collection<ClusterNode> affNodes = t.get2();
 
                 if (affNodes.isEmpty()) {