You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:32 UTC

[42/51] ignite git commit: ignite-5075

ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e914572f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e914572f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e914572f

Branch: refs/heads/ignite-5075-pds
Commit: e914572ffcd464264257f83d1f53b5aa68e8f141
Parents: 43fbc93
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 30 16:06:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 30 18:52:27 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheGroupInfrastructure.java         |  2 +
 .../processors/cache/GridCacheProcessor.java    | 11 ++--
 .../continuous/CacheContinuousQueryHandler.java | 68 +++++++++-----------
 .../continuous/CacheContinuousQueryManager.java |  4 +-
 .../processors/cache/IgniteCacheGroupsTest.java | 16 +++--
 ...nuousQueryConcurrentPartitionUpdateTest.java |  8 +--
 .../cache/IgniteCacheAbstractBenchmark.java     |  5 +-
 7 files changed, 59 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 97f9324..b7d8243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -732,6 +732,7 @@ public class CacheGroupInfrastructure {
     public void addCacheWithContinuousQuery(GridCacheContext cctx) {
         assert sharedGroup() : cacheOrGroupName();
         assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
 
         synchronized (this) {
             List<GridCacheContext> contQryCaches = this.contQryCaches;
@@ -751,6 +752,7 @@ public class CacheGroupInfrastructure {
     public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
         assert sharedGroup() : cacheOrGroupName();
         assert cctx.group() == this : cctx.name();
+        assert !cctx.isLocal() : cctx.name();
 
         synchronized (this) {
             List<GridCacheContext> contQryCaches = this.contQryCaches;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82771b0..bee7860 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1303,7 +1303,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         ctx.group().stopCache(ctx, destroy);
 
-        U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore()));
+        U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore()));
 
         if (log.isInfoEnabled()) {
             if (ctx.group().sharedGroup())
@@ -1465,7 +1465,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         prepare(cfg, toPrepare);
 
-        U.startLifecycleAware(lifecycleAwares(cfg, cfgStore));
+        U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore));
 
         boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);
 
@@ -3455,14 +3455,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param grp Cache group.
      * @param ccfg Cache configuration.
      * @param objs Extra components.
      * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface.
      */
