You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/26 12:34:46 UTC
[13/16] ignite git commit: # ignite-1124
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 5d0cacc..041a0f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -27,12 +27,14 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.testframework.*;
+import javax.cache.processor.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
/**
*
@@ -46,7 +48,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
/**
* @return Keys count for the test.
*/
- protected abstract int keysCount();
+ private int keysCount() {
+ return 10_000;
+ }
/** {@inheritDoc} */
@Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
@@ -54,7 +58,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
cfg.setAtomicWriteOrderMode(writeOrderMode());
cfg.setBackups(1);
- cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+ cfg.setRebalanceMode(SYNC);
return cfg;
}
@@ -78,25 +82,47 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
protected CacheAtomicWriteOrderMode writeOrderMode() {
return CLOCK;
}
+
/**
* @throws Exception If failed.
*/
public void testPut() throws Exception {
- checkPut(false);
+ checkRetry(Test.PUT);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPutAll() throws Exception {
+ checkRetry(Test.PUT_ALL);
}
/**
* @throws Exception If failed.
*/
public void testPutAsync() throws Exception {
- checkPut(true);
+ checkRetry(Test.PUT_ASYNC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testInvoke() throws Exception {
+ checkRetry(Test.INVOKE);
}
/**
- * @param async If {@code true} tests asynchronous put.
* @throws Exception If failed.
*/
- private void checkPut(boolean async) throws Exception {
+ public void testInvokeAll() throws Exception {
+ checkRetry(Test.INVOKE_ALL);
+ }
+
+ /**
+ * @param test Test type.
+ * @throws Exception If failed.
+ */
+ private void checkRetry(Test test) throws Exception {
final AtomicBoolean finished = new AtomicBoolean();
int keysCnt = keysCount();
@@ -115,52 +141,140 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
}
});
+ IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
- IgniteCache<Object, Object> cache = ignite(0).cache(null);
+ int iter = 0;
- if (atomicityMode() == ATOMIC)
- assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
+ try {
+ if (atomicityMode() == ATOMIC)
+ assertEquals(writeOrderMode(), cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode());
- int iter = 0;
+ long stopTime = System.currentTimeMillis() + 60_000;
- long stopTime = System.currentTimeMillis() + 60_000;
+ switch (test) {
+ case PUT: {
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
- if (async) {
- IgniteCache<Object, Object> cache0 = cache.withAsync();
+ for (int i = 0; i < keysCnt; i++)
+ cache.put(i, val);
- while (System.currentTimeMillis() < stopTime) {
- Integer val = ++iter;
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
- for (int i = 0; i < keysCnt; i++) {
- cache0.put(i, val);
+ break;
+ }
+
+ case PUT_ALL: {
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ // Keys are written in batches of up to 100 entries, like the INVOKE_ALL case.
+ Map<Integer, Integer> map = new LinkedHashMap<>();
+
+ for (int i = 0; i < keysCnt; i++) {
+ map.put(i, val);
- cache0.future().get();
+ if (map.size() == 100 || i == keysCnt - 1) {
+ cache.putAll(map);
+
+ // Reset the batch: without this the size check matches only once and
+ // the last putAll re-sends every key in a single call.
+ map.clear();
+ }
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ // Do not fall through into the PUT_ASYNC case.
+ break;
}
- for (int i = 0; i < keysCnt; i++) {
- cache0.get(i);
+ case PUT_ASYNC: {
+ IgniteCache<Integer, Integer> cache0 = cache.withAsync();
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++) {
+ cache0.put(i, val);
+
+ cache0.future().get();
+ }
- assertEquals(val, cache0.future().get());
+ for (int i = 0; i < keysCnt; i++) {
+ cache0.get(i);
+
+ assertEquals(val, cache0.future().get());
+ }
+ }
+
+ break;
}
- }
- }
- else {
- while (System.currentTimeMillis() < stopTime) {
- Integer val = ++iter;
- for (int i = 0; i < keysCnt; i++)
- cache.put(i, val);
+ case INVOKE: {
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ Integer expOld = iter - 1;
+
+ for (int i = 0; i < keysCnt; i++) {
+ Integer old = cache.invoke(i, new SetEntryProcessor(val));
+
+ assertNotNull(old);
+ assertTrue(old.equals(expOld) || old.equals(val));
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
+ case INVOKE_ALL: {
+ while (System.currentTimeMillis() < stopTime) {
+ Integer val = ++iter;
+
+ Integer expOld = iter - 1;
+
+ Set<Integer> keys = new LinkedHashSet<>();
+
+ for (int i = 0; i < keysCnt; i++) {
+ keys.add(i);
+
+ if (keys.size() == 100 || i == keysCnt - 1) {
+ Map<Integer, EntryProcessorResult<Integer>> resMap =
+ cache.invokeAll(keys, new SetEntryProcessor(val));
+
+ for (Integer key : keys) {
+ EntryProcessorResult<Integer> res = resMap.get(key);
+
+ assertNotNull(res);
+
+ Integer old = res.get();
+
+ assertTrue(old.equals(expOld) || old.equals(val));
+ }
- for (int i = 0; i < keysCnt; i++)
- assertEquals(val, cache.get(i));
+ assertEquals(keys.size(), resMap.size());
+
+ keys.clear();
+ }
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
+ default:
+ assert false : test;
}
}
-
- finished.set(true);
- fut.get();
+ finally {
+ finished.set(true);
+ fut.get();
+ }
for (int i = 0; i < keysCnt; i++)
- assertEquals(iter, cache.get(i));
+ assertEquals((Integer)iter, cache.get(i));
}
/**
@@ -201,34 +315,41 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
try {
int keysCnt = keysCount();
- boolean eThrown = false;
+ boolean eThrown = false;
- IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
+ IgniteCache<Object, Object> cache = ignite(0).cache(null).withNoRetries();
- if (async)
- cache = cache.withAsync();
+ if (async)
+ cache = cache.withAsync();
- for (int i = 0; i < keysCnt; i++) {
- try {
- if (async) {
- cache.put(i, i);
+ long stopTime = System.currentTimeMillis() + 60_000;
- cache.future().get();
+ while (System.currentTimeMillis() < stopTime) {
+ for (int i = 0; i < keysCnt; i++) {
+ try {
+ if (async) {
+ cache.put(i, i);
+
+ cache.future().get();
+ }
+ else
+ cache.put(i, i);
+ }
+ catch (Exception e) {
+ assertTrue("Invalid exception: " + e,
+ X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
+
+ eThrown = true;
+
+ break;
+ }
}
- else
- cache.put(i, i);
- }
- catch (Exception e) {
- assertTrue("Invalid exception: " + e,
- X.hasCause(e, ClusterTopologyCheckedException.class, CachePartialUpdateException.class));
-
- eThrown = true;
+ if (eThrown)
break;
- }
}
- assertTrue(eThrown);
+ assertTrue(eThrown);
finished.set(true);
@@ -243,4 +364,48 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
@Override protected long getTestTimeout() {
return 3 * 60 * 1000;
}
+
+ /**
+ * Cache operation exercised by {@code checkRetry}.
+ */
+ enum Test {
+ /** Single-key {@code put} followed by {@code get} verification. */
+ PUT,
+
+ /** {@code putAll} of a map of keys followed by {@code get} verification. */
+ PUT_ALL,
+
+ /** {@code put} and {@code get} through {@code cache.withAsync()}, waiting on each future. */
+ PUT_ASYNC,
+
+ /** Single-key {@code invoke} with {@code SetEntryProcessor}. */
+ INVOKE,
+
+ /** {@code invokeAll} with {@code SetEntryProcessor} over batches of keys. */
+ INVOKE_ALL
+ }
+
+ /**
+ * Entry processor that replaces the entry value and returns the previous one.
+ * <p>
+ * Declared {@code static} so that instances do not hold a reference to the enclosing
+ * test object: {@code CacheEntryProcessor} is serializable, and the processor is
+ * presumably shipped to other nodes when invoked on a distributed cache.
+ */
+ static class SetEntryProcessor implements CacheEntryProcessor<Integer, Integer, Integer> {
+ /** Value to set. */
+ private final Integer val;
+
+ /**
+ * @param val Value.
+ */
+ public SetEntryProcessor(Integer val) {
+ this.val = val;
+ }
+
+ /**
+ * Stores the configured value into the entry.
+ *
+ * @param e Entry to update.
+ * @param args Ignored.
+ * @return Previous value, or {@code 0} if the entry had no value.
+ */
+ @Override public Integer process(MutableEntry<Integer, Integer> e, Object... args) {
+ Integer old = e.getValue();
+
+ e.setValue(val);
+
+ return old == null ? 0 : old;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
index e76663a..be442d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -16,7 +16,21 @@
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
/**
*
@@ -24,11 +38,66 @@ import org.apache.ignite.cache.*;
public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest {
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
- return CacheAtomicityMode.ATOMIC;
+ return ATOMIC;
}
- /** {@inheritDoc} */
- @Override protected int keysCount() {
- return 60_000;
+ /**
+ * Puts into the default cache (ATOMIC in this test class) from inside a pessimistic
+ * transaction started on a separate transactional cache, while node 3 is being
+ * stopped and started in the background. {@code IgniteException} and
+ * {@code CacheException} thrown by an iteration are logged and ignored.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPutInsideTransaction() throws Exception {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName("tx-cache");
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+
+ try (IgniteCache<Integer, Integer> txCache = ignite(0).getOrCreateCache(ccfg)) {
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ // Background thread restarting node 3 until the test signals completion.
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!finished.get()) {
+ stopGrid(3);
+
+ U.sleep(300);
+
+ startGrid(3);
+ }
+
+ return null;
+ }
+ });
+
+ try {
+ IgniteTransactions txs = ignite(0).transactions();
+
+ IgniteCache<Object, Object> cache = ignite(0).cache(null);
+
+ long stopTime = System.currentTimeMillis() + 60_000;
+
+ while (System.currentTimeMillis() < stopTime) {
+ for (int i = 0; i < 10_000; i++) {
+ try {
+ try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ txCache.put(0, 0);
+
+ cache.put(i, i);
+
+ tx.commit();
+ }
+ }
+ catch (IgniteException | CacheException e) {
+ log.info("Ignore exception: " + e);
+ }
+ }
+ }
+ }
+ finally {
+ // Always stop and join the restart thread, also when the loop above fails,
+ // so that it does not keep restarting the node after this test has ended.
+ finished.set(true);
+
+ fut.get();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 0ab5729..e113fcc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -32,6 +33,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
@@ -44,12 +46,12 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
- return CacheAtomicityMode.TRANSACTIONAL;
+ return TRANSACTIONAL;
}
/** {@inheritDoc} */
- @Override protected int keysCount() {
- return 20_000;
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
}
/**
@@ -74,7 +76,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
});
- int keysCnt = keysCount();
+ final int keysCnt = 20_000;
try {
for (int i = 0; i < keysCnt; i++)
@@ -90,6 +92,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
public void testExplicitTransactionRetries() throws Exception {
final AtomicInteger idx = new AtomicInteger();
int threads = 8;
@@ -97,8 +100,7 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads);
IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
+ @Override public Object call() throws Exception {
int th = idx.getAndIncrement();
int base = th * FACTOR;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
index 5b9af4f..5bb1706 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteCacheSslStartStopSelfTest.java
@@ -22,6 +22,8 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.testframework.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
/**
*
*/
@@ -37,11 +39,6 @@ public class IgniteCacheSslStartStopSelfTest extends IgniteCachePutRetryAbstract
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
- return CacheAtomicityMode.ATOMIC;
- }
-
- /** {@inheritDoc} */
- @Override protected int keysCount() {
- return 60_000;
+ return ATOMIC;
}
}