You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/24 15:40:58 UTC

[2/3] ignite git commit: IGNITE-11048 WIP

IGNITE-11048 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/060f4ce3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/060f4ce3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/060f4ce3

Branch: refs/heads/ignite-11048
Commit: 060f4ce353a47f968ad514a74aa71363235dd145
Parents: 27fc9d1
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Mar 24 16:26:01 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Mar 24 16:26:01 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  20 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   5 -
 ...acheContinuousQueryRandomOperationsTest.java | 196 +++++++++++++++----
 ...inuousQueryRandomOperationsTwoNodesTest.java |  28 +++
 5 files changed, 202 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 323cbd6..c5df29b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2092,7 +2092,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0);
+                            0);
                     }
                 }
                 else
@@ -2355,6 +2355,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 update(updated, newExpireTime, newTtl, newVer, true);
 
+                updateCntr0 = nextPartCounter(topVer);
+
+                if (updateCntr != null)
+                    updateCntr0 = updateCntr;
+
                 drReplicate(drType, updated, newVer, topVer);
 
                 recordNodeId(affNodeId, topVer);
@@ -2448,6 +2453,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 recordNodeId(affNodeId, topVer);
 
+                updateCntr0 = nextPartCounter(topVer);
+
+                if (updateCntr != null)
+                    updateCntr0 = updateCntr;
+
                 drReplicate(drType, null, newVer, topVer);
 
                 if (evt) {
@@ -2475,15 +2485,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                 res = hadVal;
             }
 
-            if (res) {
+            if (res)
                 updateMetrics(op, metrics);
 
-                updateCntr0 = nextPartCounter(topVer);
-
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
-            }
-
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
 
             if (intercept) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e908c05..4938794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2115,7 +2115,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                                 "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                     }
                 }
-                else if (lsnrs != null && updRes.success()) {
+                else if (lsnrs != null && updRes.updateCounter() != 0) {
                     ctx.continuousQueries().onEntryUpdated(
                         lsnrs,
                         entry.key(),
@@ -2868,7 +2868,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
-                        if (lsnrs != null && updRes.success()) {
+                        if (lsnrs != null && updRes.updateCounter() != 0) {
                             ctx.continuousQueries().onEntryUpdated(
                                 lsnrs,
                                 entry.key(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index f104f21..0596cdb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -809,11 +809,6 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
         checkBackupQueue(3, false);
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean isDebug() {
-        return true;
-    }
-
     /**
      * @param backups Number of backups.
      * @param updateFromClient If {@code true} executes cache update from client node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 3d8d455..509d99d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -32,13 +32,13 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.event.EventType;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessor;
@@ -58,6 +58,7 @@ import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -80,7 +81,6 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
 import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
@@ -96,7 +96,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final int NODES = 2;
+    private static final int NODES = 5;
 
     /** */
     private static final int KEYS = 50;
@@ -125,11 +125,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
 
-        startGridsMultiThreaded(NODES - 1);
+        startGridsMultiThreaded(getClientIndex());
 
         client = true;
 
-        startGrid(NODES - 1);
+        startGrid(getClientIndex());
     }
 
     /** {@inheritDoc} */
@@ -418,64 +418,172 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
         testContinuousQuery(ccfg, SERVER);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicWithoutBackup() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testDoubleRemove(ccfg);
+    }
 
     /**
      * @throws Exception If failed.
      */
-    public void testDoubleRemove() throws Exception {
+    public void testDoubleRemoveAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testDoubleRemove(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveAtomicOffheap() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
             0,
             ATOMIC,
+            OFFHEAP_TIERED,
+            false);
+
+        testDoubleRemove(ccfg);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
             ONHEAP_TIERED,
             false);
 
-        try {
-            IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg);
+        testDoubleRemove(ccfg);
+    }
 
-            final AtomicInteger cnrt = new AtomicInteger(0);
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
 
-            ContinuousQuery qry = new ContinuousQuery();
+        testDoubleRemove(ccfg);
+    }
 
-            final List<Object> evts = new CopyOnWriteArrayList<>();
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemoveReplicatedAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
 
-            qry.setLocalListener(new CacheEntryUpdatedListener() {
-                @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
-                    for (Object o : iterable) {
-                        evts.add(o);
+        testDoubleRemove(ccfg);
+    }
 
-                        cnrt.incrementAndGet();
-                    }
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDoubleRemove(CacheConfiguration ccfg) throws Exception {
+        IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg);
+
+        try {
+            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>();
+
+            final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts =
+                new CopyOnWriteArrayList<>();
+
+            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey,
+                    ? extends QueryTestValue>> events) throws CacheEntryListenerException {
+                    for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events)
+                        evts.add(e);
                 }
             });
 
+            QueryTestKey key = new QueryTestKey(1);
+
             try (QueryCursor qryCur = cache.query(qry)) {
-                cache.put(1, 1);
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    log.info("Start iteration: " + i);
 
-                cache.remove(1);
-                cache.remove(1);
-                cache.remove(1);
-                cache.remove(1);
+                    cache.put(key, new QueryTestValue(1));
 
-                cache.put(1, 1);
+                    cache.remove(key);
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.remove(key);
 
-                cache.remove(1);
-                cache.remove(1);
+                    cache.put(key, new QueryTestValue(2));
 
-                cache.put(1, 1);
+                    cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>)
+                        (Object)new EntrySetValueProcessor(null, false));
+                    cache.remove(key);
 