-    private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object... objs) {
+    private Iterable<Object> lifecycleAwares(CacheGroupInfrastructure grp, CacheConfiguration ccfg, Object... objs) {
         Collection<Object> ret = new ArrayList<>(7 + objs.length);
 
-        ret.add(ccfg.getAffinity());
+        if (grp.affinityFunction() != ccfg.getAffinity())
+            ret.add(ccfg.getAffinity());
+
         ret.add(ccfg.getAffinityMapper());
         ret.add(ccfg.getEvictionFilter());
         ret.add(ccfg.getEvictionPolicy());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3d56531..149bd69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -493,51 +493,47 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 long cntr,
                 AffinityTopologyVersion topVer,
                 boolean primary) {
-                CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
-
                 if (skipCtx == null)
                     skipCtx = new CounterSkipContext(part, cntr, topVer);
 
-                final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
+                if (loc) {
+                    assert !locCache;
 
-                if (entryOrList != null) {
-                    if (loc && asyncCb) {
-                        // TODO
-                        return skipCtx;
+                    final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry());
+
+                    if (!evts.isEmpty()) {
+                        if (asyncCb) {
+                            ctx.asyncCallbackPool().execute(new Runnable() {
+                                @Override public void run() {
+                                    locLsnr.onUpdated(evts);
+                                }
+                            }, part);
+                        }
+                        else
+                            skipCtx.addSendClosure(new Runnable() {
+                                @Override public void run() {
+                                    locLsnr.onUpdated(evts);
+                                }
+                            });
                     }
 
+                    return skipCtx;
+                }
+
+                CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
+
+                final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
+
+                if (entryOrList != null) {
                     skipCtx.addSendClosure(new Runnable() {
                         @Override public void run() {
                             try {
-                                if (loc) {
-                                    if (entryOrList instanceof CacheContinuousQueryEntry) {
-                                        CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
-                                            cctx,
-                                            (CacheContinuousQueryEntry)entryOrList);
-
-                                        handleLocalListener(evt);
-                                    }
-                                    else {
-                                        List<CacheContinuousQueryEntry> list =
-                                            (List<CacheContinuousQueryEntry>)entryOrList;
-
-                                        for (CacheContinuousQueryEntry entry : list) {
-                                            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
-                                                cctx,
-                                                entry);
-
-                                            handleLocalListener(evt);
-                                        }
-                                    }
-                                }
-                                else {
-                                    ctx.continuous().addNotification(nodeId,
-                                        routineId,
-                                        entryOrList,
-                                        topic,
-                                        false,
-                                        true);
-                                }
+                                ctx.continuous().addNotification(nodeId,
+                                    routineId,
+                                    entryOrList,
+                                    topic,
+                                    false,
+                                    true);
                             }
                             catch (ClusterTopologyCheckedException ex) {
                                 if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 5156455..d472054 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -834,7 +834,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 if (added) {
                     int cnt = lsnrCnt.incrementAndGet();
 
-                    if (cctx.group().sharedGroup() && cnt == 1)
+                    if (cctx.group().sharedGroup() && cnt == 1 && !cctx.isLocal())
                         cctx.group().addCacheWithContinuousQuery(cctx);
                 }
             }
@@ -866,7 +866,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 if ((lsnr = lsnrs.remove(id)) != null) {
                     int cnt = lsnrCnt.decrementAndGet();
 
-                    if (cctx.group().sharedGroup() && cnt == 0)
+                    if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal())
                         cctx.group().removeCacheWithContinuousQuery(cctx);
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 2076981..7b70472 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -595,13 +595,14 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
             GridTestUtils.runMultiThreaded(cls, "loaders");
         }
 
-        assertTrue("Expected: [cntr1=" + keys + ", cntr2=" + keys + "] " +
-            "but was: [cntr1=" + cntr1.get() + ", cntr2=" + cntr2.get() + "]",
-            GridTestUtils.waitForCondition(new PA() {
+        GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 return cntr1.get() == keys && cntr2.get() == keys;
             }
-        }, 2000));
+        }, 2000);
+
+        assertEquals(cntr1.get(), keys);
+        assertEquals(cntr2.get(), keys);
 
         qry1.close();
 
@@ -610,12 +611,13 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         srv0.cache(CACHE1).putAll(map);
         srv0.cache(CACHE2).putAll(map);
 
-        assertTrue("Expected: <" + keys + 10 + "> but was: <" + cntr2.get() + ">",
-            GridTestUtils.waitForCondition(new PA() {
+        GridTestUtils.waitForCondition(new PA() {
             @Override public boolean apply() {
                 return cntr2.get() == keys + 10;
             }
-        }, 2000));
+        }, 2000);
+
+        assertEquals(keys + 10, cntr2.get());
 
         assertEquals(keys, cntr1.get());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index ed0dec0..32320f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -91,15 +91,15 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
-        concurrentUpdatePartition(TRANSACTIONAL, true);
+    public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
+        concurrentUpdatePartition(ATOMIC, true);
     }
 
     /**
      * @throws Exception If failed.
      */
-    public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
-        concurrentUpdatePartition(ATOMIC, true);
+    public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
+        concurrentUpdatePartition(TRANSACTIONAL, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
index 6514dd7..9c0344d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
@@ -233,8 +233,9 @@ public abstract class IgniteCacheAbstractBenchmark<K, V> extends IgniteAbstractB
 
         loadCacheData(cacheName);
 
-        println(cfg, "Finished populating data [cache=" + cacheName +
-            ", time=" + ((System.nanoTime() - start) / 1_000_000) + "ms]");
+        long time = ((System.nanoTime() - start) / 1_000_000);
+
+        println(cfg, "Finished populating data [cache=" + cacheName + ", time=" + time + "ms]");
     }
 
     /**