You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:44 UTC

[17/50] [abbrv] ignite git commit: # ignite-901 client reconnect support

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
new file mode 100644
index 0000000..547adcb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
+    /** */
+    public static final String QUERY_CACHE = "query";
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int clientCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
+            .setCacheMode(PARTITIONED)
+            .setAtomicityMode(ATOMIC)
+            .setBackups(1)
+            .setIndexedTypes(Integer.class, Person.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+        qry.setPageSize(1);
+
+        QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(4, new Person(4, "name4", "surname4"));
+
+                try {
+                    clnCache.query(qry);
+
+                    fail();
+                } catch (CacheException e) {
+                    check(e);
+                }
+            }
+        });
+
+        List<Cache.Entry<Integer, Person>> res = cur.getAll();
+
+        assertNotNull(res);
+        assertEquals(4, res.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReconnectQueryInProgress() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        blockMessage(GridQueryNextPageResponse.class);
+
+        final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+        qry.setPageSize(1);
+
+        final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    cur1.getAll();
+                }
+                catch (CacheException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        unblockMessage();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean) fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
+
+        assertEquals(3, cur2.getAll().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnect() throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+        for (int i = 0; i < 10_000; i++)
+            clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
+
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+        reconnectClientNode(cln, srv, new Runnable() {
+            @Override public void run() {
+                srvCache.put(10_001, new Person(10_001, "name", "surname"));
+
+                try {
+                    clnCache.query(scanQry);
+
+                    fail();
+                } catch (CacheException e) {
+                    check(e);
+                }
+            }
+        });
+
+        try {
+            qryCursor.getAll();
+
+            fail();
+        }
+        catch (CacheException e) {
+            checkAndWait(e);
+        }
+
+        qryCursor = clnCache.query(scanQry);
+
+        assertEquals(10_001, qryCursor.getAll().size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress1() throws Exception {
+        scanQueryReconnectInProgress(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testScanQueryReconnectInProgress2() throws Exception {
+        scanQueryReconnectInProgress(true);
+    }
+
+    /**
+     * @param setPart If {@code true} sets partition for scan query.
+     * @throws Exception If failed.
+     */
+    private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
+        Ignite cln = grid(serverCount());
+
+        assertTrue(cln.cluster().localNode().isClient());
+
+        final Ignite srv = clientRouter(cln);
+
+        final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+        clnCache.put(1, new Person(1, "name1", "surname1"));
+        clnCache.put(2, new Person(2, "name2", "surname2"));
+        clnCache.put(3, new Person(3, "name3", "surname3"));
+
+        final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+        scanQry.setPageSize(1);
+
+        scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+            @Override public boolean apply(Integer integer, Person person) {
+                return true;
+            }
+        });
+
+        if (setPart)
+            scanQry.setPartition(1);
+
+        blockMessage(GridCacheQueryResponse.class);
+
+        final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try {
+                    QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+                    qryCursor.getAll();
+                }
+                catch (CacheException e) {
+                    checkAndWait(e);
+
+                    return true;
+                }
+
+                return false;
+            }
+        });
+
+        // Check that client waiting operation.
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                return fut.get(200);
+            }
+        }, IgniteFutureTimeoutCheckedException.class, null);
+
+        assertNotDone(fut);
+
+        unblockMessage();
+
+        reconnectClientNode(cln, srv, null);
+
+        assertTrue((Boolean)fut.get(2, SECONDS));
+
+        QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
+
+        assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
+    }
+
+    /**
+     * @param clazz Message class.
+     */
+    private void blockMessage(Class<?> clazz) {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.blockMessage(clazz);
+        }
+    }
+
+    /**
+     *
+     */
+    private void unblockMessage() {
+        for (int i = 0; i < serverCount(); i++) {
+            BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+            commSpi.unblockMessage();
+        }
+    }
+
+    /**
+     *
+     */
+    public static class Person {
+        /** */
+        @QuerySqlField
+        public int id;
+
+        /** */
+        @QuerySqlField
+        public String name;
+
+        /** */
+        @QuerySqlField
+        public String surname;
+
+        /**
+         * @param id Id.
+         * @param name Name.
+         * @param surname Surname.
+         */
+        public Person(int id, String name, String surname) {
+            this.id = id;
+            this.name = name;
+            this.surname = surname;
+        }
+
+        /**
+         * @return Id.
+         */
+        public int getId() {
+            return id;
+        }
+
+        /**
+         * @param id Set id.
+         */
+        public void setId(int id) {
+            this.id = id;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * @param name Name.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * @return Surname.
+         */
+        public String getSurname() {
+            return surname;
+        }
+
+        /**
+         * @param surname Surname.
+         */
+        public void setSurname(String surname) {
+            this.surname = surname;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(Person.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
index 8ab2485..ca80acf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
@@ -375,6 +375,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
         /** */
         private int cnt;
 
+        /** {@inheritDoc} */
         @Override public boolean collect(List<?> e) {
             sum += (Integer)e.get(0);
 
@@ -383,6 +384,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
             return true;
         }
 
+        /** {@inheritDoc} */
         @Override public IgniteBiTuple<Integer, Integer> reduce() {
             return F.t(sum, cnt);
         }
@@ -398,6 +400,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
         /** */
         private int cnt;
 
+        /** {@inheritDoc} */
         @Override public boolean collect(IgniteBiTuple<Integer, Integer> t) {
             sum += t.get1();
             cnt += t.get2();
@@ -405,6 +408,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
             return true;
         }
 
+        /** {@inheritDoc} */
         @Override public Integer reduce() {
             return cnt == 0 ? 0 : sum / cnt;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 67ebda9..cc01540 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -52,6 +52,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(CacheConfigurationP2PTest.class);
 
         suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+        suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
 
         return suite;
     }