You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/19 09:55:14 UTC
[37/50] incubator-ignite git commit: ignite-484-1 - replicated client
cache test added
ignite-484-1 - replicated client cache test added
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/10febf28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/10febf28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/10febf28
Branch: refs/heads/ignite-yardstick-client
Commit: 10febf28fdf3966ffcb369c5725792b604be6c18
Parents: 1fe215e
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 17 17:11:00 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 17 17:11:00 2015 +0300
----------------------------------------------------------------------
...lientQueryReplicatedNodeRestartSelfTest.java | 381 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 1 +
2 files changed, 382 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10febf28/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
new file mode 100644
index 0000000..23f44c0
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheClientQueryReplicatedNodeRestartSelfTest.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Test for distributed queries with replicated client cache and node restarts: a client-only node runs a cross-cache SQL join while data nodes are stopped and started.
+ */
+public class IgniteCacheClientQueryReplicatedNodeRestartSelfTest extends GridCommonAbstractTest {
+ /** SQL joining Person, Product, Company and Purchase across the four caches; counts joined rows per company id, largest count first. */
+ private static final String QRY = "select co.id, count(*) cnt\n" +
+ "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+ "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" +
+ "group by co.id order by cnt desc, co.id";
+
+ /** Node filter set on every cache below: accepts a node unless its grid name ends with {@code GRID_CNT - 1}. */
+ private static final P1<ClusterNode> DATA_NODES_FILTER = new P1<ClusterNode>() {
+ @Override public boolean apply(ClusterNode clusterNode) {
+ String gridName = clusterNode.attribute(IgniteNodeAttributes.ATTR_GRID_NAME); // NOTE(review): dereferenced below without a null check - confirm the attribute is always set.
+
+ return !gridName.endsWith(String.valueOf(GRID_CNT - 1)); // The last one is client only.
+ }
+ };
+
+ /** Number of grids started by the test (the last one is filtered out as a data node). */
+ private static final int GRID_CNT = 5;
+
+ /** Number of Person entries put into the {@code pe} cache. */
+ private static final int PERS_CNT = 600;
+
+ /** Number of Purchase entries put into the {@code pu} cache. */
+ private static final int PURCHASE_CNT = 6000;
+
+ /** Number of Company entries put into the {@code co} cache. */
+ private static final int COMPANY_CNT = 25;
+
+ /** Number of Product entries put into the {@code pr} cache. */
+ private static final int PRODUCT_CNT = 100;
+
+ /** IP finder instance handed to the discovery SPI of every grid configured by this test. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ X.println("grid name: " + gridName);
+
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ c.setDiscoverySpi(disco);
+
+ int i = 0; // Index of the next free slot in ccs.
+
+ CacheConfiguration<?, ?>[] ccs = new CacheConfiguration[4]; // One configuration per cache name listed below.
+
+ for (String name : F.asList("co", "pr", "pe", "pu")) {
+ CacheConfiguration<?, ?> cc = defaultCacheConfiguration();
+
+ cc.setNodeFilter(DATA_NODES_FILTER);
+ cc.setName(name);
+ cc.setCacheMode(REPLICATED);
+ cc.setWriteSynchronizationMode(FULL_SYNC);
+ cc.setAtomicityMode(TRANSACTIONAL);
+ cc.setRebalanceMode(SYNC);
+ cc.setAffinity(new RendezvousAffinityFunction(false, 50));
+
+ switch (name) { // Pick the key and value types to index for this cache.
+ case "co":
+ cc.setIndexedTypes(
+ Integer.class, Company.class
+ );
+
+ break;
+
+ case "pr":
+ cc.setIndexedTypes(
+ Integer.class, Product.class
+ );
+
+ break;
+
+ case "pe":
+ cc.setIndexedTypes(
+ Integer.class, Person.class
+ );
+
+ break;
+
+ case "pu":
+ cc.setIndexedTypes(
+ AffinityKey.class, Purchase.class
+ );
+
+ break;
+ }
+
+ ccs[i++] = cc;
+ }
+
+ c.setCacheConfiguration(ccs);
+
+ return c;
+ }
+
+ /**
+ * Populates the four caches through grid 0: sequential ids for companies, products and persons, with random links from products to companies and from purchases to persons and products.
+ */
+ private void fillCaches() {
+ IgniteCache<Integer, Company> co = grid(0).cache("co");
+
+ for (int i = 0; i < COMPANY_CNT; i++)
+ co.put(i, new Company(i));
+
+ IgniteCache<Integer, Product> pr = grid(0).cache("pr");
+
+ Random rnd = new GridRandom();
+
+ for (int i = 0; i < PRODUCT_CNT; i++)
+ pr.put(i, new Product(i, rnd.nextInt(COMPANY_CNT)));
+
+ IgniteCache<Integer, Person> pe = grid(0).cache("pe");
+
+ for (int i = 0; i < PERS_CNT; i++)
+ pe.put(i, new Person(i));
+
+ IgniteCache<AffinityKey<Integer>, Purchase> pu = grid(0).cache("pu");
+
+ for (int i = 0; i < PURCHASE_CNT; i++) {
+ int persId = rnd.nextInt(PERS_CNT);
+ int prodId = rnd.nextInt(PRODUCT_CNT);
+
+ pu.put(new AffinityKey<>(i, persId), new Purchase(persId, prodId)); // presumably (key, affinity key), tying purchase i to its person - confirm against AffinityKey.
+ }
+ }
+
+ /**
+ * @param c Cache.
+ * @param client If it must be a client cache.
+ */
+ private void assertClient(IgniteCache<?,?> c, boolean client) {
+ assertTrue(((IgniteCacheProxy)c).context().affinityNode() == !client); // affinityNode() is expected to be false exactly when client is true.
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRestarts() throws Exception {
+ int duration = 90 * 1000; // Passed to Thread.sleep() below, so milliseconds.
+ int qryThreadNum = 5; // Number of concurrent query threads.
+ int restartThreadsNum = 2; // 2 of 4 data nodes
+ final int nodeLifeTime = 2 * 1000; // Bound for the random sleeps in the restart loop.
+ final int logFreq = 10; // Progress is logged every logFreq queries and every logFreq restarts.
+
+ startGridsMultiThreaded(GRID_CNT);
+
+ final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT - 1); // The last is client only. Slot value 0 = free, -1 = claimed by a restart thread.
+
+ fillCaches();
+
+ final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(QRY)).getAll(); // Reference result taken before any restarts.
+
+ Thread.sleep(3000);
+
+ assertEquals(pRes, grid(0).cache("pu").query(new SqlFieldsQuery(QRY)).getAll()); // The same query must give the same result again.
+
+ assertFalse(pRes.isEmpty());
+
+ final AtomicInteger qryCnt = new AtomicInteger();
+ final AtomicBoolean qrysDone = new AtomicBoolean();
+
+ for (int i = 0; i < GRID_CNT - 1; i++) {
+ for (String cacheName : F.asList("co", "pr", "pe", "pu"))
+ assertClient(grid(i).cache(cacheName), false);
+ }
+
+ for (String cacheName : F.asList("co", "pr", "pe", "pu"))
+ assertClient(grid(GRID_CNT - 1).cache(cacheName), true);
+
+ final IgniteCache<?,?> clientCache = grid(GRID_CNT - 1).cache("pu"); // All concurrent queries go through the last grid.
+
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
+ @Override public void applyx() throws IgniteCheckedException {
+ GridRandom rnd = new GridRandom();
+
+ while (!qrysDone.get()) {
+ SqlFieldsQuery qry = new SqlFieldsQuery(QRY);
+
+ boolean smallPageSize = rnd.nextBoolean();
+
+ if (smallPageSize)
+ qry.setPageSize(3);
+
+ try {
+ assertEquals(pRes, clientCache.query(qry).getAll());
+ }
+ catch (CacheException e) {
+ assertTrue("On large page size must retry.", smallPageSize); // A CacheException is tolerated only when the small page size was set.
+
+ boolean failedOnRemoteFetch = false;
+
+ for (Throwable th = e; th != null; th = th.getCause()) { // Walk the cause chain looking for the remote fetch failure message.
+ if (!(th instanceof CacheException))
+ continue;
+
+ if (th.getMessage() != null &&
+ th.getMessage().startsWith("Failed to fetch data from node:")) {
+ failedOnRemoteFetch = true;
+
+ break;
+ }
+ }
+
+ if (!failedOnRemoteFetch) {
+ e.printStackTrace();
+
+ fail("Must fail inside of GridResultPage.fetchNextPage or subclass.");
+ }
+ }
+
+ int c = qryCnt.incrementAndGet(); // Counted whether the query succeeded or failed in the tolerated way.
+
+ if (c % logFreq == 0)
+ info("Executed queries: " + c);
+ }
+ }
+ }, qryThreadNum);
+
+ final AtomicInteger restartCnt = new AtomicInteger();
+
+ final AtomicBoolean restartsDone = new AtomicBoolean();
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+ @SuppressWarnings({"BusyWait"})
+ @Override public Object call() throws Exception {
+ GridRandom rnd = new GridRandom();
+
+ while (!restartsDone.get()) {
+ int g;
+
+ do { // Spin until a slot not claimed by another restart thread is taken.
+ g = rnd.nextInt(locks.length());
+ }
+ while (!locks.compareAndSet(g, 0, -1));
+
+ stopGrid(g);
+
+ Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+ startGrid(g);
+
+ Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+ locks.set(g, 0); // Release the slot. NOTE(review): skipped if anything above throws.
+
+ int c = restartCnt.incrementAndGet();
+
+ if (c % logFreq == 0)
+ info("Node restarts: " + c);
+ }
+
+ return true;
+ }
+ }, restartThreadsNum);
+
+ Thread.sleep(duration); // Main thread waits while queries and restarts run concurrently.
+
+ info("Stopping..");
+
+ restartsDone.set(true); // NOTE(review): not in a finally - if the sleep above throws, neither flag is set and the workers keep running.
+
+ fut2.get();
+
+ info("Restarts stopped.");
+
+ qrysDone.set(true); // Queries are stopped only after the restart threads have finished.
+
+ fut1.get();
+
+ info("Queries stopped."); // NOTE(review): no grid shutdown is visible in this class - confirm the base class handles it.
+ }
+
+ /**
+ * Value type indexed in the {@code pe} cache.
+ */
+ private static class Person implements Serializable {
+ @QuerySqlField(index = true)
+ int id;
+
+ Person(int id) {
+ this.id = id;
+ }
+ }
+
+ /**
+ * Value type indexed in the {@code pu} cache; holds the ids of one person and one product.
+ */
+ private static class Purchase implements Serializable {
+ @QuerySqlField(index = true)
+ int personId;
+
+ @QuerySqlField(index = true)
+ int productId;
+
+ Purchase(int personId, int productId) {
+ this.personId = personId;
+ this.productId = productId;
+ }
+ }
+
+ /**
+ * Value type indexed in the {@code co} cache.
+ */
+ private static class Company implements Serializable {
+ @QuerySqlField(index = true)
+ int id;
+
+ Company(int id) {
+ this.id = id;
+ }
+ }
+
+ /**
+ * Value type indexed in the {@code pr} cache; holds its own id and the id of one company.
+ */
+ private static class Product implements Serializable {
+ @QuerySqlField(index = true)
+ int id;
+
+ @QuerySqlField(index = true)
+ int companyId;
+
+ Product(int id, int companyId) {
+ this.id = id;
+ this.companyId = companyId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/10febf28/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index c5a2f15..dee3078 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -67,6 +67,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class);
+ suite.addTestSuite(IgniteCacheClientQueryReplicatedNodeRestartSelfTest.class);
suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class);
suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);