You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 13:52:06 UTC

[05/40] incubator-ignite git commit: IGNITE-1265 - Entry processor must always have the correct cache value.

IGNITE-1265 - Entry processor must always have the correct cache value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec

Branch: refs/heads/ignite-1258
Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c
Parents: ccaa2b2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 19:35:50 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 19:35:50 2015 -0700

----------------------------------------------------------------------
 .../dht/GridClientPartitionTopology.java        |  20 +++
 .../dht/GridDhtPartitionTopology.java           |   7 +
 .../dht/GridDhtPartitionTopologyImpl.java       |  20 +++
 .../cache/distributed/dht/GridDhtTxLocal.java   |   4 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++--
 .../cache/transactions/IgniteTxEntry.java       |  18 +++
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  54 ++++----
 .../cache/IgniteCacheInvokeReadThroughTest.java |   2 +-
 8 files changed, 223 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         lock.readLock().lock();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
     public GridDhtPartitionMap localPartitionMap();
 
     /**
+     * @param nodeId Node ID.
+     * @param part Partition.
+     * @return Partition state.
+     */
+    public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+    /**
      * @return Current update sequence.
      */
     public long updateSequence();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..f356138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
+    @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+        lock.readLock().lock();
+
+        try {
+            GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+            if (partMap != null) {
+                GridDhtPartitionState state = partMap.get(part);
+
+                return state == null ? EVICTED : state;
+            }
+
+            return EVICTED;
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
         Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
      * @return Future that will be completed when locks are acquired.
      */
     public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
-        @Nullable Iterable<IgniteTxEntry> reads,
-        @Nullable Iterable<IgniteTxEntry> writes,
+        @Nullable Collection<IgniteTxEntry> reads,
+        @Nullable Collection<IgniteTxEntry> writes,
         Map<IgniteTxKey, GridCacheVersion> verMap,
         long msgId,
         IgniteUuid nearMiniId,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b7e1bc..ad1023f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     /** Keys that should be locked. */
     private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
 
+    /** Force keys future for correct transforms. */
+    private IgniteInternalFuture<?> forceKeysFut;
+
     /** Locks ready flag. */
     private volatile boolean locksReady;
 
@@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                 boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
 
-                if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+                if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
                     cached.unswap(retVal);
 
                     boolean readThrough = (retVal || hasFilters) &&
@@ -312,7 +315,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         null,
                         null);
 
-                    if (retVal) {
+                    if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
                             invoke = true;
 
@@ -339,6 +342,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                                 }
                             }
 
+                            txEntry.entryProcessorCalculatedValue(val);
+
                             if (err != null || procRes != null)
                                 ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
                             else
@@ -362,7 +367,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         ret.success(false);
                     }
                     else
-                        ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+                        ret.success(txEntry.op() != DELETE || cached.hasValue());
                 }
             }
             catch (IgniteCheckedException e) {
@@ -466,7 +471,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      */
     private boolean mapIfLocked() {
         if (checkLocks()) {
-            prepare0();
+            if (!mapped.compareAndSet(false, true))
+                return false;
+
+            if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+                prepare0();
+            else {
+                forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> f) {
+                        try {
+                            f.get();
+
+                            prepare0();
+                        }
+                        catch (IgniteCheckedException e) {
+                            onError(e);
+                        }
+                    }
+                });
+            }
 
             return true;
         }
@@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param writes Write entries.
      * @param txNodes Transaction nodes mapping.
      */