-                assert GridTestUtils.waitForCondition(new PA() {
-                    @Override public boolean apply() {
-                        return cnrt.get() == 5;
-                    }
-                }, 5_000);
+                    cache.put(key, new QueryTestValue(3));
+                    cache.put(key, new QueryTestValue(4));
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return evts.size() == 6;
+                        }
+                    }, 5_000);
+
+                    checkSingleEvent(evts.get(0), EventType.CREATED, new QueryTestValue(1), null);
+                    checkSingleEvent(evts.get(1), EventType.REMOVED, null, new QueryTestValue(1));
+                    checkSingleEvent(evts.get(2), EventType.CREATED, new QueryTestValue(2), null);
+                    checkSingleEvent(evts.get(3), EventType.REMOVED, null, new QueryTestValue(2));
+                    checkSingleEvent(evts.get(4), EventType.CREATED, new QueryTestValue(3), null);
+                    checkSingleEvent(evts.get(5), EventType.UPDATED, new QueryTestValue(4), new QueryTestValue(3));
+
+                    cache.remove(key);
+                    cache.remove(key);
+
+                    evts.clear();
+
+                    log.info("Finish iteration: " + i);
+                }
             }
         }
         finally {
-            grid(0).destroyCache(ccfg.getName());
+            grid(getClientIndex()).destroyCache(ccfg.getName());
         }
     }
 
     /**
+     * Asserts that a continuous query event has the expected type, new value and old value.
+     *
+     * @param event Event to check.
+     * @param type Expected event type.
+     * @param val Expected new value ({@code null} is expected for removals in this test).
+     * @param oldVal Expected old value ({@code null} is expected for creations in this test).
+     */
+    private void checkSingleEvent(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event,
+        EventType type, QueryTestValue val, QueryTestValue oldVal) {
+        // JUnit convention is assertEquals(expected, actual); keeps failure messages the right way round.
+        assertEquals(type, event.getEventType());
+        assertEquals(val, event.getValue());
+        assertEquals(oldVal, event.getOldValue());
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testTxClient() throws Exception {
@@ -717,7 +825,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                 evtsQueues.add(evtsQueue);
 
-                QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+                QueryCursor<?> cur = grid(getClientIndex()).cache(ccfg.getName()).query(qry);
 
                 curs.add(cur);
             }
@@ -735,12 +843,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
 
                 evtsQueues.add(evtsQueue);
 
-                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+                QueryCursor<?> cur = grid(rnd.nextInt(getClientIndex())).cache(ccfg.getName()).query(qry);
 
                 curs.add(cur);
             }
             else {
-                for (int i = 0; i < NODES - 1; i++) {
+                for (int i = 0; i < getClientIndex(); i++) {
                     ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
                     final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
@@ -769,7 +877,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
                     if (i % 20 == 0)
                         log.info("Iteration: " + i);
 
-                    for (int idx = 0; idx < NODES; idx++)
+                    for (int idx = 0; idx < getNodes(); idx++)
                         randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
                 }
             }
@@ -784,6 +892,20 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
     }
 
     /**
+     * @return Index of the client node: the last of the {@link #getNodes()} nodes, started after all servers.
+     */
+    private int getClientIndex() {
+        return getNodes() - 1;
+    }
+
+    /**
+     * @return Total number of nodes in the test topology (server nodes plus one client node).
+     */
+    protected int getNodes() {
+        return NODES;
+    }
+
+    /**
      * @param rnd Random generator.
      * @param evtsQueues Events queue.
      * @param expData Expected cache data.
@@ -1333,9 +1455,11 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract
      */
     public static class QueryTestValue implements Serializable {
         /** */
+        @GridToStringInclude
         protected final Integer val1;
 
         /** */
+        @GridToStringInclude
         protected final String val2;
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/060f4ce3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
new file mode 100644
index 0000000..2bbbb9f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTwoNodesTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+/**
+ * Runs {@link CacheContinuousQueryRandomOperationsTest} on two nodes instead of the default five.
+ */
+public class CacheContinuousQueryRandomOperationsTwoNodesTest extends CacheContinuousQueryRandomOperationsTest {
+    /** {@inheritDoc} */
+    @Override protected int getNodes() {
+        return 2;
+    }
+}