You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/28 14:15:20 UTC
[44/50] [abbrv] ignite git commit: IGNITE-426 Temp
IGNITE-426 Temp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8882fd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8882fd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8882fd3
Branch: refs/heads/ignite-426-2-reb
Commit: a8882fd3ba6c180c71738ae3034627bc57e24e48
Parents: 878a87b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Oct 22 15:17:28 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 28 15:24:33 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 10 ++--
.../dht/GridDhtPartitionTopologyImpl.java | 5 ++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 1 -
.../continuous/CacheContinuousQueryHandler.java | 9 ----
...acheContinuousQueryFailoverAbstractTest.java | 14 +++--
...ridCacheContinuousQueryAbstractSelfTest.java | 56 ++++++++++++++++++++
.../GridCacheContinuousQueryTxSelfTest.java | 49 +++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
8 files changed, 125 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index bbd2ce0..b445619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1187,9 +1187,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
subjId, null, taskName);
}
- if (!isNear() &&
- // Ignore events on backups for one phase commit.
- !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local())))
cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, false);
@@ -1364,9 +1363,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
taskName);
}
- if (!isNear() &&
- // Ignore events on backups for one phase commit.
- !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
+ if (cctx.isLocal() || cctx.isReplicated() ||
+ (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local())))
cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
cctx.dataStructures().onEntryUpdated(key, true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1195ddd..a210a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1356,6 +1356,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!rebalancedTopVer.equals(topVer)) {
for (int i = 0; i < cctx.affinity().partitions(); i++) {
List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+
+ // Topology doesn't contain server nodes (just clients).
+ if (affNodes.isEmpty())
+ continue;
+
List<ClusterNode> owners = owners(i);
if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 169e6a7..d9c12eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 8e20fbc..bd44180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -576,8 +575,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** */
private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
- private List<T2<Long, CacheContinuousQueryEntry>> firedEvents = new ArrayList<>();
-
/**
* @param log Logger.
*/
@@ -601,8 +598,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (lastFiredEvt == INIT_VALUE) {
lastFiredEvt = entry.updateIndex();
- firedEvents.add(new T2<>(lastFiredEvt, entry));
-
return F.asList(entry);
}
@@ -612,8 +607,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
lastFiredEvt = 1;
- firedEvents.add(new T2<>(lastFiredEvt, entry));
-
return F.asList(entry);
}
@@ -643,8 +636,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
entries.add(e.getValue());
- firedEvents.add(new T2<>(e.getKey(), e.getValue()));
-
iter.remove();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 6029761..6979f6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -1564,11 +1564,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
restartFut.get();
- boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return checkEvents(false, expEvts, lsnr);
- }
- }, 10_000);
+ boolean check = true;
+
+ if (!expEvts.isEmpty()) {
+ check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return checkEvents(false, expEvts, lsnr);
+ }
+ }, 10_000);
+ }
if (!check)
assertTrue(checkEvents(true, expEvts, lsnr));
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 46a5f8c..637d8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -75,6 +75,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -368,6 +369,61 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
/**
* @throws Exception If failed.
*/
+ public void testTwoQueryListener() throws Exception {
+ if (cacheMode() == LOCAL)
+ return; // Skipped for LOCAL cache mode.
+
+ IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+ IgniteCache<Integer, Integer> cache1 = grid(1).cache(null);
+
+ final AtomicInteger cntr = new AtomicInteger(0); // Events seen by qry1's listener.
+ final AtomicInteger cntr1 = new AtomicInteger(0); // Events seen by qry2's listener.
+
+ ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+ ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+ qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts)
+ cntr.incrementAndGet();
+ }
+ });
+
+ qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+ @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+ for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts)
+ cntr1.incrementAndGet();
+ }
+ });
+
+ try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2); // qry2 is started via grid(1).
+ QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) { // qry1 is started via grid(0).
+ for (int i = 0; i < gridCount(); i++) {
+ IgniteCache<Object, Object> cache0 = grid(i).cache(null);
+
+ cache0.put(1, 1);
+ cache0.put(2, 2);
+ cache0.put(3, 3);
+
+ cache0.remove(1);
+ cache0.remove(2);
+ cache0.remove(3);
+
+ final int iter = i + 1; // Number of grids updated so far.
+
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return iter * 6 /* operations per iteration (3 puts + 3 removes) */ * 2 /* continuous queries */
+ == (cntr.get() + cntr1.get());
+ }
+ }, 5000L);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testEntriesByFilter() throws Exception {
IgniteCache<Integer, Integer> cache = grid(0).cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
new file mode 100644
index 0000000..91b6b9c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Continuous queries tests for transactional replicated cache.
+ */
+public class GridCacheContinuousQueryTxSelfTest extends GridCacheContinuousQueryPartitionedSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return CacheMode.REPLICATED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void testInternalKey() throws Exception {
+ // No-op: overrides the inherited test with an empty body.
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 91dc388..e16dffc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -163,6 +164,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheContinuousQueryPartitionedSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class);
+ suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);