You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 13:52:06 UTC
[05/40] incubator-ignite git commit: IGNITE-1265 - Entry processor
must always have the correct cache value.
IGNITE-1265 - Entry processor must always have the correct cache value.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5065a1ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5065a1ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5065a1ec
Branch: refs/heads/ignite-1258
Commit: 5065a1eccb3d71b2573d37bb6ff2c78a1bbc107c
Parents: ccaa2b2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Aug 18 19:35:50 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Aug 18 19:35:50 2015 -0700
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 20 +++
.../dht/GridDhtPartitionTopology.java | 7 +
.../dht/GridDhtPartitionTopologyImpl.java | 20 +++
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 136 +++++++++++++++++--
.../cache/transactions/IgniteTxEntry.java | 18 +++
.../IgniteCacheEntryProcessorNodeJoinTest.java | 54 ++++----
.../cache/IgniteCacheInvokeReadThroughTest.java | 2 +-
8 files changed, 223 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index c3f3e7f..531678e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -331,6 +331,26 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index c551fb3..7b08510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -129,6 +129,13 @@ public interface GridDhtPartitionTopology {
public GridDhtPartitionMap localPartitionMap();
/**
+ * @param nodeId Node ID.
+ * @param part Partition.
+ * @return Partition state.
+ */
+ public GridDhtPartitionState partitionState(UUID nodeId, int part);
+
+ /**
* @return Current update sequence.
*/
public long updateSequence();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index de7f876..f356138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -614,6 +614,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
+ lock.readLock().lock();
+
+ try {
+ GridDhtPartitionMap partMap = node2part.get(nodeId);
+
+ if (partMap != null) {
+ GridDhtPartitionState state = partMap.get(part);
+
+ return state == null ? EVICTED : state;
+ }
+
+ return EVICTED;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
Collection<ClusterNode> affNodes = cctx.affinity().nodes(p, topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6a72c89..7da6e07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -363,8 +363,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @return Future that will be completed when locks are acquired.
*/
public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
- @Nullable Iterable<IgniteTxEntry> reads,
- @Nullable Iterable<IgniteTxEntry> writes,
+ @Nullable Collection<IgniteTxEntry> reads,
+ @Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
long msgId,
IgniteUuid nearMiniId,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2b7e1bc..ad1023f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -135,6 +135,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** Keys that should be locked. */
private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>();
+ /** Future for the forced rebalance (preload) request of keys whose current value is needed, e.g. by entry processors. */
+ private IgniteInternalFuture<?> forceKeysFut;
+
/** Locks ready flag. */
private volatile boolean locksReady;
@@ -291,7 +294,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters());
- if (hasFilters || retVal || txEntry.op() == GridCacheOperation.DELETE) {
+ if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) {
cached.unswap(retVal);
boolean readThrough = (retVal || hasFilters) &&
@@ -312,7 +315,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
null,
null);
- if (retVal) {
+ if (retVal || txEntry.op() == TRANSFORM) {
if (!F.isEmpty(txEntry.entryProcessors())) {
invoke = true;
@@ -339,6 +342,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
+ txEntry.entryProcessorCalculatedValue(val);
+
if (err != null || procRes != null)
ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err);
else
@@ -362,7 +367,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
ret.success(false);
}
else
- ret.success(txEntry.op() != GridCacheOperation.DELETE || cached.hasValue());
+ ret.success(txEntry.op() != DELETE || cached.hasValue());
}
}
catch (IgniteCheckedException e) {
@@ -466,7 +471,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private boolean mapIfLocked() {
if (checkLocks()) {
- prepare0();
+ if (!mapped.compareAndSet(false, true))
+ return false;
+
+ if (forceKeysFut == null || (forceKeysFut.isDone() && forceKeysFut.error() == null))
+ prepare0();
+ else {
+ forceKeysFut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ try {
+ f.get();
+
+ prepare0();
+ }
+ catch (IgniteCheckedException e) {
+ onError(e);
+ }
+ }
+ });
+ }
return true;
}
@@ -709,7 +732,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param writes Write entries.
* @param txNodes Transaction nodes mapping.
*/
- public void prepare(Iterable<IgniteTxEntry> reads, Iterable<IgniteTxEntry> writes,
+ @SuppressWarnings("TypeMayBeWeakened")
+ public void prepare(Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes,
Map<UUID, Collection<UUID>> txNodes) {
if (tx.empty()) {
tx.setRollbackOnly();
@@ -721,6 +745,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
this.writes = writes;
this.txNodes = txNodes;
+ if (!F.isEmpty(writes)) {
+ Map<Integer, Collection<KeyCacheObject>> forceKeys = null;
+
+ for (IgniteTxEntry entry : writes)
+ forceKeys = checkNeedRebalanceKeys(entry, forceKeys);
+
+ forceKeysFut = forceRebalanceKeys(forceKeys);
+ }
+
readyLocks();
mapIfLocked();
@@ -735,12 +768,75 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
/**
+ * Checks whether this transaction needs the previous value for the given tx entry (a return value is requested or
+ * the entry has entry processors). If so, stores the entry key in the passed-in map, creating the map if it is {@code null}.
*
+ * @param e TX entry.
+ * @param map Map with needed preload keys.
+ * @return The passed-in map, or a newly created map if {@code null} was passed in and a key had to be stored; may be {@code null}.
*/
- private void prepare0() {
- if (!mapped.compareAndSet(false, true))
- return;
+ private Map<Integer, Collection<KeyCacheObject>> checkNeedRebalanceKeys(
+ IgniteTxEntry e,
+ Map<Integer, Collection<KeyCacheObject>> map
+ ) {
+ if (retVal || !F.isEmpty(e.entryProcessors())) {
+ if (map == null)
+ map = new HashMap<>();
+
+ Collection<KeyCacheObject> keys = map.get(e.cacheId());
+
+ if (keys == null) {
+ keys = new ArrayList<>();
+
+ map.put(e.cacheId(), keys);
+ }
+
+ keys.add(e.key());
+ }
+
+ return map;
+ }
+
+ private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<KeyCacheObject>> keysMap) {
+ if (F.isEmpty(keysMap))
+ return null;
+
+ GridCompoundFuture<Object, Object> compFut = null;
+ IgniteInternalFuture<Object> lastForceFut = null;
+ for (Map.Entry<Integer, Collection<KeyCacheObject>> entry : keysMap.entrySet()) {
+ if (lastForceFut != null && compFut == null) {
+ compFut = new GridCompoundFuture();
+
+ compFut.add(lastForceFut);
+ }
+
+ int cacheId = entry.getKey();
+
+ Collection<KeyCacheObject> keys = entry.getValue();
+
+ lastForceFut = cctx.cacheContext(cacheId).preloader().request(keys, tx.topologyVersion());
+
+ if (compFut != null)
+ compFut.add(lastForceFut);
+ }
+
+ if (compFut != null) {
+ compFut.markInitialized();
+
+ return compFut;
+ }
+ else {
+ assert lastForceFut != null;
+
+ return lastForceFut;
+ }
+ }
+
+ /**
+ *
+ */
+ private void prepare0() {
try {
// We are holding transaction-level locks for entries here, so we can get next write version.
onEntriesLocked();
@@ -957,7 +1053,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
private boolean map(
IgniteTxEntry entry,
Map<UUID, GridDistributedTxMapping> futDhtMap,
- Map<UUID, GridDistributedTxMapping> futNearMap) {
+ Map<UUID, GridDistributedTxMapping> futNearMap
+ ) {
if (entry.cached().isLocal())
return false;
@@ -1024,14 +1121,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
* @param locMap Exclude map.
* @return {@code True} if mapped.
*/
- private boolean map(IgniteTxEntry entry, Iterable<ClusterNode> nodes,
- Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap) {
+ private boolean map(
+ IgniteTxEntry entry,
+ Iterable<ClusterNode> nodes,
+ Map<UUID, GridDistributedTxMapping> globalMap,
+ Map<UUID, GridDistributedTxMapping> locMap
+ ) {
boolean ret = false;
if (nodes != null) {
for (ClusterNode n : nodes) {
GridDistributedTxMapping global = globalMap.get(n.id());
+ if (!F.isEmpty(entry.entryProcessors())) {
+ GridDhtPartitionState state = entry.context().topology().partitionState(n.id(),
+ entry.cached().partition());
+
+ if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) {
+ CacheObject procVal = entry.entryProcessorCalculatedValue();
+
+ entry.op(procVal == null ? DELETE : UPDATE);
+ entry.value(procVal, true, false);
+ entry.entryProcessors(null);
+ }
+ }
+
if (global == null)
globalMap.put(n.id(), global = new GridDistributedTxMapping(n));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 247d350..7890831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -79,6 +79,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@GridDirectTransient
private Collection<T2<EntryProcessor<Object, Object, Object>, Object[]>> entryProcessorsCol;
+ /** Transient field for calculated entry processor value. */
+ @GridDirectTransient
+ private CacheObject entryProcessorCalcVal;
+
/** Transform closure bytes. */
@GridToStringExclude
private byte[] transformClosBytes;
@@ -775,6 +779,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
return expiryPlc;
}
+ /**
+ * @return Entry processor calculated value.
+ */
+ public CacheObject entryProcessorCalculatedValue() {
+ return entryProcessorCalcVal;
+ }
+
+ /**
+ * @param entryProcessorCalcVal Entry processor calculated value.
+ */
+ public void entryProcessorCalculatedValue(CacheObject entryProcessorCalcVal) {
+ this.entryProcessorCalcVal = entryProcessorCalcVal;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
index 9c17ebd..94bfd8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryProcessorNodeJoinTest.java
@@ -122,38 +122,44 @@ public class IgniteCacheEntryProcessorNodeJoinTest extends GridCommonAbstractTes
final AtomicReference<Throwable> error = new AtomicReference<>();
final int started = 6;
- IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
- @Override public void run() {
- try {
- for (int i = 0; i < started; i++) {
- U.sleep(1_000);
-
- startGrid(GRID_CNT + i);
+ try {
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < started; i++) {
+ U.sleep(1_000);
+
+ startGrid(GRID_CNT + i);
+ }
+ }
+ catch (Exception e) {
+ error.compareAndSet(null, e);
}
}
- catch (Exception e) {
- error.compareAndSet(null, e);
- }
- }
- }, 1, "starter");
+ }, 1, "starter");
- try {
- checkIncrement(invokeAll);
- }
- finally {
- stop.set(true);
+ try {
+ checkIncrement(invokeAll);
+ }
+ finally {
+ stop.set(true);
- fut.get(getTestTimeout());
- }
+ fut.get(getTestTimeout());
+ }
- for (int i = 0; i < NUM_SETS; i++) {
- for (int g = 0; g < GRID_CNT + started; g++) {
- Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
+ for (int i = 0; i < NUM_SETS; i++) {
+ for (int g = 0; g < GRID_CNT + started; g++) {
+ Set<String> vals = ignite(g).<String, Set<String>>cache(null).get("set-" + i);
- assertNotNull(vals);
- assertEquals(100, vals.size());
+ assertNotNull(vals);
+ assertEquals(100, vals.size());
+ }
}
}
+ finally {
+ for (int i = 0; i < started; i++)
+ stopGrid(GRID_CNT + i);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5065a1ec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
index 10ab1ab..b72540d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheInvokeReadThroughTest.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.cache.CacheMode.*;
public class IgniteCacheInvokeReadThroughTest extends IgniteCacheAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-114");
+// fail("https://issues.apache.org/jira/browse/IGNITE-114");
}
/** */