You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/09/07 22:46:44 UTC
[17/50] [abbrv] ignite git commit: # ignite-901 client reconnect
support
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
new file mode 100644
index 0000000..547adcb
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectQueriesTest.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+
+import javax.cache.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectQueriesTest extends IgniteClientReconnectAbstractTest {
+ /** */
+ public static final String QUERY_CACHE = "query";
+
+ /** {@inheritDoc} */
+ @Override protected int serverCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int clientCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ CacheConfiguration<Integer, Person> ccfg = new CacheConfiguration<Integer, Person>(QUERY_CACHE)
+ .setCacheMode(PARTITIONED)
+ .setAtomicityMode(ATOMIC)
+ .setBackups(1)
+ .setIndexedTypes(Integer.class, Person.class);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ grid(0).getOrCreateCache(QUERY_CACHE).removeAll();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testQueryReconnect() throws Exception {
+ Ignite cln = grid(serverCount());
+
+ assertTrue(cln.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(cln);
+
+ final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+ final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+ clnCache.put(1, new Person(1, "name1", "surname1"));
+ clnCache.put(2, new Person(2, "name2", "surname2"));
+ clnCache.put(3, new Person(3, "name3", "surname3"));
+
+ final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+ qry.setPageSize(1);
+
+ QueryCursor<Cache.Entry<Integer, Person>> cur = clnCache.query(qry);
+
+ reconnectClientNode(cln, srv, new Runnable() {
+ @Override public void run() {
+ srvCache.put(4, new Person(4, "name4", "surname4"));
+
+ try {
+ clnCache.query(qry);
+
+ fail();
+ } catch (CacheException e) {
+ check(e);
+ }
+ }
+ });
+
+ List<Cache.Entry<Integer, Person>> res = cur.getAll();
+
+ assertNotNull(res);
+ assertEquals(4, res.size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReconnectQueryInProgress() throws Exception {
+ Ignite cln = grid(serverCount());
+
+ assertTrue(cln.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(cln);
+
+ final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+ clnCache.put(1, new Person(1, "name1", "surname1"));
+ clnCache.put(2, new Person(2, "name2", "surname2"));
+ clnCache.put(3, new Person(3, "name3", "surname3"));
+
+ blockMessage(GridQueryNextPageResponse.class);
+
+ final SqlQuery<Integer, Person> qry = new SqlQuery<>(Person.class, "_key <> 0");
+
+ qry.setPageSize(1);
+
+ final QueryCursor<Cache.Entry<Integer, Person>> cur1 = clnCache.query(qry);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ cur1.getAll();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+
+ return true;
+ }
+
+ return false;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ unblockMessage();
+
+ reconnectClientNode(cln, srv, null);
+
+ assertTrue((Boolean) fut.get(2, SECONDS));
+
+ QueryCursor<Cache.Entry<Integer, Person>> cur2 = clnCache.query(qry);
+
+ assertEquals(3, cur2.getAll().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testScanQueryReconnect() throws Exception {
+ Ignite cln = grid(serverCount());
+
+ assertTrue(cln.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(cln);
+
+ final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+ final IgniteCache<Integer, Person> srvCache = srv.getOrCreateCache(QUERY_CACHE);
+
+ for (int i = 0; i < 10_000; i++)
+ clnCache.put(i, new Person(i, "name-" + i, "surname-" + i));
+
+ final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+ scanQry.setPageSize(1);
+
+ scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+ @Override public boolean apply(Integer integer, Person person) {
+ return true;
+ }
+ });
+
+ QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+ reconnectClientNode(cln, srv, new Runnable() {
+ @Override public void run() {
+ srvCache.put(10_001, new Person(10_001, "name", "surname"));
+
+ try {
+ clnCache.query(scanQry);
+
+ fail();
+ } catch (CacheException e) {
+ check(e);
+ }
+ }
+ });
+
+ try {
+ qryCursor.getAll();
+
+ fail();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+ }
+
+ qryCursor = clnCache.query(scanQry);
+
+ assertEquals(10_001, qryCursor.getAll().size());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testScanQueryReconnectInProgress1() throws Exception {
+ scanQueryReconnectInProgress(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testScanQueryReconnectInProgress2() throws Exception {
+ scanQueryReconnectInProgress(true);
+ }
+
+ /**
+ * @param setPart If {@code true} sets partition for scan query.
+ * @throws Exception If failed.
+ */
+ private void scanQueryReconnectInProgress(boolean setPart) throws Exception {
+ Ignite cln = grid(serverCount());
+
+ assertTrue(cln.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(cln);
+
+ final IgniteCache<Integer, Person> clnCache = cln.getOrCreateCache(QUERY_CACHE);
+
+ clnCache.put(1, new Person(1, "name1", "surname1"));
+ clnCache.put(2, new Person(2, "name2", "surname2"));
+ clnCache.put(3, new Person(3, "name3", "surname3"));
+
+ final ScanQuery<Integer, Person> scanQry = new ScanQuery<>();
+
+ scanQry.setPageSize(1);
+
+ scanQry.setFilter(new IgniteBiPredicate<Integer, Person>() {
+ @Override public boolean apply(Integer integer, Person person) {
+ return true;
+ }
+ });
+
+ if (setPart)
+ scanQry.setPartition(1);
+
+ blockMessage(GridCacheQueryResponse.class);
+
+ final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ QueryCursor<Cache.Entry<Integer, Person>> qryCursor = clnCache.query(scanQry);
+
+ qryCursor.getAll();
+ }
+ catch (CacheException e) {
+ checkAndWait(e);
+
+ return true;
+ }
+
+ return false;
+ }
+ });
+
+ // Check that client waiting operation.
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return fut.get(200);
+ }
+ }, IgniteFutureTimeoutCheckedException.class, null);
+
+ assertNotDone(fut);
+
+ unblockMessage();
+
+ reconnectClientNode(cln, srv, null);
+
+ assertTrue((Boolean)fut.get(2, SECONDS));
+
+ QueryCursor<Cache.Entry<Integer, Person>> qryCursor2 = clnCache.query(scanQry);
+
+ assertEquals(setPart ? 1 : 3, qryCursor2.getAll().size());
+ }
+
+ /**
+ * @param clazz Message class.
+ */
+ private void blockMessage(Class<?> clazz) {
+ for (int i = 0; i < serverCount(); i++) {
+ BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+ commSpi.blockMessage(clazz);
+ }
+ }
+
+ /**
+ *
+ */
+ private void unblockMessage() {
+ for (int i = 0; i < serverCount(); i++) {
+ BlockTpcCommunicationSpi commSpi = commSpi(grid(i));
+
+ commSpi.unblockMessage();
+ }
+ }
+
+ /**
+ *
+ */
+ public static class Person {
+ /** */
+ @QuerySqlField
+ public int id;
+
+ /** */
+ @QuerySqlField
+ public String name;
+
+ /** */
+ @QuerySqlField
+ public String surname;
+
+ /**
+ * @param id Id.
+ * @param name Name.
+ * @param surname Surname.
+ */
+ public Person(int id, String name, String surname) {
+ this.id = id;
+ this.name = name;
+ this.surname = surname;
+ }
+
+ /**
+ * @return Id.
+ */
+ public int getId() {
+ return id;
+ }
+
+ /**
+ * @param id Set id.
+ */
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ /**
+ * @return Name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @param name Name.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return Surname.
+ */
+ public String getSurname() {
+ return surname;
+ }
+
+ /**
+ * @param surname Surname.
+ */
+ public void setSurname(String surname) {
+ this.surname = surname;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return this == o || !(o == null || getClass() != o.getClass()) && id == ((Person)o).id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
index 8ab2485..ca80acf 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/reducefields/GridCacheAbstractReduceFieldsQuerySelfTest.java
@@ -375,6 +375,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
/** */
private int cnt;
+ /** {@inheritDoc} */
@Override public boolean collect(List<?> e) {
sum += (Integer)e.get(0);
@@ -383,6 +384,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
return true;
}
+ /** {@inheritDoc} */
@Override public IgniteBiTuple<Integer, Integer> reduce() {
return F.t(sum, cnt);
}
@@ -398,6 +400,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
/** */
private int cnt;
+ /** {@inheritDoc} */
@Override public boolean collect(IgniteBiTuple<Integer, Integer> t) {
sum += t.get1();
cnt += t.get2();
@@ -405,6 +408,7 @@ public abstract class GridCacheAbstractReduceFieldsQuerySelfTest extends GridCom
return true;
}
+ /** {@inheritDoc} */
@Override public Integer reduce() {
return cnt == 0 ? 0 : sum / cnt;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/57ac2b3b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 67ebda9..cc01540 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -52,6 +52,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(CacheConfigurationP2PTest.class);
suite.addTestSuite(IgniteCacheConfigurationPrimitiveTypesSelfTest.class);
+ suite.addTestSuite(IgniteClientReconnectQueriesTest.class);
return suite;
}