You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:32 UTC
[42/51] ignite git commit: ignite-5075
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e914572f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e914572f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e914572f
Branch: refs/heads/ignite-5075-pds
Commit: e914572ffcd464264257f83d1f53b5aa68e8f141
Parents: 43fbc93
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 30 16:06:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 30 18:52:27 2017 +0300
----------------------------------------------------------------------
.../cache/CacheGroupInfrastructure.java | 2 +
.../processors/cache/GridCacheProcessor.java | 11 ++--
.../continuous/CacheContinuousQueryHandler.java | 68 +++++++++-----------
.../continuous/CacheContinuousQueryManager.java | 4 +-
.../processors/cache/IgniteCacheGroupsTest.java | 16 +++--
...nuousQueryConcurrentPartitionUpdateTest.java | 8 +--
.../cache/IgniteCacheAbstractBenchmark.java | 5 +-
7 files changed, 59 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 97f9324..b7d8243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -732,6 +732,7 @@ public class CacheGroupInfrastructure {
public void addCacheWithContinuousQuery(GridCacheContext cctx) {
assert sharedGroup() : cacheOrGroupName();
assert cctx.group() == this : cctx.name();
+ assert !cctx.isLocal() : cctx.name();
synchronized (this) {
List<GridCacheContext> contQryCaches = this.contQryCaches;
@@ -751,6 +752,7 @@ public class CacheGroupInfrastructure {
public void removeCacheWithContinuousQuery(GridCacheContext cctx) {
assert sharedGroup() : cacheOrGroupName();
assert cctx.group() == this : cctx.name();
+ assert !cctx.isLocal() : cctx.name();
synchronized (this) {
List<GridCacheContext> contQryCaches = this.contQryCaches;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 82771b0..bee7860 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1303,7 +1303,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
ctx.group().stopCache(ctx, destroy);
- U.stopLifecycleAware(log, lifecycleAwares(cache.configuration(), ctx.store().configuredStore()));
+ U.stopLifecycleAware(log, lifecycleAwares(ctx.group(), cache.configuration(), ctx.store().configuredStore()));
if (log.isInfoEnabled()) {
if (ctx.group().sharedGroup())
@@ -1465,7 +1465,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
prepare(cfg, toPrepare);
- U.startLifecycleAware(lifecycleAwares(cfg, cfgStore));
+ U.startLifecycleAware(lifecycleAwares(grp, cfg, cfgStore));
boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg);
@@ -3455,14 +3455,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * @param grp Cache group.
* @param ccfg Cache configuration.
* @param objs Extra components.
* @return Components provided in cache configuration which can implement {@link LifecycleAware} interface.
*/
- private Iterable<Object> lifecycleAwares(CacheConfiguration ccfg, Object... objs) {
+ private Iterable<Object> lifecycleAwares(CacheGroupInfrastructure grp, CacheConfiguration ccfg, Object... objs) {
Collection<Object> ret = new ArrayList<>(7 + objs.length);
- ret.add(ccfg.getAffinity());
+ if (grp.affinityFunction() != ccfg.getAffinity())
+ ret.add(ccfg.getAffinity());
+
ret.add(ccfg.getAffinityMapper());
ret.add(ccfg.getEvictionFilter());
ret.add(ccfg.getEvictionPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3d56531..149bd69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -493,51 +493,47 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
long cntr,
AffinityTopologyVersion topVer,
boolean primary) {
- CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
-
if (skipCtx == null)
skipCtx = new CounterSkipContext(part, cntr, topVer);
- final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
+ if (loc) {
+ assert !locCache;
- if (entryOrList != null) {
- if (loc && asyncCb) {
- // TODO
- return skipCtx;
+ final Collection<CacheEntryEvent<? extends K, ? extends V>> evts = handleEvent(ctx, skipCtx.entry());
+
+ if (!evts.isEmpty()) {
+ if (asyncCb) {
+ ctx.asyncCallbackPool().execute(new Runnable() {
+ @Override public void run() {
+ locLsnr.onUpdated(evts);
+ }
+ }, part);
+ }
+ else
+ skipCtx.addSendClosure(new Runnable() {
+ @Override public void run() {
+ locLsnr.onUpdated(evts);
+ }
+ });
}
+ return skipCtx;
+ }
+
+ CacheContinuousQueryEventBuffer buf = partitionBuffer(cctx, part);
+
+ final Object entryOrList = buf.processEntry(skipCtx.entry(), !primary);
+
+ if (entryOrList != null) {
skipCtx.addSendClosure(new Runnable() {
@Override public void run() {
try {
- if (loc) {
- if (entryOrList instanceof CacheContinuousQueryEntry) {
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
- cctx,
- (CacheContinuousQueryEntry)entryOrList);
-
- handleLocalListener(evt);
- }
- else {
- List<CacheContinuousQueryEntry> list =
- (List<CacheContinuousQueryEntry>)entryOrList;
-
- for (CacheContinuousQueryEntry entry : list) {
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(null,
- cctx,
- entry);
-
- handleLocalListener(evt);
- }
- }
- }
- else {
- ctx.continuous().addNotification(nodeId,
- routineId,
- entryOrList,
- topic,
- false,
- true);
- }
+ ctx.continuous().addNotification(nodeId,
+ routineId,
+ entryOrList,
+ topic,
+ false,
+ true);
}
catch (ClusterTopologyCheckedException ex) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 5156455..d472054 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -834,7 +834,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (added) {
int cnt = lsnrCnt.incrementAndGet();
- if (cctx.group().sharedGroup() && cnt == 1)
+ if (cctx.group().sharedGroup() && cnt == 1 && !cctx.isLocal())
cctx.group().addCacheWithContinuousQuery(cctx);
}
}
@@ -866,7 +866,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if ((lsnr = lsnrs.remove(id)) != null) {
int cnt = lsnrCnt.decrementAndGet();
- if (cctx.group().sharedGroup() && cnt == 0)
+ if (cctx.group().sharedGroup() && cnt == 0 && !cctx.isLocal())
cctx.group().removeCacheWithContinuousQuery(cctx);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 2076981..7b70472 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -595,13 +595,14 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(cls, "loaders");
}
- assertTrue("Expected: [cntr1=" + keys + ", cntr2=" + keys + "] " +
- "but was: [cntr1=" + cntr1.get() + ", cntr2=" + cntr2.get() + "]",
- GridTestUtils.waitForCondition(new PA() {
+ GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return cntr1.get() == keys && cntr2.get() == keys;
}
- }, 2000));
+ }, 2000);
+
+ assertEquals(cntr1.get(), keys);
+ assertEquals(cntr2.get(), keys);
qry1.close();
@@ -610,12 +611,13 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
srv0.cache(CACHE1).putAll(map);
srv0.cache(CACHE2).putAll(map);
- assertTrue("Expected: <" + keys + 10 + "> but was: <" + cntr2.get() + ">",
- GridTestUtils.waitForCondition(new PA() {
+ GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
return cntr2.get() == keys + 10;
}
- }, 2000));
+ }, 2000);
+
+ assertEquals(keys + 10, cntr2.get());
assertEquals(keys, cntr1.get());
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
index ed0dec0..32320f6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryConcurrentPartitionUpdateTest.java
@@ -91,15 +91,15 @@ public class CacheContinuousQueryConcurrentPartitionUpdateTest extends GridCommo
/**
* @throws Exception If failed.
*/
- public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
- concurrentUpdatePartition(TRANSACTIONAL, true);
+ public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
+ concurrentUpdatePartition(ATOMIC, true);
}
/**
* @throws Exception If failed.
*/
- public void testConcurrentUpdatePartitionAtomicCacheGroup() throws Exception {
- concurrentUpdatePartition(ATOMIC, true);
+ public void testConcurrentUpdatePartitionTxCacheGroup() throws Exception {
+ concurrentUpdatePartition(TRANSACTIONAL, true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/e914572f/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
index 6514dd7..9c0344d 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteCacheAbstractBenchmark.java
@@ -233,8 +233,9 @@ public abstract class IgniteCacheAbstractBenchmark<K, V> extends IgniteAbstractB
loadCacheData(cacheName);
- println(cfg, "Finished populating data [cache=" + cacheName +
- ", time=" + ((System.nanoTime() - start) / 1_000_000) + "ms]");
+ long time = ((System.nanoTime() - start) / 1_000_000);
+
+ println(cfg, "Finished populating data [cache=" + cacheName + ", time=" + time + "ms]");
}
/**