You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/05 13:17:55 UTC
[11/38] ignite git commit: ignite-1397: Load/consistency tests.
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
new file mode 100644
index 0000000..f8a1689
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client keeps a local map that it updates together with the cache: it
+ * invokes an increment closure for every generated key and atomically increments the value for the same key in the
+ * local map. A background validator periodically blocks all client writes and compares the values in the local map
+ * with the values in the cache.
+ */
+public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** Local mirror of the values this client expects to find in the cache. */
+ private final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();
+
+ /** Fair lock: {@link #test(Map)} updates under the read lock, the validator compares under the write lock. */
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+ /** Failure caught in the validator thread; once set, {@link #test(Map)} rethrows it. */
+ private volatile Exception ex;
+
+ /** {@inheritDoc} Additionally starts a daemon thread that periodically validates the cache contents. */
+ @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ Thread thread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ final int timeout = args.cacheOperationTimeoutMillis();
+ final int keysCnt = args.keysCount();
+
+ while (!Thread.currentThread().isInterrupted()) {
+ Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000L); // Long math avoids int overflow.
+
+ rwl.writeLock().lock(); // Excludes every test() writer while the comparison runs.
+
+ try {
+ println("Start cache validation.");
+
+ long startTime = U.currentTimeMillis();
+
+ Map<String, Long> notEqualsCacheVals = new HashMap<>();
+ Map<String, Long> notEqualsLocMapVals = new HashMap<>();
+
+ for (int k = 0; k < args.range(); k++) {
+ if (k % 10_000 == 0)
+ println("Start validation for keys like 'key-" + k + "-*'");
+
+ for (int i = 0; i < keysCnt; i++) {
+ String key = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+ asyncCache.get(key);
+ Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+ AtomicLong aVal = map.get(key);
+ Long mapVal = aVal != null ? aVal.get() : null;
+
+ if (!Objects.equals(cacheVal, mapVal)) {
+ notEqualsCacheVals.put(key, cacheVal);
+ notEqualsLocMapVals.put(key, mapVal);
+ }
+ }
+ }
+
+ assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
+ "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";
+
+ if (!notEqualsCacheVals.isEmpty()) {
+ // Print all useful information and finish.
+ for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
+ String key = eLocMap.getKey();
+ Long mapVal = eLocMap.getValue();
+ Long cacheVal = notEqualsCacheVals.get(key);
+
+ println(cfg, "Got different values [key='" + key
+ + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
+ }
+
+ println(cfg, "Local driver map content:\n " + map);
+
+ println(cfg, "Cache content:");
+
+ for (int k2 = 0; k2 < args.range(); k2++) {
+ for (int i2 = 0; i2 < keysCnt; i2++) {
+ String key2 = "key-" + k2 + "-" + cfg.memberId() + "-" + i2;
+
+ asyncCache.get(key2);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key2 + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Cache and local map are in inconsistent state.");
+ }
+
+ println("Cache validation successfully finished in "
+ + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+ }
+ finally {
+ rwl.writeLock().unlock();
+ }
+ }
+ }
+ catch (Throwable e) {
+ ex = new Exception(e); // Published to the test() threads through the volatile field.
+
+ println("Got exception: " + e);
+
+ e.printStackTrace();
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }, "cache-" + cacheName() + "-validator");
+
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+
+ /** {@inheritDoc} Increments every key of one random key group in the cache and in the local map. */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ final String[] keys = new String[args.keysCount()];
+
+ assert keys.length > 0 : "Count of keys: " + keys.length;
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+ for (String key : keys) {
+ rwl.readLock().lock(); // Taken per key, so the validator can step in between two keys.
+
+ try {
+ if (ex != null)
+ throw ex;
+
+ asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+ asyncCache.future().get(args.cacheOperationTimeoutMillis());
+
+ AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0)); // First invoke stores 0 in the cache too.
+
+ if (prevVal != null)
+ prevVal.incrementAndGet();
+ }
+ finally {
+ rwl.readLock().unlock();
+ }
+ }
+
+ if (ex != null)
+ throw ex;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-invoke-retry";
+ }
+
+ /**
+ * Stores 0 for an entry without a value, otherwise the current value plus 1, and returns the stored value. */
+ private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<String, Long> entry,
+ Object... arguments) throws EntryProcessorException {
+ long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1;
+
+ entry.setValue(newVal);
+
+ return newVal;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..4cbcf67
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark variant that reports {@code tx-offheap-invoke-retry} as its cache name.
+ * <p>
+ * Each client maintains a local map that it updates together with cache.
+ * Client invokes an increment closure for all generated keys and atomically increments value for corresponding
+ * keys in the local map. To validate cache contents, all writes from the client are stopped, values in
+ * the local map are compared to the values in the cache.
+ */
+public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-invoke-retry";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
new file mode 100644
index 0000000..7fa2d1a
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write invoke failover benchmark variant that reports {@code tx-offheap-write-invoke} as its cache name.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-master',
+ * 'key-' + K + '-0', 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-write-invoke";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
new file mode 100644
index 0000000..bdecca7
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write read failover benchmark variant that reports {@code tx-offheap-write-read} as its cache name.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-0',
+ * 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-write-read";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
new file mode 100644
index 0000000..1a8ee14
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-master',
+ * 'key-' + K + '-0', 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+ final String[] keys = new String[args.keysCount()];
+
+ final String masterKey = "key-" + k + "-master";
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ final int scenario = nextRandom(2); // Selects the switch branch below: 0 - read and compare, 1 - increment.
+
+ return doInTransaction(ignite(), new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ final int timeout = args.cacheOperationTimeoutMillis();
+
+ switch (scenario) {
+ case 0: // Read scenario.
+ Map<String, Long> map = new HashMap<>();
+
+ asyncCache.get(masterKey);
+ Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+ map.put(masterKey, cacheVal);
+
+ for (String key : keys) {
+ asyncCache.get(key);
+ cacheVal = asyncCache.<Long>future().get(timeout);
+
+ map.put(key, cacheVal);
+ }
+
+ Set<Long> values = new HashSet<>(map.values()); // Equal values collapse into a single element.
+
+ if (values.size() != 1) {
+ // Print all useful information and finish.
+ println(cfg, "Got different values for keys [map=" + map + "]");
+
+ println(cfg, "Cache content:");
+
+ for (int k = 0; k < args.range(); k++) { // This k shadows the outer k; the dump walks the whole range.
+ for (int i = 0; i < args.keysCount(); i++) {
+ String key = "key-" + k + "-" + i; // NOTE(review): the '-master' keys are not part of this dump.
+
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Found different values for keys (see above information).");
+ }
+
+ break;
+ case 1: // Invoke scenario.
+ asyncCache.get(masterKey);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ asyncCache.put(masterKey, val == null ? 0 : val + 1); // Same rule as IncrementCacheEntryProcessor.
+ asyncCache.future().get(timeout);
+
+ for (String key : keys) {
+ asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+ asyncCache.future().get(timeout);
+ }
+
+ break;
+ }
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param ignite Ignite instance used to start a pessimistic repeatable read transaction for every attempt.
+ * @param clo Closure run inside the transaction; it is called again on each retry.
+ * @return Result of the closure execution whose transaction was committed.
+ * @throws Exception If the closure or the commit fails with an error that is not retried.
+ */
+ public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get(); // Presumably blocks until a retry is possible - confirm in Ignite docs.
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ e.retryReadyFuture().get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-write-invoke";
+ }
+
+ /**
+ * Stores 0 for an entry without a value, otherwise the current value plus 1; always returns {@code null}. */
+ private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Void> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Long> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.setValue(entry.getValue() == null ? 0 : entry.getValue() + 1);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
new file mode 100644
index 0000000..c962749
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-0',
+ * 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+ final String[] keys = new String[args.keysCount()];
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ return doInTransaction(ignite(), new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ Map<String, Long> map = new HashMap<>();
+
+ final int timeout = args.cacheOperationTimeoutMillis();
+
+ for (String key : keys) {
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ map.put(key, val);
+ }
+
+ Set<Long> values = new HashSet<>(map.values()); // Equal values collapse into a single element.
+
+ if (values.size() != 1) {
+ // Print all useful information and finish.
+ println(cfg, "Got different values for keys [map=" + map + "]");
+
+ println(cfg, "Cache content:");
+
+ for (int k = 0; k < args.range(); k++) { // This k shadows the outer k; the dump walks the whole range.
+ for (int i = 0; i < args.keysCount(); i++) {
+ String key = "key-" + k + "-" + i;
+
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Found different values for keys (see above information).");
+ }
+
+ final Long oldVal = map.get(keys[0]); // All values are equal at this point, so any key would do.
+
+ final Long newVal = oldVal == null ? 0 : oldVal + 1;
+
+ for (String key : keys) {
+ asyncCache.put(key, newVal);
+ asyncCache.future().get(timeout);
+ }
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param ignite Ignite instance used to start a pessimistic repeatable read transaction for every attempt.
+ * @param clo Closure run inside the transaction; it is called again on each retry.
+ * @return Result of the closure execution whose transaction was committed.
+ * @throws Exception If the closure or the commit fails with an error that is not retried.
+ */
+ public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get(); // Presumably blocks until a retry is possible - confirm in Ignite docs.
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ e.retryReadyFuture().get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-write-read";
+ }
+}