You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/28 14:15:20 UTC

[44/50] [abbrv] ignite git commit: IGNITE-426 Temp

IGNITE-426 Temp


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8882fd3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8882fd3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8882fd3

Branch: refs/heads/ignite-426-2-reb
Commit: a8882fd3ba6c180c71738ae3034627bc57e24e48
Parents: 878a87b
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Oct 22 15:17:28 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 28 15:24:33 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 10 ++--
 .../dht/GridDhtPartitionTopologyImpl.java       |  5 ++
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  1 -
 .../continuous/CacheContinuousQueryHandler.java |  9 ----
 ...acheContinuousQueryFailoverAbstractTest.java | 14 +++--
 ...ridCacheContinuousQueryAbstractSelfTest.java | 56 ++++++++++++++++++++
 .../GridCacheContinuousQueryTxSelfTest.java     | 49 +++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |  2 +
 8 files changed, 125 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index bbd2ce0..b445619 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1187,9 +1187,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     subjId, null, taskName);
             }
 
-            if (!isNear() &&
-                // Ignore events on backups for one phase commit.
-                !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
+            if (cctx.isLocal() || cctx.isReplicated() ||
+                (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local())))
                 cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, false);
@@ -1364,9 +1363,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     taskName);
             }
 
-            if (!isNear() &&
-                // Ignore events on backups for one phase commit.
-                !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0))
+            if (cctx.isLocal() || cctx.isReplicated() ||
+                (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local())))
                 cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer);
 
             cctx.dataStructures().onEntryUpdated(key, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 1195ddd..a210a29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1356,6 +1356,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
         if (!rebalancedTopVer.equals(topVer)) {
             for (int i = 0; i < cctx.affinity().partitions(); i++) {
                 List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+
+                // Topology doesn't contain server nodes (just clients).
+                if (affNodes.isEmpty())
+                    continue;
+
                 List<ClusterNode> owners = owners(i);
 
                 if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 169e6a7..d9c12eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 8e20fbc..bd44180 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -576,8 +575,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         /** */
         private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
 
-        private List<T2<Long, CacheContinuousQueryEntry>> firedEvents = new ArrayList<>();
-
         /**
          * @param log Logger.
          */
@@ -601,8 +598,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 if (lastFiredEvt == INIT_VALUE) {
                     lastFiredEvt = entry.updateIndex();
 
-                    firedEvents.add(new T2<>(lastFiredEvt, entry));
-
                     return F.asList(entry);
                 }
 
@@ -612,8 +607,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                     lastFiredEvt = 1;
 
-                    firedEvents.add(new T2<>(lastFiredEvt, entry));
-
                     return F.asList(entry);
                 }
 
@@ -643,8 +636,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                         entries.add(e.getValue());
 
-                        firedEvents.add(new T2<>(e.getKey(), e.getValue()));
-
                         iter.remove();
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 6029761..6979f6a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -1564,11 +1564,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         restartFut.get();
 
-        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override public boolean apply() {
-                return checkEvents(false, expEvts, lsnr);
-            }
-        }, 10_000);
+        boolean check = true;
+
+        if (!expEvts.isEmpty()) {
+            check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return checkEvents(false, expEvts, lsnr);
+                }
+            }, 10_000);
+        }
 
         if (!check)
             assertTrue(checkEvents(true, expEvts, lsnr));

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 46a5f8c..637d8a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -75,6 +75,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -368,6 +369,61 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
+    public void testTwoQueryListener() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        IgniteCache<Integer, Integer> cache = grid(0).cache(null);
+        IgniteCache<Integer, Integer> cache1 = grid(1).cache(null);
+
+        final AtomicInteger cntr = new AtomicInteger(0);
+        final AtomicInteger cntr1 = new AtomicInteger(0);
+
+        ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+        ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts)
+                    cntr.incrementAndGet();
+            }
+        });
+
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts)
+                    cntr1.incrementAndGet();
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache0 = grid(i).cache(null);
+
+                cache0.put(1, 1);
+                cache0.put(2, 2);
+                cache0.put(3, 3);
+
+                cache0.remove(1);
+                cache0.remove(2);
+                cache0.remove(3);
+
+                final int iter = i + 1;
+
+                assert GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return iter * 6 /* count of operations */ * 2 /* count of continuous queries */
+                            == (cntr.get() + cntr1.get());
+                    }
+                }, 5000L);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testEntriesByFilter() throws Exception {
         IgniteCache<Integer, Integer> cache = grid(0).cache(null);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
new file mode 100644
index 0000000..91b6b9c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Continuous queries tests for transactional replicated cache.
+ */
+public class GridCacheContinuousQueryTxSelfTest extends GridCacheContinuousQueryPartitionedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testInternalKey() throws Exception {
+        // No-op.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 91dc388..e16dffc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
@@ -163,6 +164,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryPartitionedSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class);
+        suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);