You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/08/31 04:01:17 UTC
[34/50] [abbrv] ignite git commit: Added failover tests for async
operations.
Added failover tests for async operations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73ab5e2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73ab5e2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73ab5e2f
Branch: refs/heads/ignite-843
Commit: 73ab5e2f7bc121eaf496096f205547c026c91464
Parents: b132006
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 15:07:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 15:07:49 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +
.../dht/atomic/GridDhtAtomicCache.java | 4 +
...acheAsyncOperationsFailoverAbstractTest.java | 329 +++++++++++++++++++
.../CacheAsyncOperationsFailoverAtomicTest.java | 32 ++
.../CacheAsyncOperationsFailoverTxTest.java | 32 ++
.../IgniteCacheFailoverTestSuite2.java | 4 +
6 files changed, 405 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 54d33e0..c3bbbe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4135,6 +4135,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
new IgniteOutClosure<IgniteInternalFuture>() {
@Override public IgniteInternalFuture<T> apply() {
+ if (ctx.kernalContext().isStopping())
+ return new GridFinishedFuture<>(
+ new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
@Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9087d20..4b8585e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -570,6 +570,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
new IgniteOutClosure<IgniteInternalFuture>() {
@Override public IgniteInternalFuture<T> apply() {
+ if (ctx.kernalContext().isStopping())
+ return new GridFinishedFuture<>(
+ new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
return op.apply();
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
new file mode 100644
index 0000000..1669404
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCacheAbstractSelfTest {
+ /** */
+ private static final int NODE_CNT = 4;
+
+ /** */
+ private static final long TEST_TIME = 60_000;
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return NODE_CNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return TEST_TIME + 60_000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheMode cacheMode() {
+ return PARTITIONED;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+ ccfg.setCacheStoreFactory(null);
+ ccfg.setReadThrough(false);
+ ccfg.setWriteThrough(false);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllAsyncFailover() throws Exception {
+ putAllAsyncFailover(5, 10);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAllAsyncFailoverManyThreads() throws Exception {
+ putAllAsyncFailover(ignite(0).configuration().getSystemThreadPoolSize() * 2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAsyncFailover() throws Exception {
+ for (int i = 0; i < 3; i++) {
+ log.info("Iteration: " + i);
+
+ startGrid(NODE_CNT);
+
+ final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+ int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+
+ log.info("Max concurrent async operations: " + ops);
+
+ assertTrue(ops > 0);
+
+ final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
+
+ final AtomicInteger left = new AtomicInteger(ops);
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ List<IgniteFuture<?>> futs0 = new ArrayList<>();
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (left.getAndDecrement() > 0) {
+ TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+ int keys = 50;
+
+ for (int k = 0; k < keys; k++)
+ map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
+
+ cache.putAll(map);
+
+ IgniteFuture<?> fut = cache.future();
+
+ assertNotNull(fut);
+
+ futs0.add(fut);
+ }
+
+ futs.addAll(futs0);
+
+ return null;
+ }
+ }, 10, "put-thread");
+
+ stopGrid(NODE_CNT);
+
+ assertEquals(ops, futs.size());
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+ }
+ }
+
+ /**
+ * @param threads Number of threads.
+ * @param opsPerThread Number of concurrent async operations per thread.
+ * @throws Exception If failed.
+ */
+ private void putAllAsyncFailover(final int threads, final int opsPerThread) throws Exception {
+ log.info("Start test [threads=" + threads + ", opsPerThread=" + opsPerThread + ']');
+
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ final long endTime = System.currentTimeMillis() + TEST_TIME;
+
+ IgniteInternalFuture<Object> restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Thread.currentThread().setName("restart-thread");
+
+ while (!finished.get() && System.currentTimeMillis() < endTime) {
+ startGrid(NODE_CNT);
+
+ U.sleep(500);
+
+ stopGrid(NODE_CNT);
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+ GridTestUtils.runMultiThreaded(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int iter = 0;
+
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ long time;
+
+ long lastInfo = 0;
+
+ while ((time = System.currentTimeMillis()) < endTime) {
+ if (time - lastInfo > 5000)
+ log.info("Starting operations [iter=" + iter + ']');
+
+ List<IgniteFuture<?>> futs = new ArrayList<>(opsPerThread);
+
+ for (int i = 0; i < opsPerThread; i++) {
+ TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+ int keys = rnd.nextInt(1, 50);
+
+ for (int k = 0; k < keys; k++)
+ map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(iter));
+
+ cache.putAll(map);
+
+ IgniteFuture<?> fut = cache.future();
+
+ assertNotNull(fut);
+
+ futs.add(fut);
+ }
+
+ if (time - lastInfo > 5000) {
+ log.info("Waiting for futures [iter=" + iter + ']');
+
+ lastInfo = time;
+ }
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ iter++;
+ }
+
+ return null;
+ }
+ }, threads, "update-thread");
+
+ finished.set(true);
+
+ restartFut.get();
+ }
+ finally {
+ finished.set(true);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestKey implements Serializable, Comparable<TestKey> {
+ /** */
+ private long key;
+
+ /**
+ * @param key Key.
+ */
+ public TestKey(long key) {
+ this.key = key;
+ }
+
+ /**
+ * @return Key.
+ */
+ public long key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull TestKey other) {
+ return ((Long)key).compareTo(other.key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestKey other = (TestKey)o;
+
+ return key == other.key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return (int)(key ^ (key >>> 32));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestKey.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue implements Serializable {
+ /** */
+ private long val;
+
+ /**
+ * @param val Value.
+ */
+ public TestValue(long val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Value.
+ */
+ public long value() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestValue other = (TestValue)o;
+
+ return val == other.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TestValue.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
new file mode 100644
index 0000000..6e01a4a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class CacheAsyncOperationsFailoverAtomicTest extends CacheAsyncOperationsFailoverAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return ATOMIC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
new file mode 100644
index 0000000..ba3ad7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ *
+ */
+public class CacheAsyncOperationsFailoverTxTest extends CacheAsyncOperationsFailoverAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return TRANSACTIONAL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
index f3fac23..ba510f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.*;
import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -48,6 +49,9 @@ public class IgniteCacheFailoverTestSuite2 {
suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class);
+ suite.addTestSuite(CacheAsyncOperationsFailoverAtomicTest.class);
+ suite.addTestSuite(CacheAsyncOperationsFailoverTxTest.class);
+
return suite;
}
}