You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/19 17:18:42 UTC

[33/50] [abbrv] ignite git commit: IGNITE-2468

 IGNITE-2468


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/725d6cb5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/725d6cb5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/725d6cb5

Branch: refs/heads/ignite-961
Commit: 725d6cb557684ac8f31dfde8f5fcb4ddb95a18dd
Parents: 763bf57
Author: Anton Vinogradov <av...@apache.org>
Authored: Fri Feb 12 14:08:25 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Fri Feb 12 14:08:25 2016 +0300

----------------------------------------------------------------------
 .../internal/GridMessageListenHandler.java      |  16 ++
 .../continuous/GridContinuousProcessor.java     |  50 +++--
 ...eClientReconnectContinuousProcessorTest.java |  32 +++-
 ...IgniteCacheContinuousQueryReconnectTest.java | 192 +++++++++++++++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |   2 +-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 6 files changed, 279 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index 13aeb54..bf81944 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -83,6 +83,22 @@ public class GridMessageListenHandler implements GridContinuousHandler {
         this.pred = pred;
     }
 
+    /**
+     * Creates a copy of the given handler; {@code depEnabled} is not copied and is set to {@code false}.
+     * @param orig Handler to be copied.
+     */
+    public GridMessageListenHandler(GridMessageListenHandler orig) {
+        assert orig != null;
+
+        this.clsName = orig.clsName;
+        this.depInfo = orig.depInfo;
+        this.pred = orig.pred;
+        this.predBytes = orig.predBytes;
+        this.topic = orig.topic;
+        this.topicBytes = orig.topicBytes;
+        this.depEnabled = false;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isEvents() {
         return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 0218897..496f820 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridMessageListenHandler;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -428,11 +429,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                         ctx.resource().injectGeneric(item.prjPred);
 
                     // Register handler only if local node passes projection predicate.
-                    if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+                    if ((item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) &&
+                        !locInfos.containsKey(item.routineId)) {
                         if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
                             item.autoUnsubscribe, false))
                             item.hnd.onListenerRegistered(item.routineId, ctx);
                     }
+
+                    if (!item.autoUnsubscribe)
+                        // Register routine locally.
+                        locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(
+                            item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe));
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to register continuous handler.", e);
@@ -854,6 +861,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e);
         }
 
+        GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ?
+            new GridMessageListenHandler((GridMessageListenHandler)hnd) :
+            hnd;
+
         if (node.isClient()) {
             Map<UUID, LocalRoutineInfo> clientRoutineMap = clientInfos.get(node.id());
 
@@ -866,7 +877,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
 
             clientRoutineMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
-                hnd,
+                hnd0,
                 data.bufferSize(),
                 data.interval(),
                 data.autoUnsubscribe()));
@@ -881,10 +892,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 if (prjPred != null)
                     ctx.resource().injectGeneric(prjPred);
 
-                if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) {
-                    registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(),
+                if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) &&
+                    !locInfos.containsKey(routineId)) {
+                    registered = registerHandler(node.id(), routineId, hnd0, data.bufferSize(), data.interval(),
                         data.autoUnsubscribe(), false);
                 }
+
+                if (!data.autoUnsubscribe())
+                    // Register routine locally.
+                    locInfos.putIfAbsent(routineId, new LocalRoutineInfo(
+                        prjPred, hnd0, data.bufferSize(), data.interval(), data.autoUnsubscribe()));
             }
             catch (IgniteCheckedException e) {
                 err = e;
@@ -894,11 +911,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         }
 
         // Load partition counters.
