You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:50 UTC

[14/27] ignite git commit: Fixed GridEmbeddedFuture used for async cache operations, added test.

Fixed GridEmbeddedFuture used for async cache operations, added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e567f8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e567f8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e567f8cb

Branch: refs/heads/ignite-1124
Commit: e567f8cb3dd88c3c0a253c4fde06f8ced9b97fde
Parents: c8bc1f9
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:39:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:41:05 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../util/future/GridEmbeddedFuture.java         |  55 +++-
 .../distributed/CacheAsyncOperationsTest.java   | 280 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 5 files changed, 342 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7adea2b..54d33e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4132,9 +4132,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             final IgniteTxLocalAdapter tx0 = tx;
 
             if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
-                    new C2<T, Exception, IgniteInternalFuture<T>>() {
-                        @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+                IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+                    new IgniteOutClosure<IgniteInternalFuture>() {
+                        @Override public IgniteInternalFuture<T> apply() {
                             return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
                                 @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
                                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c44b028..9087d20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -567,9 +567,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             IgniteInternalFuture fut = holder.future();
 
             if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
-                    new C2<T, Exception, IgniteInternalFuture<T>>() {
-                        @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+                IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+                    new IgniteOutClosure<IgniteInternalFuture>() {
+                        @Override public IgniteInternalFuture<T> apply() {
                             return op.apply();
                         }
                     });

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
index 4475fae..11b28b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
@@ -68,7 +68,8 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
 
     /**
      * Embeds futures. Specific change order of arguments to avoid conflicts.
-     *  @param embedded Closure.
+     *
+     * @param embedded Embedded future.
      * @param c Closure which runs upon completion of embedded closure and which returns another future.
      */
     public GridEmbeddedFuture(
@@ -200,6 +201,58 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
         });
     }
 
+    /**
+     * @param embedded Embedded future.
+     * @param c Closure to create next future.
+     */
+    public GridEmbeddedFuture(
+        IgniteInternalFuture<B> embedded,
+        final IgniteOutClosure<IgniteInternalFuture<A>> c
+    ) {
+        assert embedded != null;
+        assert c != null;
+
+        this.embedded = embedded;
+
+        embedded.listen(new AL1() {
+            @Override public void applyx(IgniteInternalFuture<B> embedded) {
+                try {
+                    IgniteInternalFuture<A> next = c.apply();
+
+                    if (next == null) {
+                        onDone();
+
+                        return;
+                    }
+
+                    next.listen(new AL2() {
+                        @Override public void applyx(IgniteInternalFuture<A> next) {
+                            try {
+                                onDone(next.get());
+                            }
+                            catch (GridClosureException e) {
+                                onDone(e.unwrap());
+                            }
+                            catch (IgniteCheckedException | RuntimeException e) {
+                                onDone(e);
+                            }
+                            catch (Error e) {
+                                onDone(e);
+
+                                throw e;
+                            }
+                        }
+                    });
+                }
+                catch (Error e) {
+                    onDone(e);
+
+                    throw e;
+                }
+            }
+        });
+    }
+
     /** {@inheritDoc} */
     @Override public boolean cancel() throws IgniteCheckedException {
         return embedded.cancel();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
new file mode 100644
index 0000000..2094d0a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ *
+ */
+public class CacheAsyncOperationsTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        if (gridName.equals(getTestGridName(1)))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncOperationsTx() throws Exception {
+        asyncOperations(TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncOperationsAtomic() throws Exception {
+        asyncOperations(ATOMIC);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CountDownLatch latch0 = latch;
+
+        if (latch0 != null)
+            latch0.countDown();
+
+        latch = null;
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @throws Exception If failed.
+     */
+    public void asyncOperations(CacheAtomicityMode atomicityMode) throws Exception {
+        try (IgniteCache<Integer, Integer> cache = ignite(1).getOrCreateCache(cacheConfiguration(atomicityMode))) {
+            async1(cache);
+
+            async2(cache);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void async1(IgniteCache<Integer, Integer> cache) {
+        cache.put(1, 1);
+
+        latch = new CountDownLatch(1);
+
+        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut1 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 2);
+
+        IgniteFuture<?> fut2 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 3);
+
+        IgniteFuture<?> fut3 = asyncCache.future();
+
+        assertFalse(fut1.isDone());
+        assertFalse(fut2.isDone());
+        assertFalse(fut3.isDone());
+
+        latch.countDown();
+
+        try {
+            fut1.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertEquals(1, fut2.get());
+        assertEquals(2, fut3.get());
+
+        assertNull(cache.get(0));
+        assertEquals((Integer)3, cache.get(1));
+    }
+    /**
+     *
+     * @param cache Cache.
+     */
+    private void async2(IgniteCache<Integer, Integer> cache) {
+        cache.put(1, 1);
+
+        latch = new CountDownLatch(1);
+
+        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut1 = asyncCache.future();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut2 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 2);
+
+        IgniteFuture<?> fut3 = asyncCache.future();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut4 = asyncCache.future();
+
+        assertFalse(fut1.isDone());
+        assertFalse(fut2.isDone());
+        assertFalse(fut3.isDone());
+        assertFalse(fut4.isDone());
+
+        latch.countDown();
+
+        try {
+            fut1.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        try {
+            fut2.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertEquals(1, fut3.get());
+
+        try {
+            fut4.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertNull(cache.get(0));
+        assertEquals((Integer)2, cache.get(1));
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setWriteThrough(true);
+        ccfg.setCacheStoreFactory(new StoreFactory());
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<TestStore> {
+        /** {@inheritDoc} */
+        @Override public TestStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+            CountDownLatch latch0 = latch;
+
+            if (latch0 != null)
+                U.awaitQuiet(latch0);
+
+            Integer key = entry.getKey();
+
+            if (key.equals(0)) {
+                System.out.println(Thread.currentThread().getName() + ": fail operation for key: " + key);
+
+                throw new CacheWriterException("Test error.");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 5947d33..c20e901 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -133,6 +133,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(IgniteTxGetAfterStopTest.class);
 
+        suite.addTestSuite(CacheAsyncOperationsTest.class);
+
         return suite;
     }
 }