You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/25 15:07:27 UTC

ignite git commit: ignite-426-2-reb WIP

Repository: ignite
Updated Branches:
  refs/heads/ignite-426-2-reb 4f263f0e1 -> f9a1753c9


ignite-426-2-reb WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f9a1753c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f9a1753c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f9a1753c

Branch: refs/heads/ignite-426-2-reb
Commit: f9a1753c9d41d02e696ff5c740dfbaf30149812d
Parents: 4f263f0
Author: Tikhonov Nikolay <ti...@gmail.com>
Authored: Sun Oct 25 17:10:10 2015 +0300
Committer: Tikhonov Nikolay <ti...@gmail.com>
Committed: Sun Oct 25 17:10:10 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 69 ++++++++++++++++----
 .../cache/GridCacheUpdateAtomicResult.java      | 17 ++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 24 ++++++-
 .../continuous/CacheContinuousQueryManager.java |  6 ++
 ...acheContinuousQueryFailoverAbstractTest.java |  8 +--
 5 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 0b2a7fd..f592ba6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -1705,6 +1707,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject oldVal;
         CacheObject updated;
 
+        if (!primary) {
+            int z = 0;
+
+            ++z;
+        }
+
         GridCacheVersion enqueueVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1723,6 +1731,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object updated0 = null;
 
         Long updateIdx0 = null;
+        CI1<IgniteInternalFuture<Void>> contQryNtf = null;
 
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1832,7 +1841,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                     }
                     // Will update something.
                     else {
@@ -1909,8 +1919,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             if (updateIdx != null)
                                 updateIdx0 = updateIdx;
 
-                            cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false,
-                                updateIdx0, topVer);
+                            final boolean primary0 = primary;
+                            final CacheObject prevVal0 = prevVal;
+                            final CacheObject evtVal0 = evtVal;
+                            final AffinityTopologyVersion topVer0 = topVer;
+                            final long updateIdx00 = updateIdx0;
+
+                            contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+                                @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+                                    try {
+                                        cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0,
+                                                prevVal0, primary0, false, updateIdx00, topVer0);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        // No-op.
+                                    }
+                                }
+                            };
                         }
 
                         return new GridCacheUpdateAtomicResult(false,
@@ -1922,7 +1947,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            contQryNtf);
                     }
                 }
                 else
@@ -1999,7 +2025,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0 == null ? 0 : updateIdx0);
+                        updateIdx0 == null ? 0 : updateIdx0,
+                        null);
                 }
             }
 
@@ -2047,7 +2074,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0 == null ? 0 : updateIdx);
+                        updateIdx0 == null ? 0 : updateIdx,
+                        null);
                 }
             }
             else
@@ -2148,7 +2176,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                     else if (interceptorVal != updated0) {
                         updated0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -2230,7 +2259,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                 }
 
                 if (writeThrough)
@@ -2316,8 +2346,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (!isNear())
-                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer);
+            if (!isNear()) {
+                final boolean primary0 = primary;
+                final CacheObject oldVal0 = oldVal;
+                final AffinityTopologyVersion topVer0 = topVer;
+                final long updateIdx00 = updateIdx0;
+
+                contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+                    @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+                        try {
+                            cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0,
+                                false, updateIdx00, topVer0);
+                        }
+                        catch (IgniteCheckedException e) {
+                            // No-op.
+                        }
+                    }
+                };
+            }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -2344,7 +2390,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             enqueueVer,
             conflictCtx,
             true,
-            updateIdx0);
+            updateIdx0,
+            contQryNtf);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 092d990..9e2aca6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -18,9 +18,12 @@
 package org.apache.ignite.internal.processors.cache;
 
 import javax.cache.processor.EntryProcessor;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
@@ -63,6 +66,9 @@ public class GridCacheUpdateAtomicResult {
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
+    /** Continuous query notification closure, may be {@code null}. */
+    private CI1<IgniteInternalFuture<Void>> contQryNtfy;
+
     /**
      * Constructor.
      *
@@ -86,7 +92,8 @@ public class GridCacheUpdateAtomicResult {
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
         boolean sndToDht,
-        long updateIdx) {
+        long updateIdx,
+        @Nullable CI1<IgniteInternalFuture<Void>> contQryNtfy) {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
@@ -97,6 +104,7 @@ public class GridCacheUpdateAtomicResult {
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
         this.updateIdx = updateIdx;
+        this.contQryNtfy = contQryNtfy;
     }
 
     /**
@@ -170,6 +178,13 @@ public class GridCacheUpdateAtomicResult {
         return sndToDht;
     }
 
+    /**
+     * @return Continuous query notification closure, or {@code null} if none was provided.
+     */
+    public CI1<IgniteInternalFuture<Void>> contQryNtfy() {
+        return contQryNtfy;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheUpdateAtomicResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a9ee79a..c78f3bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1772,7 +1772,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                 }
 
-                GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                final GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
                     node.id(),
                     locNodeId,
@@ -1806,6 +1806,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     readersOnly = true;
                 }
 
+                if (!primary) {
+                    int z = 0;
+
+                    ++z;
+                }
+
+                if (updRes.contQryNtfy() != null) {
+                    if (primary && dhtFut != null) {
+                        dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() {
+                            @Override public void apply(IgniteInternalFuture<Void> f) {
+                                if (f.isDone() && f.error() == null)
+                                        updRes.contQryNtfy().apply(f);
+                                }
+                            });
+                    }
+                    else
+                        updRes.contQryNtfy().apply(null);
+                }
+
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2567,6 +2586,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
+                        if (updRes.contQryNtfy() != null)
+                            updRes.contQryNtfy().apply(null);
+
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b8922a3..b558a0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -189,6 +189,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         if (preload && !internal)
             return;
 
+        if (!primary) {
+            int z = 0;
+
+            ++z;
+        }
+
         ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
 
         if (internal)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9a1753c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 6979f6a..90e21ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -27,11 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -93,6 +89,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -122,6 +119,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         TestCommunicationSpi commSpi = new TestCommunicationSpi();
 
+        commSpi.setSharedMemoryPort(-1);
         commSpi.setIdleConnectionTimeout(100);
 
         cfg.setCommunicationSpi(commSpi);