-        if (hnd.isQuery()) {
+        if (hnd0.isQuery()) {
             GridCacheProcessor proc = ctx.cache();
 
             if (proc != null) {
-                GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName());
+                GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName());
 
                 if (cache != null && !cache.isLocal()) {
                     Map<Integer, Long> cntrs = cache.context().topology().updateCounters();
@@ -912,7 +929,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             req.addError(ctx.localNodeId(), err);
 
         if (registered)
-            hnd.onListenerRegistered(routineId, ctx);
+            hnd0.onListenerRegistered(routineId, ctx);
     }
 
     /**
@@ -1095,22 +1112,31 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
      */
     @SuppressWarnings("TooBroadScope")
     private void unregisterRemote(UUID routineId) {
-        RemoteRoutineInfo info;
+        RemoteRoutineInfo remote;
+        LocalRoutineInfo loc;
 
         stopLock.lock();
 
         try {
-            info = rmtInfos.remove(routineId);
+            remote = rmtInfos.remove(routineId);
 
-            if (info == null)
+            loc = locInfos.remove(routineId);
+
+            if (remote == null)
                 stopped.add(routineId);
         }
         finally {
             stopLock.unlock();
         }
 
-        if (info != null)
-            unregisterHandler(routineId, info.hnd, false);
+        if (remote != null)
+            unregisterHandler(routineId, remote.hnd, false);
+        else if (loc != null) {
+            // Removes routine at node started it when stopRoutine called from another node.
+            // Guarded rather than asserted: locInfos only holds non-auto-unsubscribe routines, so
+            // remote and loc may both be null here (an assert fails, or NPEs with assertions off).
+            unregisterHandler(routineId, loc.hnd, false);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
index dc94c96..4c44adc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java
@@ -113,7 +113,21 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
     /**
      * @throws Exception If failed.
      */
-    public void testMessageListenerReconnect() throws Exception {
+    public void testMessageListenerReconnectAndStopFromServer() throws Exception {
+        testMessageListenerReconnect(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMessageListenerReconnectAndStopFromClient() throws Exception {
+        testMessageListenerReconnect(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testMessageListenerReconnect(boolean stopFromClient) throws Exception {
         Ignite client = grid(serverCount());
 
         assertTrue(client.cluster().localNode().isClient());
@@ -166,7 +180,7 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         log.info("Stop listen, should not get remote messages anymore.");
 
-        client.message().stopRemoteListen(opId);
+        (stopFromClient ? client : srv).message().stopRemoteListen(opId);
 
         srv.message().send(topic, "msg3");
 
@@ -175,6 +189,20 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe
 
         assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
         assertFalse(latch.await(3000, MILLISECONDS));
+
+        log.info("New nodes should not register stopped listeners.");
+
+        startGrid(serverCount() + 1);
+
+        srv.message().send(topic, "msg4");
+
+        locLsnr.latch = new CountDownLatch(1);
+        latch = new CountDownLatch(1);
+
+        assertTrue(locLsnr.latch.await(5000, MILLISECONDS));
+        assertFalse(latch.await(3000, MILLISECONDS));
+
+        stopGrid(serverCount() + 1);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
new file mode 100644
index 0000000..b1d8a49
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryReconnectTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Counts remote filter invocations of a continuous query (auto-unsubscribe off) while nodes join and leave.
+ */
+public class IgniteCacheContinuousQueryReconnectTest extends GridCommonAbstractTest implements Serializable {
+    /** Counter incremented by the remote filter on every evaluated event. */
+    private static final AtomicInteger cnt = new AtomicInteger();
+
+    /** When {@code true}, {@link #getConfiguration(String)} produces a client-mode configuration. */
+    private volatile boolean isClient = false;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicMode());
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        if (isClient)
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @return Atomic mode.
+     */
+    protected CacheAtomicityMode atomicMode() {
+        return ATOMIC;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectServer() throws Exception {
+        testReconnect(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectClient() throws Exception {
+        testReconnect(true);
+    }
+
+    /**
+     * Resets the counter, puts key 1 via {@code cache} and asserts the filter was evaluated {@code diff} times.
+     */
+    private void putAndCheck(IgniteCache<Object, Object> cache, int diff) {
+        cnt.set(0);
+
+        cache.put(1, "1");
+
+        assertEquals(diff, cnt.get());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void testReconnect(boolean clientQuery) throws Exception {
+        Ignite srv1 = startGrid(0);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+            @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+                // No-op.
+            }
+        });
+
+        qry.setAutoUnsubscribe(false);
+
+        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Object, Object>() {
+            @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
+                cnt.incrementAndGet();
+
+                return true;
+            }
+        });
+
+        isClient = true; // The next started node is configured as a client.
+
+        Ignite client = startGrid(1);
+
+        isClient = false;
+
+        IgniteCache<Object, Object> cache1 = srv1.cache(null);
+        IgniteCache<Object, Object> clCache = client.cache(null);
+
+        putAndCheck(clCache, 0); // 0 remote listeners.
+
+        QueryCursor<Cache.Entry<Object, Object>> cur = (clientQuery ? clCache : cache1).query(qry);
+
+        putAndCheck(clCache, 1); // 1 remote listener.
+
+        startGrid(2);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        stopGrid(0);
+
+        while (true) {
+            try {
+                clCache.get(1);
+
+                break;
+            }
+            catch (IgniteClientDisconnectedException e) {
+                e.reconnectFuture().get(); // Wait for reconnect.
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof IgniteClientDisconnectedException) // Any other cause: just retry.
+                    ((IgniteClientDisconnectedException)e.getCause()).reconnectFuture().get(); // Wait for reconnect.
+            }
+        }
+
+        putAndCheck(clCache, 1); // 1 remote listener.
+
+        startGrid(3);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        stopGrid(1); // Client node.
+
+        isClient = true; // The next started node is configured as a client.
+
+        client = startGrid(4);
+
+        isClient = false;
+
+        clCache = client.cache(null);
+
+        putAndCheck(clCache, 2); // 2 remote listeners.
+
+        startGrid(5);
+
+        putAndCheck(clCache, 3); // 3 remote listeners.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 030c653..7debb41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -2050,7 +2050,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public boolean apply(UUID uuid, Object msg) {
-            X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+            X.println(">>> Received [node=" + ignite.name() + ", msg=" + msg + ']');
 
             msgLatch.countDown();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/725d6cb5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 3cd4579..cecb8ad 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryReconnectTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryLocalSelfTest;
 import org.apache.ignite.internal.processors.cache.reducefields.GridCacheReduceFieldsQueryPartitionedSelfTest;
@@ -200,6 +201,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryPartitionTxOneNodeTest.class);
         suite.addTestSuite(GridCacheContinuousQueryPartitionAtomicOneNodeTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class);
+        suite.addTestSuite(IgniteCacheContinuousQueryReconnectTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientReconnectTest.class);
         suite.addTestSuite(IgniteCacheContinuousQueryClientTxReconnectTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest.class);