You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/08/05 03:46:08 UTC
[08/19] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3ebcb96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3ebcb96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3ebcb96
Branch: refs/heads/ignite-104
Commit: e3ebcb96efd469dd162b5b0e8cd3904ab7003a67
Parents: 98ad892
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Aug 4 11:08:45 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Aug 4 11:08:45 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 22 ------------
.../processors/cache/GridCacheContext.java | 8 ++---
.../dht/atomic/GridDhtAtomicCache.java | 35 +++++---------------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 ++---
.../dht/atomic/GridNearAtomicUpdateFuture.java | 30 ++++++-----------
5 files changed, 24 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index ff32551..3ad0f01 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -219,10 +219,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Write ordering mode. */
private CacheAtomicWriteOrderMode atomicWriteOrderMode;
- /** Ordered updates mode. */
- // TODO: IGNITE-104 - Switch default to false
- private boolean atomicOrderedUpdates = true;
-
/** Number of backups for cache. */
private int backups = DFLT_BACKUPS;
@@ -349,7 +345,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
aff = cc.getAffinity();
affMapper = cc.getAffinityMapper();
atomicityMode = cc.getAtomicityMode();
- atomicOrderedUpdates = cc.isAtomicOrderedUpdates();
atomicWriteOrderMode = cc.getAtomicWriteOrderMode();
backups = cc.getBackups();
cacheLoaderFactory = cc.getCacheLoaderFactory();
@@ -901,23 +896,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
- * @return Ordered updates mode.
- */
- public boolean isAtomicOrderedUpdates() {
- return atomicOrderedUpdates;
- }
-
- /**
- * @param atomicOrderedUpdates Ordered updates mode.
- * @return {@code this} for chaining.
- */
- public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) {
- this.atomicOrderedUpdates = atomicOrderedUpdates;
-
- return this;
- }
-
- /**
* Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache.
* <p>
* If not set, default value is {@link #DFLT_BACKUPS}.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 05ce183..db62f20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -537,12 +537,8 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return {@code True} if entries should not be deleted from cache immediately.
*/
public boolean deferredDelete(GridCacheAdapter<?, ?> cache) {
- boolean nearAtomic = cache.isNear() && cache.configuration().getAtomicityMode() == ATOMIC;
- boolean orderedUpdates = cache.configuration().isAtomicOrderedUpdates();
-
- return cache.isDht() || cache.isColocated() ||
- (cache.isDhtAtomic() && !orderedUpdates) ||
- (nearAtomic && !orderedUpdates);
+ // Only TRANSACTIONAL caches.
+ return cache.isDht() || cache.isColocated();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f5119f6..01694d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -187,33 +187,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- if (ctx.config().isAtomicOrderedUpdates()) {
- for (int part = 0; part < ctx.affinity().partitions(); part++) {
- Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
+ for (int part = 0; part < ctx.affinity().partitions(); part++) {
+ Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
- ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
- }
- });
-
- Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
-
- ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
- processDhtAtomicUpdateRequest(nodeId, req);
- }
- });
- }
- }
- else {
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
processNearAtomicUpdateRequest(nodeId, req);
}
});
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
+
+ ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
}
@@ -253,11 +238,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (DeferredResponseBuffer buf : pendingResponses.values())
buf.finish();
- if (ctx.config().isAtomicOrderedUpdates()) {
- for (int part = 0; part < ctx.affinity().partitions(); part++) {
- ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
- ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
- }
+ for (int part = 0; part < ctx.affinity().partitions(); part++) {
+ ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
+ ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5c22b3b..52d59ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,8 +123,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
waitForExchange = !topLocked;
// We can send entry processor instead of value to backup if updates are ordered.
- forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM &&
- cctx.config().isAtomicOrderedUpdates();
+ forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM;
}
/** {@inheritDoc} */
@@ -218,9 +217,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
- if (!cctx.config().isAtomicOrderedUpdates())
- part = -1;
-
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -281,7 +277,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
+ int part = entry.partition();
for (UUID nodeId : readers) {
GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3ebcb96/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 5150113..c4704cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -512,7 +512,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
- mapOnTopology(remapKeys, true, nodeId);
+ mapOnTopology(remapKeys, true, new GridAtomicMappingKey(nodeId, res.partition()));
return;
}
@@ -591,9 +591,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*
* @param keys Keys to map.
* @param remap Boolean flag indicating if this is partial future remap.
- * @param oldNodeId Old node ID if remap.
+ * @param remapKey Mapping key (if remap).
*/
- private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
+ private void mapOnTopology(final Collection<?> keys, final boolean remap, final GridAtomicMappingKey remapKey) {
cache.topology().readLock();
AffinityTopologyVersion topVer = null;
@@ -624,7 +624,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology(keys, remap, oldNodeId);
+ mapOnTopology(keys, remap, remapKey);
}
});
}
@@ -640,7 +640,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
cache.topology().readUnlock();
}
- map0(topVer, keys, remap, oldNodeId);
+ map0(topVer, keys, remap, remapKey);
}
/**
@@ -683,14 +683,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param topVer Topology version.
* @param remapKeys Keys to remap or {@code null} to map all keys.
* @param remap Flag indicating if this is partial remap for this future.
- * @param oldNodeId Old node ID if was remap.
+ * @param remapKey Mapping key (if remap).
*/
private void map0(
AffinityTopologyVersion topVer,
@Nullable Collection<?> remapKeys,
boolean remap,
- @Nullable UUID oldNodeId) {
- assert oldNodeId == null || remap || fastMapRemap;
+ @Nullable GridAtomicMappingKey remapKey) {
+ assert remapKey == null || remap || fastMapRemap;
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
@@ -783,9 +783,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
int part = cctx.affinity().partition(cacheKey);
ClusterNode primary = cctx.affinity().primary(part, topVer);
- if (!ccfg.isAtomicOrderedUpdates())
- part = -1;
-
if (primary == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid)."));
@@ -852,13 +849,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (oldNodeId != null) {
- // TODO: IGNITE-104 - Try to avoid iteration.
- for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
- if (e.getKey().nodeId().equals(oldNodeId))
- mappings.remove(e.getKey());
- }
- }
+ if (remapKey != null)
+ mappings.remove(remapKey);
// For fastMap mode wait for all responses before remapping.
if (remap && fastMap && !mappings.isEmpty()) {
@@ -930,7 +922,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
- int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+ int part = t.get1();
Collection<ClusterNode> affNodes = t.get2();
if (affNodes.isEmpty()) {