You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/05 13:17:55 UTC

[11/38] ignite git commit: ignite-1397: Load/consistency tests.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
new file mode 100644
index 0000000..f8a1689
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client keeps a local map of per-key counters next to the cache: for every
+ * generated key {@link #test(Map)} invokes an increment entry processor and then bumps the matching local counter. A
+ * background validator thread periodically takes the write lock, which blocks the updates done under the read lock,
+ * and compares the local counters with the values read from the cache.
+ */
+public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** Expected value per key, updated by {@link #test(Map)} after each invoke completes. */
+    private final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();
+
+    /** Fair lock: {@link #test(Map)} updates under the read lock, the validator thread checks under the write lock. */
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+    /** Failure recorded by the validator thread; once set, {@link #test(Map)} rethrows it. */
+    private volatile Exception ex;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+        // Daemon thread that periodically compares the cache with the local map until interrupted or a check fails.
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    final int timeout = args.cacheOperationTimeoutMillis();
+                    final int keysCnt = args.keysCount();
+
+                    while (!Thread.currentThread().isInterrupted()) {
+                        Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000);
+                        // The write lock waits for in-flight test() updates (read lock) and blocks new ones.
+                        rwl.writeLock().lock();
+
+                        try {
+                            println("Start cache validation.");
+
+                            long startTime = U.currentTimeMillis();
+
+                            Map<String, Long> notEqualsCacheVals = new HashMap<>();
+                            Map<String, Long> notEqualsLocMapVals = new HashMap<>();
+
+                            for (int k = 0; k < args.range(); k++) {
+                                if (k % 10_000 == 0)
+                                    println("Start validation for keys like 'key-" + k + "-*'");
+
+                                for (int i = 0; i < keysCnt; i++) {
+                                    String key = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+                                    asyncCache.get(key);
+                                    Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+                                    AtomicLong aVal = map.get(key);
+                                    Long mapVal = aVal != null ? aVal.get() : null;
+
+                                    if (!Objects.equals(cacheVal, mapVal)) {
+                                        notEqualsCacheVals.put(key, cacheVal);
+                                        notEqualsLocMapVals.put(key, mapVal);
+                                    }
+                                }
+                            }
+
+                            assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
+                                "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";
+
+                            if (!notEqualsCacheVals.isEmpty()) {
+                                // Print all useful information and finish.
+                                for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
+                                    String key = eLocMap.getKey();
+                                    Long mapVal = eLocMap.getValue();
+                                    Long cacheVal = notEqualsCacheVals.get(key);
+
+                                    println(cfg, "Got different values [key='" + key
+                                        + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
+                                }
+
+                                println(cfg, "Local driver map contant:\n " + map);
+
+                                println(cfg, "Cache content:");
+
+                                for (int k2 = 0; k2 < args.range(); k2++) {
+                                    for (int i2 = 0; i2 < keysCnt; i2++) {
+                                        String key2 = "key-" + k2 + "-" + cfg.memberId() + "-" + i2;
+
+                                        asyncCache.get(key2);
+                                        Long val = asyncCache.<Long>future().get(timeout);
+
+                                        if (val != null)
+                                            println(cfg, "Entry [key=" + key2 + ", val=" + val + "]");
+                                    }
+                                }
+
+                                throw new IllegalStateException("Cache and local map are in inconsistent state.");
+                            }
+
+                            println("Cache validation successfully finished in "
+                                + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+                        }
+                        finally {
+                            rwl.writeLock().unlock();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    ex = new Exception(e);
+
+                    println("Got exception: " + e);
+
+                    e.printStackTrace();
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+        }, "cache-" + cacheName() + "-validator");
+
+        thread.setDaemon(true);
+
+        thread.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        final String[] keys = new String[args.keysCount()];
+
+        assert keys.length > 0 : "Count of keys: " + keys.length;
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+        for (String key : keys) {
+            rwl.readLock().lock();
+
+            try {
+                if (ex != null)
+                    throw ex;
+
+                asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+                asyncCache.future().get(args.cacheOperationTimeoutMillis());
+                // First invoke stores 0 in the cache, so a new local counter also starts at 0 without an increment.
+                AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0));
+
+                if (prevVal != null)
+                    prevVal.incrementAndGet();
+            }
+            finally {
+                rwl.readLock().unlock();
+            }
+        }
+
+        if (ex != null)
+            throw ex;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-invoke-retry";
+    }
+
+    /** Increment closure: a missing value is stored as 0, any other value is replaced by itself plus one;
+     * the stored value is also returned as the result of the processor. */
+    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} */
+        @Override public Long process(MutableEntry<String, Long> entry,
+            Object... arguments) throws EntryProcessorException {
+            long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1;
+
+            entry.setValue(newVal);
+
+            return newVal;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..4cbcf67
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark.
+ * <p>
+ * Each client maintains a local map that it updates together with the cache.
+ * The client invokes an increment closure for all generated keys and atomically increments the value for the
+ * corresponding keys in the local map. To validate cache contents, all writes from the client are stopped and the
+ * values in the local map are compared to the values in the cache. This variant only changes the cache name.
+ */
+public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-invoke-retry";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
new file mode 100644
index 0000000..7fa2d1a
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-master',
+ * 'key-' + K + '-0', 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-write-invoke";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
new file mode 100644
index 0000000..bdecca7
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-0',
+ * 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-write-read";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
new file mode 100644
index 0000000..1a8ee14
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-master',
+ * 'key-' + K + '-0', 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+        final String[] keys = new String[args.keysCount()];
+
+        final String masterKey = "key-" + k + "-master";
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + i;
+
+        final int scenario = nextRandom(2);
+
+        return doInTransaction(ignite(), new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                final int timeout = args.cacheOperationTimeoutMillis();
+
+                switch (scenario) {
+                    case 0: // Read scenario.
+                        Map<String, Long> map = new HashMap<>();
+
+                        asyncCache.get(masterKey);
+                        Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+                        map.put(masterKey, cacheVal);
+
+                        for (String key : keys) {
+                            asyncCache.get(key);
+                            cacheVal = asyncCache.<Long>future().get(timeout);
+
+                            map.put(key, cacheVal);
+                        }
+                        // Master and child keys are updated together, so a consistent read yields one distinct value.
+                        Set<Long> values = new HashSet<>(map.values());
+
+                        if (values.size() != 1) {
+                            // Print all useful information and finish.
+                            println(cfg, "Got different values for keys [map=" + map + "]");
+
+                            println(cfg, "Cache content:");
+                            // NOTE(review): only child keys are dumped below; '-master' keys are not printed.
+                            for (int k = 0; k < args.range(); k++) {
+                                for (int i = 0; i < args.keysCount(); i++) {
+                                    String key = "key-" + k + "-" + i;
+
+                                    asyncCache.get(key);
+                                    Long val = asyncCache.<Long>future().get(timeout);
+
+                                    if (val != null)
+                                        println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+                                }
+                            }
+
+                            throw new IllegalStateException("Found different values for keys (see above information).");
+                        }
+
+                        break;
+                    case 1: // Invoke scenario.
+                        asyncCache.get(masterKey);
+                        Long val = asyncCache.<Long>future().get(timeout);
+
+                        asyncCache.put(masterKey, val == null ? 0 : val + 1);
+                        asyncCache.future().get(timeout);
+
+                        for (String key : keys) {
+                            asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+                            asyncCache.future().get(timeout);
+                        }
+
+                        break;
+                }
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param ignite Ignite instance used to start a pessimistic repeatable read transaction for every attempt.
+     * @param clo Closure run inside the transaction; it is run again from the start after a retried failure.
+     * @return Result of the closure from the attempt whose transaction was committed.
+     * @throws Exception If the closure or commit fails with anything but a topology or transaction rollback error.
+     */
+    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        while (true) {
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.call();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof ClusterTopologyException) {
+                    ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+                    topEx.retryReadyFuture().get();
+                }
+                else
+                    throw e;
+            }
+            catch (ClusterTopologyException e) {
+                e.retryReadyFuture().get();
+            }
+            catch (TransactionRollbackException ignore) {
+                // Safe to retry right away.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-write-invoke";
+    }
+
+    /** Increment closure: a missing value is stored as 0, any other value is replaced by itself plus one;
+     * the processor itself returns no result. */
+    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<String, Long> entry,
+            Object... arguments) throws EntryProcessorException {
+            entry.setValue(entry.getValue() == null ? 0 : entry.getValue() + 1);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5b0a18dd/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
new file mode 100644
index 0000000..c962749
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-0',
+ * 'key-' + K + '-1', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+        final String[] keys = new String[args.keysCount()];
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + i;
+
+        return doInTransaction(ignite(), new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                Map<String, Long> map = new HashMap<>();
+
+                final int timeout = args.cacheOperationTimeoutMillis();
+
+                for (String key : keys) {
+                    asyncCache.get(key);
+                    Long val = asyncCache.<Long>future().get(timeout);
+
+                    map.put(key, val);
+                }
+                // All keys of one K receive the same value further below, so one distinct value is expected here.
+                Set<Long> values = new HashSet<>(map.values());
+
+                if (values.size() != 1) {
+                    // Print all useful information and finish.
+                    println(cfg, "Got different values for keys [map=" + map + "]");
+
+                    println(cfg, "Cache content:");
+
+                    for (int k = 0; k < args.range(); k++) {
+                        for (int i = 0; i < args.keysCount(); i++) {
+                            String key = "key-" + k + "-" + i;
+
+                            asyncCache.get(key);
+                            Long val = asyncCache.<Long>future().get(timeout);
+
+                            if (val != null)
+                                println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+                        }
+                    }
+
+                    throw new IllegalStateException("Found different values for keys (see above information).");
+                }
+
+                final Long oldVal = map.get(keys[0]);
+
+                final Long newVal = oldVal == null ? 0 : oldVal + 1;
+
+                for (String key : keys) {
+                    asyncCache.put(key, newVal);
+                    asyncCache.future().get(timeout);
+                }
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param ignite Ignite instance used to start a pessimistic repeatable read transaction for every attempt.
+     * @param clo Closure run inside the transaction; it is run again from the start after a retried failure.
+     * @return Result of the closure from the attempt whose transaction was committed.
+     * @throws Exception If the closure or commit fails with anything but a topology or transaction rollback error.
+     */
+    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        while (true) {
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.call();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof ClusterTopologyException) {
+                    ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+                    topEx.retryReadyFuture().get();
+                }
+                else
+                    throw e;
+            }
+            catch (ClusterTopologyException e) {
+                e.retryReadyFuture().get();
+            }
+            catch (TransactionRollbackException ignore) {
+                // Safe to retry right away.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-write-read";
+    }
+}