-    public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+    @SuppressWarnings("TypeMayBeWeakened")
+    public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
         Map<UUID, Collection<UUID>> txNodes) {
         if (tx.empty()) {
             tx.setRollbackOnly();
@@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
         this.writes = writes;
         this.txNodes = txNodes;
 
+        if (!F.isEmpty(writes)) {
+            Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+            for (IgniteTxEntry entry : writes)
+                forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+            forceKeysFut = forceRebalanceKeys(forceKeys);
+        }
+
         readyLocks();
 
         mapIfLocked();
@@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     }
 
     /**
+     * Checks if this transaction needs the previous value for the given tx entry. Stores the required key in the
+     * passed-in map, creating a new map if the passed-in map is {@code null}.
      *
+     * @param e TX entry.
+     * @param map Map with needed preload keys.
+     * @return Passed-in map, or a newly created one if it was {@code null} and the key had to be added.
      */
-    private void prepare0() {
-        if (!mapped.compareAndSet(false, true))
-            return;
+    private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+        IgniteTxEntry e,
+        Map<Integer, Collection<KeyCacheObject>> map
+    ) {
+        if (retVal || !F.isEmpty(e.entryProcessors())) {
+            if (map == null)
+                map = new HashMap<>();
+
+            Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+            if (keys == null) {
+                keys = new ArrayList<>();
+
+                map.put(e.cacheId(), keys);
+            }
+
+            keys.add(e.key());
+        }
+
+        return map;
+    }
+
+    private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+        if (F.isEmpty(keysMap))
+            return null;
+
+        GridCompoundFuture<Object, Object> compFut = null;
+        IgniteInternalFuture<Object> lastForceFut = null;
 
+        for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+            if (lastForceFut != null && compFut == null) {
+                compFut = new GridCompoundFuture();
+
+                compFut.add(lastForceFut);
+            }
+
+            int cacheId = entry.getKey();
+
+            Collection<KeyCacheObject> keys = entry.getValue();
+
+            lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+            if (compFut != null)
+                compFut.add(lastForceFut);
+        }
+
+        if (compFut != null) {
+            compFut.markInitialized();
+
+            return compFut;
+        }
+        else {
+            assert lastForceFut != null;
+
+            return lastForceFut;
+        }
+    }
+
+    /**
+     *
+     */
+    private void prepare0() {
         try {
             // We are holding transaction-level locks for entries here, so we can get next write version.
             onEntriesLocked();
@@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
     private boolean map(
         IgniteTxEntry entry,
         Map<UUID, GridDistributedTxMapping> futDhtMap,
-        Map<UUID, GridDistributedTxMapping> futNearMap) {
+        Map<UUID, GridDistributedTxMapping> futNearMap
+    ) {
         if (entry.cached().isLocal())
             return false;
 
@@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * @param locMap Exclude map.
      * @return {@code True} if mapped.
      */
-    private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
-        Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+    private boolean map(
+        IgniteTxEntry entry,
+        Iterable<ClusterNode> nodes,
+        Map<UUID, GridDistributedTxMapping> globalMap,
+        Map<UUID, GridDistributedTxMapping> locMap
+    ) {
         boolean ret = false;
 
         if (nodes != null) {
             for (ClusterNode n : nodes) {
                 GridDistributedTxMapping global = globalMap.get(n.id());
 
+                if (!F.isEmpty(entry.entryProcessors())) {
+                    GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+                        entry.cached().partition());
+
+                    if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+                        CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+                        entry.op(procVal == null ? DELETE : UPDATE);
+                        entry.value(procVal, true, false);
+                        entry.entryProcessors(null);
+                    }
+                }
+
                 if (global == null)
                     globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 247d350..7890831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
     @GridDirectTransient
     private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
 
+    /** Transient field for calculated entry processor value. */
+    @GridDirectTransient
+    private CacheObject entryProcessorCalcVal;
+
     /** Transform closure bytes. */
     @GridToStringExclude
     private byte[] transformClosBytes;
@@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
         return expiryPlc;
     }
 
+    /**
+     * @return Entry processor calculated value.
+     */
+    public CacheObject entryProcessorCalculatedValue() {
+        return entryProcessorCalcVal;
+    }
+
+    /**
+     * @param entryProcessorCalcVal Entry processor calculated value.
+     */
+    public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+        this.entryProcessorCalcVal = entryProcessorCalcVal;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 9c17ebd..94bfd8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
         final AtomicReference<Throwable> error = new AtomicReference<>();
         final int started = 6;
 
-        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
-            @Override public void run() {
-                try {
-                    for (int i = 0; i < started; i++) {
-                        U.sleep(1_000);
-
-                        startGrid(GRID_CNT + i);
+        try {
+            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (int i = 0; i < started; i++) {
+                            U.sleep(1_000);
+
+                            startGrid(GRID_CNT + i);
+                        }
+                    }
+                    catch (Exception e) {
+                        error.compareAndSet(null, e);
                     }
                 }
-                catch (Exception e) {
-                    error.compareAndSet(null, e);
-                }
-            }
-        }, 1, "starter");
+            }, 1, "starter");
 
-        try {
-            checkIncrement(invokeAll);
-        }
-        finally {
-            stop.set(true);
+            try {
+                checkIncrement(invokeAll);
+            }
+            finally {
+                stop.set(true);
 
-            fut.get(getTestTimeout());
-        }
+                fut.get(getTestTimeout());
+            }
 
-        for (int i = 0; i < NUM_SETS; i++) {
-            for (int g = 0; g < GRID_CNT + started; g++) {
-                Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+            for (int i = 0; i < NUM_SETS; i++) {
+                for (int g = 0; g < GRID_CNT + started; g++) {
+                    Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
 
-                assertNotNull(vals);
-                assertEquals(100, vals.size());
+                    assertNotNull(vals);
+                    assertEquals(100, vals.size());
+                }
             }
         }
+        finally {
+            for (int i = 0; i < started; i++)
+                stopGrid(GRID_CNT + i);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 10ab1ab..b72540d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-114");
+//        fail("https://issues.apache.org/jira/browse/IGNITE-114");
     }
 
     /** */