You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/26 12:34:46 UTC

[13/16] ignite git commit: # ignite-1124

http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 5d0cacc..041a0f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -27,12 +27,14 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 
+import javax.cache.processor.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
 import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
 
 /**
  *
@@ -46,7 +48,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     /**
      * @return Keys count for the test.
      */
-    protected abstract int keysCount();
+    private int keysCount() {
+        return 10_000;
+    }
 
     /** {@inheritDoc} */
     @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -54,7 +58,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
 
         cfg.setAtomicWriteOrderMode(writeOrderMode());
         cfg.setBackups(1);
-        cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cfg.setRebalanceMode(SYNC);
 
         return cfg;
     }
@@ -78,25 +82,47 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     protected CacheAtomicWriteOrderMode writeOrderMode() {
         return CLOCK;
     }
+
     /**
      * @throws Exception If failed.
      */
     public void testPut() throws Exception {
-        checkPut(false);
+        checkRetry(Test.PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAll() throws Exception {
+        checkRetry(Test.PUT_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPutAsync() throws Exception {
-        checkPut(true);
+        checkRetry(Test.PUT_ASYNC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvoke() throws Exception {
+        checkRetry(Test.INVOKE);
     }
 
     /**
-     * @param async If {@code true} tests asynchronous put.
      * @throws Exception If failed.
      */
-    private void checkPut(boolean async) throws Exception {
+    public void testInvokeAll() throws Exception {
+        checkRetry(Test.INVOKE_ALL);
+    }
+
+    /**
+     * @param test Test type.
+     * @throws Exception If failed.
+     */
+    private void checkRetry(Test test) throws Exception {
         final AtomicBoolean finished = new AtomicBoolean();
 
         int keysCnt = keysCount();
@@ -115,52 +141,140 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
             }
         });
 
+        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
 
-        IgniteCache<Object, Object> cache = ignite(0).cache(null);
+        int iter = 0;
 
-        if (atomicityMode() == ATOMIC)
-            assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+        try {
+            if (atomicityMode() == ATOMIC)
+                assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
 
-        int iter = 0;
+            long stopTime = System.currentTimeMillis() + 60_000;
 
-        long stopTime = System.currentTimeMillis() + 60_000;
+            switch (test) {
+                case PUT: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
 
-        if (async) {
-            IgniteCache<Object, Object> cache0 = cache.withAsync();
+                        for (int i = 0; i < keysCnt; i++)
+                            cache.put(i, val);
 
-            while (System.currentTimeMillis() < stopTime) {
-                Integer val = ++iter;
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
 
-                for (int i = 0; i < keysCnt; i++) {
-                    cache0.put(i, val);
+                    break;
+                }
+
+                case PUT_ALL: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+                        Map<Integer, Integer> map = new LinkedHashMap<>();
+                        for (int i = 0; i < keysCnt; i++) {
+                            map.put(i, val);
 
-                    cache0.future().get();
+                            if (map.size() == 100 || i == keysCnt - 1) {
+                                cache.putAll(map);
+                                map.clear(); // Start the next batch of at most 100 keys.
+                            }
+                        }
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+                    break; // Do not fall through into PUT_ASYNC.
                 }
 
-                for (int i = 0; i < keysCnt; i++) {
-                    cache0.get(i);
+                case PUT_ASYNC: {
+                    IgniteCache<Integer, Integer> cache0 = cache.withAsync();
+
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            cache0.put(i, val);
+
+                            cache0.future().get();
+                        }
 
-                    assertEquals(val, cache0.future().get());
+                        for (int i = 0; i < keysCnt; i++) {
+                            cache0.get(i);
+
+                            assertEquals(val, cache0.future().get());
+                        }
+                    }
+
+                    break;
                 }
-            }
-        }
-        else {
-            while (System.currentTimeMillis() < stopTime) {
-                Integer val = ++iter;
 
-                for (int i = 0; i < keysCnt; i++)
-                    cache.put(i, val);
+                case INVOKE: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        Integer expOld = iter - 1;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            Integer old = cache.invoke(i, new SetEntryProcessor(val));
+
+                            assertNotNull(old);
+                            assertTrue(old.equals(expOld) || old.equals(val));
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
+                case INVOKE_ALL: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer val = ++iter;
+
+                        Integer expOld = iter - 1;
+
+                        Set<Integer> keys = new LinkedHashSet<>();
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            keys.add(i);
+
+                            if (keys.size() == 100 || i == keysCnt - 1) {
+                                Map<Integer, EntryProcessorResult<Integer>> resMap =
+                                    cache.invokeAll(keys, new SetEntryProcessor(val));
+
+                                for (Integer key : keys) {
+                                    EntryProcessorResult<Integer> res = resMap.get(key);
+
+                                    assertNotNull(res);
+
+                                    Integer old = res.get();
+
+                                    assertTrue(old.equals(expOld) || old.equals(val));
+                                }
 
-                for (int i = 0; i < keysCnt; i++)
-                    assertEquals(val, cache.get(i));
+                                assertEquals(keys.size(), resMap.size());
+
+                                keys.clear();
+                            }
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
+                default:
+                    assert false : test;
             }
         }
-
-        finished.set(true);
-        fut.get();
+        finally {
+            finished.set(true);
+            fut.get();
+        }
 
         for (int i = 0; i < keysCnt; i++)
-            assertEquals(iter, cache.get(i));
+            assertEquals((Integer)iter, cache.get(i));
     }
 
     /**
@@ -201,34 +315,41 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
         try {
             int keysCnt = keysCount();
 
-        boolean eThrown = false;
+            boolean eThrown = false;
 
-        IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
+            IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
 
-        if (async)
-            cache = cache.withAsync();
+            if (async)
+                cache = cache.withAsync();
 
-        for (int i = 0; i < keysCnt; i++) {
-            try {
-                if (async) {
-                    cache.put(i, i);
+            long stopTime = System.currentTimeMillis() + 60_000;
 
-                    cache.future().get();
+            while (System.currentTimeMillis() < stopTime) {
+                for (int i = 0; i < keysCnt; i++) {
+                    try {
+                        if (async) {
+                            cache.put(i, i);
+
+                            cache.future().get();
+                        }
+                        else
+                            cache.put(i, i);
+                    }
+                    catch (Exception e) {
+                        assertTrue("Invalid exception: " + e,
+                            X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
+
+                        eThrown = true;
+
+                        break;
+                    }
                 }
-                else
-                    cache.put(i, i);
-            }
-            catch (Exception e) {
-                assertTrue("Invalid exception: " + e,
-                    X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
-
-                eThrown = true;
 
+                if (eThrown)
                     break;
-                }
             }
 
-        assertTrue(eThrown);
+            assertTrue(eThrown);
 
             finished.set(true);
 
@@ -243,4 +364,48 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
     @Override protected long getTestTimeout() {
         return 3 * 60 * 1000;
     }
+
+    /**
+     * Cache update operation exercised by {@code checkRetry}.
+     */
+    enum Test {
+        /** Single-key {@code put}. */
+        PUT,
+
+        /** Batched {@code putAll}. */
+        PUT_ALL,
+
+        /** Single-key {@code put} through the {@code withAsync()} cache projection. */
+        PUT_ASYNC,
+
+        /** Single-key {@code invoke} with {@code SetEntryProcessor}. */
+        INVOKE,
+
+        /** Batched {@code invokeAll} with {@code SetEntryProcessor}. */
+        INVOKE_ALL
+    }
+
+    /**
+     * Entry processor that overwrites the entry value and returns the previous one ({@code 0} if absent).
+     */
+    static class SetEntryProcessor implements CacheEntryProcessor<Integer, Integer, Integer> {
+        /** Value to store into the entry. */
+        private final Integer val;
+
+        /**
+         * @param val Value.
+         */
+        public SetEntryProcessor(Integer val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) {
+            Integer old = e.getValue();
+
+            e.setValue(val);
+
+            return old == null ? 0 : old;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
index e76663a..be442d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -16,7 +16,21 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
  *
@@ -24,11 +38,66 @@ import org.apache.ignite.cache.*;
 public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest {
     /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
-        return CacheAtomicityMode.ATOMIC;
+        return ATOMIC;
     }
 
-    /** {@inheritDoc} */
-    @Override protected int keysCount() {
-        return 60_000;
+    /**
+     * Commits transactions that update a transactional cache and the default cache while node 3 restarts.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPutInsideTransaction() throws Exception {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName("tx-cache");
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        try (IgniteCache<Integer, Integer> txCache = ignite(0).getOrCreateCache(ccfg)) {
+            final AtomicBoolean finished = new AtomicBoolean();
+
+            IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    while (!finished.get()) {
+                        stopGrid(3);
+
+                        U.sleep(300);
+
+                        startGrid(3);
+                    }
+
+                    return null;
+                }
+            });
+
+            try {
+                IgniteTransactions txs = ignite(0).transactions();
+
+                IgniteCache<Object, Object> cache = ignite(0).cache(null);
+
+                long stopTime = System.currentTimeMillis() + 60_000;
+
+                while (System.currentTimeMillis() < stopTime) {
+                    for (int i = 0; i < 10_000; i++) {
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                txCache.put(0, 0);
+
+                                cache.put(i, i);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (IgniteException | CacheException e) {
+                            log.info("Ignore exception: " + e);
+                        }
+                    }
+                }
+            }
+            finally {
+                // Always stop and await the restart thread so it cannot leak into later tests.
+                finished.set(true);
+                fut.get();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 0ab5729..e113fcc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -32,6 +33,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
 import static org.apache.ignite.transactions.TransactionIsolation.*;
 
@@ -44,12 +46,12 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
 
     /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
-        return CacheAtomicityMode.TRANSACTIONAL;
+        return TRANSACTIONAL;
     }
 
     /** {@inheritDoc} */
-    @Override protected int keysCount() {
-        return 20_000;
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
     }
 
     /**
@@ -74,7 +76,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
             }
         });
 
-        int keysCnt = keysCount();
+        final int keysCnt = 20_000;
 
         try {
             for (int i = 0; i < keysCnt; i++)
@@ -90,6 +92,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     public void testExplicitTransactionRetries() throws Exception {
         final AtomicInteger idx = new AtomicInteger();
         int threads = 8;
@@ -97,8 +100,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
         final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads);
 
         IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
-            @Override
-            public Object call() throws Exception {
+            @Override public Object call() throws Exception {
                 int th = idx.getAndIncrement();
                 int base = th * FACTOR;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
index 5b9af4f..5bb1706 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
@@ -22,6 +22,8 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.testframework.*;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
 /**
  *
  */
@@ -37,11 +39,6 @@ public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstract
 
     /** {@inheritDoc} */
     @Override protected CacheAtomicityMode atomicityMode() {
-        return CacheAtomicityMode.ATOMIC;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int keysCount() {
-        return 60_000;
+        return ATOMIC;
     }
 }