You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/25 15:07:27 UTC
ignite git commit: ignite-426-2-reb WIP
Repository: ignite
Updated Branches:
refs/heads/ignite-426-2-reb 4f263f0e1 -> f9a1753c9
ignite-426-2-reb WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9a1753c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9a1753c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9a1753c
Branch: refs/heads/ignite-426-2-reb
Commit: f9a1753c9d41d02e696ff5c740dfbaf30149812d
Parents: 4f263f0
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Sun Oct 25 17:10:10 2015 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Sun Oct 25 17:10:10 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 69 ++++++++++++++++----
.../cache/GridCacheUpdateAtomicResult.java | 17 ++++-
.../dht/atomic/GridDhtAtomicCache.java | 24 ++++++-
.../continuous/CacheContinuousQueryManager.java | 6 ++
...acheContinuousQueryFailoverAbstractTest.java | 8 +--
5 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0b2a7fd..f592ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheMemoryMode;
import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -1705,6 +1707,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject oldVal;
CacheObject updated;
+ if (!primary) {
+ int z = 0;
+
+ ++z;
+ }
+
GridCacheVersion enqueueVer = null;
GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1723,6 +1731,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
Object updated0 = null;
Long updateIdx0 = null;
+ CI1<IgniteInternalFuture<Void>> contQryNtf = null;
synchronized (this) {
boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1832,7 +1841,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0,
+ null);
}
// Will update something.
else {
@@ -1909,8 +1919,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (updateIdx != null)
updateIdx0 = updateIdx;
- cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false,
- updateIdx0, topVer);
+ final boolean primary0 = primary;
+ final CacheObject prevVal0 = prevVal;
+ final CacheObject evtVal0 = evtVal;
+ final AffinityTopologyVersion topVer0 = topVer;
+ final long updateIdx00 = updateIdx0;
+
+ contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0,
+ prevVal0, primary0, false, updateIdx00, topVer0);
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+ }
+ };
}
return new GridCacheUpdateAtomicResult(false,
@@ -1922,7 +1947,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0,
+ contQryNtf);
}
}
else
@@ -1999,7 +2025,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0,
+ null);
}
}
@@ -2047,7 +2074,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx);
+ updateIdx0 == null ? 0 : updateIdx,
+ null);
}
}
else
@@ -2148,7 +2176,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0,
+ null);
else if (interceptorVal != updated0) {
updated0 = cctx.unwrapTemporary(interceptorVal);
@@ -2230,7 +2259,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
null,
null,
false,
- updateIdx0 == null ? 0 : updateIdx0);
+ updateIdx0 == null ? 0 : updateIdx0,
+ null);
}
if (writeThrough)
@@ -2316,8 +2346,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (res)
updateMetrics(op, metrics);
- if (!isNear())
- cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer);
+ if (!isNear()) {
+ final boolean primary0 = primary;
+ final CacheObject oldVal0 = oldVal;
+ final AffinityTopologyVersion topVer0 = topVer;
+ final long updateIdx00 = updateIdx0;
+
+ contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+ try {
+ cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0,
+ false, updateIdx00, topVer0);
+ }
+ catch (IgniteCheckedException e) {
+ // No-op.
+ }
+ }
+ };
+ }
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -2344,7 +2390,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
enqueueVer,
conflictCtx,
true,
- updateIdx0);
+ updateIdx0,
+ contQryNtf);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 092d990..9e2aca6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -18,9 +18,12 @@
package org.apache.ignite.internal.processors.cache;
import javax.cache.processor.EntryProcessor;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
@@ -63,6 +66,9 @@ public class GridCacheUpdateAtomicResult {
/** Value computed by entry processor. */
private IgniteBiTuple<Object, Exception> res;
+ /** Continuous query notify listener. */
+ private CI1<IgniteInternalFuture<Void>> contQryNtfy;
+
/**
* Constructor.
*
@@ -86,7 +92,8 @@ public class GridCacheUpdateAtomicResult {
@Nullable GridCacheVersion rmvVer,
@Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
boolean sndToDht,
- long updateIdx) {
+ long updateIdx,
+ @Nullable CI1<IgniteInternalFuture<Void>> contQryNtfy) {
this.success = success;
this.oldVal = oldVal;
this.newVal = newVal;
@@ -97,6 +104,7 @@ public class GridCacheUpdateAtomicResult {
this.conflictRes = conflictRes;
this.sndToDht = sndToDht;
this.updateIdx = updateIdx;
+ this.contQryNtfy = contQryNtfy;
}
/**
@@ -170,6 +178,13 @@ public class GridCacheUpdateAtomicResult {
return sndToDht;
}
+ /**
+ * @return Continuous notify closure.
+ */
+ public CI1<IgniteInternalFuture<Void>> contQryNtfy() {
+ return contQryNtfy;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheUpdateAtomicResult.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a9ee79a..c78f3bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1772,7 +1772,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
}
- GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+ final GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
node.id(),
locNodeId,
@@ -1806,6 +1806,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
readersOnly = true;
}
+ if (!primary) {
+ int z = 0;
+
+ ++z;
+ }
+
+ if (updRes.contQryNtfy() != null) {
+ if (primary && dhtFut != null) {
+ dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> f) {
+ if (f.isDone() && f.error() == null)
+ updRes.contQryNtfy().apply(f);
+ }
+ });
+ }
+ else
+ updRes.contQryNtfy().apply(null);
+ }
+
if (dhtFut != null) {
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2567,6 +2586,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (updRes.removeVersion() != null)
ctx.onDeferredDelete(entry, updRes.removeVersion());
+ if (updRes.contQryNtfy() != null)
+ updRes.contQryNtfy().apply(null);
+
entry.onUnlock();
break; // While.
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b8922a3..b558a0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -189,6 +189,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (preload && !internal)
return;
+ if (!primary) {
+ int z = 0;
+
+ ++z;
+ }
+
ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
if (internal)
http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 6979f6a..90e21ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -27,11 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -93,6 +89,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
*
@@ -122,6 +119,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
TestCommunicationSpi commSpi = new TestCommunicationSpi();
+ commSpi.setSharedMemoryPort(-1);
commSpi.setIdleConnectionTimeout(100);
cfg.setCommunicationSpi(commSpi);