You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:48:33 UTC

[48/50] incubator-ignite git commit: ignite-484-1 - test

ignite-484-1 - test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e197640b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e197640b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e197640b

Branch: refs/heads/ignite-484-1
Commit: e197640bbc9dcef4cf872552566fce6d6dbbd5f3
Parents: ae3279a
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Jun 9 09:26:55 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Jun 9 09:26:55 2015 +0300

----------------------------------------------------------------------
 .../IgniteCacheQueryNodeRestartSelfTest2.java   | 331 +++++++++++++++++++
 1 file changed, 331 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e197640b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
new file mode 100644
index 0000000..1f0a6e6
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.affinity.rendezvous.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for distributed queries with node restarts.
+ */
+public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest {
+    /**
+     * Query spanning all four caches: joins persons, purchases, products and companies and
+     * returns the number of joined rows per company, ordered by that count (descending) and
+     * then by company ID.
+     */
+    private static final String PARTITIONED_QRY = "select co.id, count(*) cnt\n" +
+        "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
+        "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" +
+        "group by co.id order by cnt desc, co.id";
+
+    /**
+     * Query over caches "pr" and "co" only: returns (product ID, company ID) pairs ordered by
+     * company ID and then by product ID.
+     */
+    private static final String REPLICATED_QRY = "select pr.id, co.id\n" +
+        "from \"pr\".Product pr, \"co\".Company co\n" +
+        "where pr.companyId = co.id\n" +
+        "order by co.id, pr.id ";
+
+    /** Number of grid nodes started by the test. */
+    private static final int GRID_CNT = 6;
+
+    /** Number of {@link Person} entries put into cache "pe". */
+    private static final int PERS_CNT = 600;
+
+    /** Number of {@link Purchase} entries put into cache "pu". */
+    private static final int PURCHASE_CNT = 6000;
+
+    /** Number of {@link Company} entries put into cache "co". */
+    private static final int COMPANY_CNT = 25;
+
+    /** Number of {@link Product} entries put into cache "pr". */
+    private static final int PRODUCT_CNT = 100;
+
+    /** IP finder handed to the discovery SPI of every node configured by this test. */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(ipFinder);
+
+        c.setDiscoverySpi(disco);
+
+        int i = 0;
+
+        CacheConfiguration<?, ?>[] ccs = new CacheConfiguration[4];
+
+        for (String name : F.asList("pe", "pu")) {
+            CacheConfiguration<?, ?> cc = defaultCacheConfiguration();
+
+            cc.setName(name);
+            cc.setCacheMode(PARTITIONED);
+            cc.setBackups(2);
+            cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setRebalanceMode(CacheRebalanceMode.SYNC);
+            cc.setAffinity(new RendezvousAffinityFunction(false, name.equals("pe") ? 50 : 60));
+
+            if (name.equals("pe")) {
+                cc.setIndexedTypes(
+                    Integer.class, Person.class
+                );
+            }
+            else if (name.equals("pu")) {
+                cc.setIndexedTypes(
+                    AffinityKey.class, Purchase.class
+                );
+            }
+
+            ccs[i++] = cc;
+        }
+
+        for (String name : F.asList("co", "pr")) {
+            CacheConfiguration<?, ?> cc = defaultCacheConfiguration();
+
+            cc.setName(name);
+            cc.setCacheMode(REPLICATED);
+            cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setRebalanceMode(CacheRebalanceMode.SYNC);
+            cc.setAffinity(new RendezvousAffinityFunction(false, 50));
+
+            if (name.equals("co")) {
+                cc.setIndexedTypes(
+                    Integer.class, Company.class
+                );
+            }
+            else if (name.equals("pr")) {
+                cc.setIndexedTypes(
+                    Integer.class, Product.class
+                );
+            }
+
+            ccs[i++] = cc;
+        }
+
+        c.setCacheConfiguration(ccs);
+
+        return c;
+    }
+
+    /**
+     * Populates the four caches through node 0, in this order: companies ("co"), products
+     * ("pr"), persons ("pe") and purchases ("pu"). Each product is linked to a random company;
+     * each purchase is linked to a random person and a random product and is stored under an
+     * {@link AffinityKey} whose affinity key is the person ID.
+     */
+    private void fillCaches() {
+        Ignite node = grid(0);
+
+        Random rnd = new GridRandom();
+
+        IgniteCache<Integer, Company> companies = node.cache("co");
+
+        for (int compId = 0; compId < COMPANY_CNT; compId++)
+            companies.put(compId, new Company(compId));
+
+        IgniteCache<Integer, Product> products = node.cache("pr");
+
+        for (int prodId = 0; prodId < PRODUCT_CNT; prodId++) {
+            int compId = rnd.nextInt(COMPANY_CNT);
+
+            products.put(prodId, new Product(prodId, compId));
+        }
+
+        IgniteCache<Integer, Person> persons = node.cache("pe");
+
+        for (int persId = 0; persId < PERS_CNT; persId++)
+            persons.put(persId, new Person(persId));
+
+        IgniteCache<AffinityKey<Integer>, Purchase> purchases = node.cache("pu");
+
+        for (int purchaseId = 0; purchaseId < PURCHASE_CNT; purchaseId++) {
+            // Person is drawn before product, so the random sequence is consumed in a fixed order.
+            int persId = rnd.nextInt(PERS_CNT);
+            int prodId = rnd.nextInt(PRODUCT_CNT);
+
+            AffinityKey<Integer> key = new AffinityKey<>(purchaseId, persId);
+
+            purchases.put(key, new Purchase(persId, prodId));
+        }
+    }
+
+    /**
+     * JUnit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRestarts() throws Exception {
+        int duration = 150 * 1000;
+        int qryThreadNum = 4;
+        int restartThreadsNum = 1; // 4 + 2 = 6 nodes
+        final int nodeLifeTime = 2 * 1000;
+        final int logFreq = 50;
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        final AtomicIntegerArray locks = new AtomicIntegerArray(GRID_CNT);
+
+        fillCaches();
+
+        final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll();
+
+        Thread.sleep(3000);
+
+        assertEquals(pRes, grid(0).cache("pu").query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll());
+
+        final List<List<?>> rRes = grid(0).cache("co").query(new SqlFieldsQuery(REPLICATED_QRY)).getAll();
+
+        final AtomicInteger qryCnt = new AtomicInteger();
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
+            @Override public void applyx() throws IgniteCheckedException {
+                GridRandom rnd = new GridRandom();
+
+                while (!done.get()) {
+                    int g;
+
+                    do {
+                        g = rnd.nextInt(locks.length());
+                    }
+                    while (!locks.compareAndSet(g, 0, 1));
+
+//                    if (rnd.nextBoolean()) { // Partitioned query.
+                        IgniteCache<?,?> cache = grid(g).cache("pu");
+
+                        assertEquals(pRes, cache.query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll());
+//                    }
+//                    else { // Replicated query.
+//                        IgniteCache<?,?> cache = grid(g).cache("co");
+//
+//                        assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll());
+//                    }
+
+                    locks.set(g, 0);
+
+                    int c = qryCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Executed queries: " + c);
+                }
+            }
+        }, qryThreadNum);
+
+        final AtomicInteger restartCnt = new AtomicInteger();
+
+        IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+            @SuppressWarnings({"BusyWait"})
+            @Override public Object call() throws Exception {
+                GridRandom rnd = new GridRandom();
+
+                while (!done.get()) {
+                    int g;
+
+                    do {
+                        g = rnd.nextInt(locks.length());
+                    }
+                    while (!locks.compareAndSet(g, 0, -1));
+
+                    stopGrid(g);
+
+                    Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                    startGrid(g);
+
+                    Thread.sleep(rnd.nextInt(nodeLifeTime));
+
+                    locks.set(g, 0);
+
+                    int c = restartCnt.incrementAndGet();
+
+                    if (c % logFreq == 0)
+                        info("Node restarts: " + c);
+                }
+
+                return true;
+            }
+        }, restartThreadsNum);
+
+        Thread.sleep(duration);
+
+        info("Stopping..");
+
+        done.set(true);
+
+        fut2.get();
+
+        info("Restarts stopped.");
+
+        fut1.get();
+
+        info("Queries stopped.");
+    }
+
+    /** Value type stored in the partitioned cache "pe". */
+    private static class Person implements Serializable {
+        /** Person ID, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param personId Person ID.
+         */
+        Person(int personId) {
+            id = personId;
+        }
+    }
+
+    /** Value type stored in the partitioned cache "pu"; links a person to a product. */
+    private static class Purchase implements Serializable {
+        /** ID of the purchasing person, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int personId;
+
+        /** ID of the purchased product, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int productId;
+
+        /**
+         * @param persId Person ID.
+         * @param prodId Product ID.
+         */
+        Purchase(int persId, int prodId) {
+            personId = persId;
+            productId = prodId;
+        }
+    }
+
+    /** Value type stored in the replicated cache "co". */
+    private static class Company implements Serializable {
+        /** Company ID, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /**
+         * @param companyId Company ID.
+         */
+        Company(int companyId) {
+            id = companyId;
+        }
+    }
+
+    /** Value type stored in the replicated cache "pr"; links a product to its company. */
+    private static class Product implements Serializable {
+        /** Product ID, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int id;
+
+        /** ID of the owning company, exposed to SQL as an indexed field. */
+        @QuerySqlField(index = true)
+        int companyId;
+
+        /**
+         * @param prodId Product ID.
+         * @param compId Company ID.
+         */
+        Product(int prodId, int compId) {
+            id = prodId;
+            companyId = compId;
+        }
+    }
+}