You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:50 UTC
[14/27] ignite git commit: Fixed GridEmbeddedFuture used for async
cache operations, added test.
Fixed GridEmbeddedFuture used for async cache operations, added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e567f8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e567f8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e567f8cb
Branch: refs/heads/ignite-1124
Commit: e567f8cb3dd88c3c0a253c4fde06f8ced9b97fde
Parents: c8bc1f9
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:39:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:41:05 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 6 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../util/future/GridEmbeddedFuture.java | 55 +++-
.../distributed/CacheAsyncOperationsTest.java | 280 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite3.java | 2 +
5 files changed, 342 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7adea2b..54d33e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4132,9 +4132,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final IgniteTxLocalAdapter tx0 = tx;
if (fut != null && !fut.isDone()) {
- IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
- new C2<T, Exception, IgniteInternalFuture<T>>() {
- @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+ IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+ new IgniteOutClosure<IgniteInternalFuture>() {
+ @Override public IgniteInternalFuture<T> apply() {
return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
@Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c44b028..9087d20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -567,9 +567,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
IgniteInternalFuture fut = holder.future();
if (fut != null && !fut.isDone()) {
- IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
- new C2<T, Exception, IgniteInternalFuture<T>>() {
- @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+ IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+ new IgniteOutClosure<IgniteInternalFuture>() {
+ @Override public IgniteInternalFuture<T> apply() {
return op.apply();
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
index 4475fae..11b28b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
@@ -68,7 +68,8 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
/**
* Embeds futures. Specific change order of arguments to avoid conflicts.
- * @param embedded Closure.
+ *
+ * @param embedded Embedded future.
* @param c Closure which runs upon completion of embedded closure and which returns another future.
*/
public GridEmbeddedFuture(
@@ -200,6 +201,58 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
});
}
+ /**
+ * Creates a future that waits for {@code embedded} to finish and then completes with the outcome
+ * of the future produced by {@code c}.
+ * <p>
+ * The closure is invoked from the listener registered on {@code embedded}; that listener does not
+ * read the result or the error of {@code embedded}. If the closure returns {@code null}, this
+ * future is completed through {@code onDone()} without a result. Otherwise this future is
+ * completed with the value returned by {@code next.get()}, or with the exception that call throws
+ * (a {@code GridClosureException} is unwrapped first; an {@code Error} is passed to
+ * {@code onDone} and then rethrown).
+ * <p>
+ * NOTE(review): the {@code try} around {@code c.apply()} only catches {@code Error}; presumably
+ * the {@code AL1} listener base class deals with other exception types - confirm.
+ *
+ * @param embedded Embedded future; must not be {@code null} (checked with an assertion).
+ * @param c Closure to create next future; must not be {@code null} (checked with an assertion).
+ */
+ public GridEmbeddedFuture(
+ IgniteInternalFuture<B> embedded,
+ final IgniteOutClosure<IgniteInternalFuture<A>> c
+ ) {
+ assert embedded != null;
+ assert c != null;
+
+ this.embedded = embedded;
+
+ embedded.listen(new AL1() {
+ @Override public void applyx(IgniteInternalFuture<B> embedded) {
+ try {
+ IgniteInternalFuture<A> next = c.apply(); // The listener argument is not inspected before creating the next future.
+
+ if (next == null) { // No follow-up future: finish without a result.
+ onDone();
+
+ return;
+ }
+
+ next.listen(new AL2() {
+ @Override public void applyx(IgniteInternalFuture<A> next) {
+ try {
+ onDone(next.get());
+ }
+ catch (GridClosureException e) {
+ onDone(e.unwrap());
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ onDone(e);
+ }
+ catch (Error e) {
+ onDone(e);
+
+ throw e;
+ }
+ }
+ });
+ }
+ catch (Error e) {
+ onDone(e);
+
+ throw e;
+ }
+ }
+ });
+ }
+
/** {@inheritDoc} */
@Override public boolean cancel() throws IgniteCheckedException {
return embedded.cancel();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
new file mode 100644
index 0000000..2094d0a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
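+/**
+ * Checks the results of asynchronous cache operations issued while a write-through cache store
+ * is blocked on a latch and, for key 0, failing with an exception.
+ */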
+public class CacheAsyncOperationsTest extends GridCommonAbstractTest {
+ /** IP finder that {@link #getConfiguration(String)} installs into the discovery SPI of each grid. */
+ private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** Latch that {@code TestStore.write} waits on when it is non-null; set by the test scenarios and cleared in {@link #afterTest()}. */
+ private static volatile CountDownLatch latch;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Installs the shared {@link #ipFinder} into the discovery SPI (which is cast to
+ * {@code TcpDiscoverySpi}) and enables client mode for the grid whose name equals
+ * {@code getTestGridName(1)}.
+ */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ if (gridName.equals(getTestGridName(1)))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Starts two grids; per {@link #getConfiguration(String)} the one named
+ * {@code getTestGridName(1)} is configured in client mode.
+ */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(2);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Calls {@code stopAllGrids()} after the superclass hook has run.
+ */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Runs the asynchronous operation scenarios against a {@code TRANSACTIONAL} cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAsyncOperationsTx() throws Exception {
+ asyncOperations(TRANSACTIONAL);
+ }
+
+ /**
+ * Runs the asynchronous operation scenarios against an {@code ATOMIC} cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAsyncOperationsAtomic() throws Exception {
+ asyncOperations(ATOMIC);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Counts down the shared {@link #latch} if one is set and then clears the field, so that a
+ * store write still waiting on it is released when a test ends.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ CountDownLatch latch0 = latch; // Read the volatile field once.
+
+ if (latch0 != null)
+ latch0.countDown();
+
+ latch = null;
+ }
+
+ /**
+ * Obtains a cache with the given atomicity mode from {@code ignite(1)}, runs
+ * {@link #async1(IgniteCache)} followed by {@link #async2(IgniteCache)} on it, and closes the
+ * cache when done (try-with-resources).
+ * <p>
+ * NOTE(review): {@code ignite(1)} is presumably the node configured in client mode by
+ * {@link #getConfiguration(String)} - confirm.
+ *
+ * @param atomicityMode Atomicity mode.
+ * @throws Exception If failed.
+ */
+ public void asyncOperations(CacheAtomicityMode atomicityMode) throws Exception {
+ try (IgniteCache<Integer, Integer> cache = ignite(1).getOrCreateCache(cacheConfiguration(atomicityMode))) {
+ async1(cache);
+
+ async2(cache);
+ }
+ }
+
+ /**
+ * Scenario with one failing and two successful asynchronous updates.
+ * <p>
+ * After seeding key 1 with value 1, a new latch is installed so that store writes wait on it.
+ * Then an async {@code put(0, 0)} and two async {@code getAndPut} calls for key 1 are issued.
+ * None of the futures may be done before the latch is released. Afterwards the future for key 0
+ * must fail with {@code CacheException} (the store throws for key 0), the two {@code getAndPut}
+ * futures must return 1 and 2, key 0 must be absent and key 1 must hold 3.
+ *
+ * @param cache Cache.
+ */
+ private void async1(IgniteCache<Integer, Integer> cache) {
+ cache.put(1, 1);
+
+ latch = new CountDownLatch(1); // Store writes now wait until countDown() below.
+
+ IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+ asyncCache.put(0, 0);
+
+ IgniteFuture<?> fut1 = asyncCache.future();
+
+ asyncCache.getAndPut(1, 2);
+
+ IgniteFuture<?> fut2 = asyncCache.future();
+
+ asyncCache.getAndPut(1, 3);
+
+ IgniteFuture<?> fut3 = asyncCache.future();
+
+ assertFalse(fut1.isDone());
+ assertFalse(fut2.isDone());
+ assertFalse(fut3.isDone());
+
+ latch.countDown();
+
+ try {
+ fut1.get();
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected error: " + e);
+ }
+
+ assertEquals(1, fut2.get());
+ assertEquals(2, fut3.get());
+
+ assertNull(cache.get(0));
+ assertEquals((Integer)3, cache.get(1));
+ }
+ /**
+ * Scenario with a successful asynchronous update placed between failing ones.
+ * <p>
+ * After seeding key 1 with value 1, a new latch is installed so that store writes wait on it.
+ * Then two async {@code put(0, 0)} calls, one async {@code getAndPut(1, 2)} and a third async
+ * {@code put(0, 0)} are issued. None of the futures may be done before the latch is released.
+ * Afterwards the three futures for key 0 must fail with {@code CacheException} (the store throws
+ * for key 0), the {@code getAndPut} future must return 1, key 0 must be absent and key 1 must
+ * hold 2.
+ *
+ * @param cache Cache.
+ */
+ private void async2(IgniteCache<Integer, Integer> cache) {
+ cache.put(1, 1);
+
+ latch = new CountDownLatch(1); // Store writes now wait until countDown() below.
+
+ IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+ asyncCache.put(0, 0);
+
+ IgniteFuture<?> fut1 = asyncCache.future();
+
+ asyncCache.put(0, 0);
+
+ IgniteFuture<?> fut2 = asyncCache.future();
+
+ asyncCache.getAndPut(1, 2);
+
+ IgniteFuture<?> fut3 = asyncCache.future();
+
+ asyncCache.put(0, 0);
+
+ IgniteFuture<?> fut4 = asyncCache.future();
+
+ assertFalse(fut1.isDone());
+ assertFalse(fut2.isDone());
+ assertFalse(fut3.isDone());
+ assertFalse(fut4.isDone());
+
+ latch.countDown();
+
+ try {
+ fut1.get();
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected error: " + e);
+ }
+
+ try {
+ fut2.get();
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected error: " + e);
+ }
+
+ assertEquals(1, fut3.get());
+
+ try {
+ fut4.get();
+
+ fail();
+ }
+ catch (CacheException e) {
+ log.info("Expected error: " + e);
+ }
+
+ assertNull(cache.get(0));
+ assertEquals((Integer)2, cache.get(1));
+ }
+
+ /**
+ * Builds the configuration of the cache under test: {@code PARTITIONED} mode, {@code FULL_SYNC}
+ * write synchronization, write-through enabled and {@link StoreFactory} as the cache store
+ * factory.
+ *
+ * @param atomicityMode Atomicity mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setCacheMode(PARTITIONED);
+ ccfg.setAtomicityMode(atomicityMode);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setWriteThrough(true);
+ ccfg.setCacheStoreFactory(new StoreFactory());
+
+ return ccfg;
+ }
+
+ /**
+ * Cache store factory that returns a new {@link TestStore} on every {@code create()} call.
+ */
+ private static class StoreFactory implements Factory<TestStore> {
+ /** {@inheritDoc} */
+ @Override public TestStore create() {
+ return new TestStore();
+ }
+ }
+
+ /**
+ * Cache store used to delay and fail write-through: {@code load} always returns {@code null},
+ * {@code write} waits on the test's shared latch (when one is set) and then throws
+ * {@code CacheWriterException} for key 0, and {@code delete} does nothing.
+ */
+ private static class TestStore extends CacheStoreAdapter<Integer, Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer load(Integer key) throws CacheLoaderException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+ CountDownLatch latch0 = latch; // Read the volatile field once.
+
+ if (latch0 != null)
+ U.awaitQuiet(latch0);
+
+ Integer key = entry.getKey();
+
+ if (key.equals(0)) { // Key 0 is the key the tests expect to fail.
+ System.out.println(Thread.currentThread().getName() + ": fail operation for key: " + key);
+
+ throw new CacheWriterException("Test error.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 5947d33..c20e901 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -133,6 +133,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(IgniteTxGetAfterStopTest.class);
+ suite.addTestSuite(CacheAsyncOperationsTest.class);
+
return suite;
}
}