You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:17 UTC

[34/50] [abbrv] ignite git commit: Added failover tests for async operations.

Added failover tests for async operations.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73ab5e2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73ab5e2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73ab5e2f

Branch: refs/heads/ignite-843
Commit: 73ab5e2f7bc121eaf496096f205547c026c91464
Parents: b132006
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 15:07:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 15:07:49 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +
 ...acheAsyncOperationsFailoverAbstractTest.java | 329 +++++++++++++++++++
 .../CacheAsyncOperationsFailoverAtomicTest.java |  32 ++
 .../CacheAsyncOperationsFailoverTxTest.java     |  32 ++
 .../IgniteCacheFailoverTestSuite2.java          |   4 +
 6 files changed, 405 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 54d33e0..c3bbbe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4135,6 +4135,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
                     new IgniteOutClosure<IgniteInternalFuture>() {
                         @Override public IgniteInternalFuture<T> apply() {
+                            if (ctx.kernalContext().isStopping())
+                                return new GridFinishedFuture<>(
+                                    new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
                             return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
                                 @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
                                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9087d20..4b8585e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -570,6 +570,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
                     new IgniteOutClosure<IgniteInternalFuture>() {
                         @Override public IgniteInternalFuture<T> apply() {
+                            if (ctx.kernalContext().isStopping())
+                                return new GridFinishedFuture<>(
+                                    new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
                             return op.apply();
                         }
                     });

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
new file mode 100644
index 0000000..1669404
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Failover tests for async cache {@code putAll}: operations are issued while an extra node is started and stopped.
+ */
+public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCacheAbstractSelfTest {
+    /** Value returned by {@link #gridCount()}; also used as the index of the extra node that is started/stopped. */
+    private static final int NODE_CNT = 4;
+
+    /** Duration of the load loop in {@code putAllAsyncFailover}, in milliseconds. */
+    private static final long TEST_TIME = 60_000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODE_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIME + 60_000; // Load duration plus a 60-second margin.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setCacheStoreFactory(null); // No cache store; read-through and write-through are switched off too.
+        ccfg.setReadThrough(false);
+        ccfg.setWriteThrough(false);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null; // NOTE(review): presumably means "no near cache" - confirm against GridCacheAbstractSelfTest.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllAsyncFailover() throws Exception {
+        putAllAsyncFailover(5, 10); // 5 update threads, 10 async operations in flight per thread.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllAsyncFailoverManyThreads() throws Exception {
+        putAllAsyncFailover(ignite(0).configuration().getSystemThreadPoolSize() * 2, 3); // 2x system pool size.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncFailover() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            log.info("Iteration: " + i);
+
+            startGrid(NODE_CNT); // Extra node (index NODE_CNT); stopped again below.
+
+            final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+            int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+
+            log.info("Max concurrent async operations: " + ops);
+
+            assertTrue(ops > 0);
+
+            final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
+
+            final AtomicInteger left = new AtomicInteger(ops); // Budget shared by all threads: 'ops' operations.
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    List<IgniteFuture<?>> futs0 = new ArrayList<>();
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (left.getAndDecrement() > 0) {
+                        TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+                        int keys = 50; // Number of puts; random keys may repeat, so the map can end up smaller.
+
+                        for (int k = 0; k < keys; k++)
+                            map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
+
+                        cache.putAll(map); // Async-mode cache: the operation's future is fetched via future() below.
+
+                        IgniteFuture<?> fut = cache.future();
+
+                        assertNotNull(fut);
+
+                        futs0.add(fut);
+                    }
+
+                    futs.addAll(futs0); // Publish this thread's futures to the shared list in one call.
+
+                    return null;
+                }
+            }, 10, "put-thread");
+
+            stopGrid(NODE_CNT); // Stop the extra node; the collected futures are awaited only afterwards.
+
+            assertEquals(ops, futs.size());
+
+            for (IgniteFuture<?> fut : futs)
+                fut.get();
+        }
+    }
+
+    /**
+     * @param threads Number of threads issuing async {@code putAll} operations concurrently.
+     * @param opsPerThread Number of concurrent async operations per thread.
+     * @throws Exception If failed.
+     */
+    private void putAllAsyncFailover(final int threads, final int opsPerThread) throws Exception {
+        log.info("Start test [threads=" + threads + ", opsPerThread=" + opsPerThread + ']');
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final long endTime = System.currentTimeMillis() + TEST_TIME;
+
+        IgniteInternalFuture<Object> restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("restart-thread");
+
+                while (!finished.get() && System.currentTimeMillis() < endTime) {
+                    startGrid(NODE_CNT); // Start the extra node, sleep, stop it; repeat until finished or out of time.
+
+                    U.sleep(500);
+
+                    stopGrid(NODE_CNT);
+                }
+
+                return null;
+            }
+        });
+
+        try {
+            final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int iter = 0;
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    long time;
+
+                    long lastInfo = 0;
+
+                    while ((time = System.currentTimeMillis()) < endTime) {
+                        if (time - lastInfo > 5000) // Progress logging is throttled; lastInfo is advanced further below.
+                            log.info("Starting operations [iter=" + iter + ']');
+
+                        List<IgniteFuture<?>> futs = new ArrayList<>(opsPerThread);
+
+                        for (int i = 0; i < opsPerThread; i++) {
+                            TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+                            int keys = rnd.nextInt(1, 50); // 1 to 49 puts; random keys may repeat.
+
+                            for (int k = 0; k < keys; k++)
+                                map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(iter));
+
+                            cache.putAll(map); // Async-mode cache: the operation's future is fetched via future() below.
+
+                            IgniteFuture<?> fut = cache.future();
+
+                            assertNotNull(fut);
+
+                            futs.add(fut);
+                        }
+
+                        if (time - lastInfo > 5000) {
+                            log.info("Waiting for futures [iter=" + iter + ']');
+
+                            lastInfo = time;
+                        }
+
+                        for (IgniteFuture<?> fut : futs) // Wait for the whole batch before issuing the next one.
+                            fut.get();
+
+                        iter++;
+                    }
+
+                    return null;
+                }
+            }, threads, "update-thread");
+
+            finished.set(true);
+
+            restartFut.get();
+        }
+        finally {
+            finished.set(true); // Also ends the restart loop when the update threads failed with an exception.
+        }
+    }
+
+    /**
+     * Cache key wrapping a {@code long}; equality, hash code and ordering are all based on that value.
+     */
+    private static class TestKey implements Serializable, Comparable<TestKey> {
+        /** Wrapped key value. */
+        private long key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(long key) {
+            this.key = key;
+        }
+
+        /**
+         * @return Key.
+         */
+        public long key() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull TestKey other) {
+            return ((Long)key).compareTo(other.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            return key == other.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (int)(key ^ (key >>> 32)); // Folds the upper 32 bits of the long into the lower 32 bits.
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+    }
+
+    /**
+     * Cache value wrapping a {@code long}; equality is based on that value.
+     */
+    private static class TestValue implements Serializable {
+        /** Wrapped value. */
+        private long val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(long val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public long value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) { // NOTE(review): no matching hashCode() override in this class.
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestValue other = (TestValue)o;
+
+            return val == other.val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
new file mode 100644
index 0000000..6e01a4a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Runs the tests inherited from {@link CacheAsyncOperationsFailoverAbstractTest} with {@code ATOMIC} atomicity mode.
+ */
+public class CacheAsyncOperationsFailoverAtomicTest extends CacheAsyncOperationsFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
new file mode 100644
index 0000000..ba3ad7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Runs the tests inherited from {@link CacheAsyncOperationsFailoverAbstractTest} with {@code TRANSACTIONAL} mode.
+ */
+public class CacheAsyncOperationsFailoverTxTest extends CacheAsyncOperationsFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
index f3fac23..ba510f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -48,6 +49,9 @@ public class IgniteCacheFailoverTestSuite2 {
 
         suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class);
 
+        suite.addTestSuite(CacheAsyncOperationsFailoverAtomicTest.class);
+        suite.addTestSuite(CacheAsyncOperationsFailoverTxTest.class);
+
         return suite;
     }
 }