You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 09:40:54 UTC

[03/37] ignite git commit: ignite-1758 Fixed client reconnect issues

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
index ea9531a..1c258a4 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
@@ -29,7 +29,7 @@ import org.yardstickframework.BenchmarkConfiguration;
 /**
  * Ignite benchmark that performs put and query operations.
  */
-public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark {
+public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
     /** {@inheritDoc} */
     @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
         super.setUp(cfg);
@@ -81,4 +81,4 @@ public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark {
     @Override protected IgniteCache<Integer, Object> cache() {
         return ignite().cache("query");
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
new file mode 100644
index 0000000..c0567ef
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client
+ * invokes an entry processor that adds a value to the set stored under a generated key and atomically increments the
+ * counter for the corresponding key in the local map. To validate cache contents, all writes from the client are
+ * stopped and the counters in the local map are compared to the sizes of the sets in the cache.
+ */
+public class IgniteAtomicInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Set> {
+    /** Local per-key counters; each one holds the set size expected in the cache for that key. */
+    private final ConcurrentMap<String, AtomicLong> nextValMap = new ConcurrentHashMap<>();
+
+    /** Fair lock: {@link #test} runs under the read lock, the validator thread takes the write lock. */
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+    /** Failure captured by the validator thread; rethrown from {@link #test}. */
+    private volatile Exception ex;
+
+    /** {@inheritDoc} Also starts a daemon thread that periodically validates the cache against the local map. */
+    @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    final int timeout = args.cacheOperationTimeoutMillis();
+                    final int range = args.range();
+
+                    while (!Thread.currentThread().isInterrupted()) {
+                        Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000);
+                        // test() holds the read lock, so taking the write lock pauses updates during validation.
+                        rwl.writeLock().lock();
+
+                        try {
+                            println("Start cache validation.");
+
+                            long startTime = U.currentTimeMillis();
+
+                            Map<String, Set> badCacheEntries = new HashMap<>();
+                            // A key is bad if its set is missing or the set size differs from the local counter.
+                            for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet()) {
+                                String key = e.getKey();
+
+                                asyncCache.get(key);
+                                Set set = asyncCache.<Set>future().get(timeout);
+
+                                if (set == null || e.getValue() == null || !Objects.equals(e.getValue().get(), (long)set.size()))
+                                    badCacheEntries.put(key, set);
+                            }
+
+                            if (!badCacheEntries.isEmpty()) {
+                                // Print all useful information and finish.
+                                for (Map.Entry<String, Set> e : badCacheEntries.entrySet()) {
+                                    String key = e.getKey();
+
+                                    println("Got unexpected set size [key='" + key + "', expSize=" + nextValMap.get(key)
+                                        + ", cacheVal=" + e.getValue() + "]");
+                                }
+
+                                println("Next values map contant:");
+                                for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet())
+                                    println("Map Entry [key=" + e.getKey() + ", val=" + e.getValue() + "]");
+
+                                println("Cache content:");
+
+                                for (int k2 = 0; k2 < range; k2++) {
+                                    String key2 = "key-" + k2;
+
+                                    asyncCache.get(key2);
+                                    Object val = asyncCache.future().get(timeout);
+
+                                    if (val != null)
+                                        println("Cache Entry [key=" + key2 + ", val=" + val + "]");
+
+                                }
+
+                                throw new IllegalStateException("Cache and local map are in inconsistent state " +
+                                    "[badKeys=" + badCacheEntries.keySet() + ']');
+                            }
+                            // Validation passed: wipe the cache and the local map so the next round starts clean.
+                            println("Clearing all data.");
+
+                            asyncCache.removeAll();
+                            asyncCache.future().get(timeout);
+
+                            nextValMap.clear();
+
+                            println("Cache validation successfully finished in "
+                                + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+                        }
+                        finally {
+                            rwl.writeLock().unlock();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    ex = new Exception(e); // Picked up and rethrown by test().
+
+                    println("Got exception: " + e);
+
+                    e.printStackTrace();
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+        }, "cache-" + cacheName() + "-validator");
+
+        thread.setDaemon(true);
+
+        thread.start();
+    }
+
+    /** {@inheritDoc} Bumps the local counter of a random key and adds the new value to the cached set. */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        String key = "key-" + k;
+
+        rwl.readLock().lock();
+
+        try {
+            if (ex != null)
+                throw ex;
+            // First use of a key stores counter 1 (putIfAbsent returns null); later uses increment it.
+            AtomicLong nextAtomicVal = nextValMap.putIfAbsent(key, new AtomicLong(1));
+
+            Long nextVal = 1L;
+
+            if (nextAtomicVal != null)
+                nextVal = nextAtomicVal.incrementAndGet();
+            // The processor adds nextVal to the set, so the set size is expected to track the counter.
+            asyncCache.invoke(key, new AddInSetEntryProcessor(), nextVal);
+            asyncCache.future().get(args.cacheOperationTimeoutMillis());
+        }
+        finally {
+            rwl.readLock().unlock();
+        }
+        // Re-check for a failure recorded by the validator thread.
+        if (ex != null)
+            throw ex;
+
+        return true;
+    }
+
+    /** {@inheritDoc} Overridden by subclasses that run the same scenario against another cache. */
+    @Override protected String cacheName() {
+        return "atomic-invoke-retry";
+    }
+
+    /**
+     * Adds the first invoke argument to the entry's set, creating the set if the entry is absent. */
+    private static class AddInSetEntryProcessor implements CacheEntryProcessor<String, Set, Object> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} Always returns {@code null}; the result is the updated entry value. */
+        @Override public Object process(MutableEntry<String, Set> entry,
+            Object... arguments) throws EntryProcessorException {
+            assert !F.isEmpty(arguments);
+
+            Object val = arguments[0];
+
+            Set set;
+
+            if (!entry.exists())
+                set = new HashSet<>();
+            else
+                set = entry.getValue();
+
+            set.add(val);
+            // Write the set back so the change is stored in the entry.
+            entry.setValue(set);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..c8b0b1d
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark run against the {@code atomic-offheap-invoke-retry} cache.
+ * <p>
+ * The scenario is inherited unchanged from {@link IgniteAtomicInvokeRetryBenchmark}; this class only overrides
+ * the cache name. NOTE(review): the cache is presumably configured off-heap in the benchmark config - confirm.
+ */
+public class IgniteAtomicOffHeapInvokeRetryBenchmark extends IgniteAtomicInvokeRetryBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "atomic-offheap-invoke-retry";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
new file mode 100644
index 0000000..ebb9eac
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Atomic retries failover benchmark run against the {@code atomic-offheap-reties} cache.
+ * <p>
+ * Client generates continuous load to the cluster (random get, put, invoke, remove
+ * operations); the load itself is inherited from {@link IgniteAtomicRetriesBenchmark}.
+ */
+public class IgniteAtomicOffHeapRetriesBenchmark extends IgniteAtomicRetriesBenchmark {
+    /** {@inheritDoc} NOTE(review): "reties" is probably a typo, but it must match the configured cache name. */
+    @Override protected String cacheName() {
+        return "atomic-offheap-reties";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
new file mode 100644
index 0000000..4e60698
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+
+/**
+ * Atomic retries failover benchmark.
+ * <p>
+ * Client generates continuous load to the cluster (random get, put, invoke, remove
+ * operations), waiting for each operation up to the configured cache operation timeout.
+ */
+public class IgniteAtomicRetriesBenchmark extends IgniteFailoverAbstractBenchmark<Integer, String> {
+    /** {@inheritDoc} Runs one randomly chosen cache operation on a random key and waits for its completion. */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int key = nextRandom(args.range());
+        // Operation selector: 0 - get, 1 - put, 2 - invoke, 3 - remove.
+        int opNum = nextRandom(4);
+
+        final int timeout = args.cacheOperationTimeoutMillis();
+        // Each operation is issued through the async cache and then awaited with the timeout.
+        switch (opNum) {
+            case 0:
+                asyncCache.get(key);
+                asyncCache.future().get(timeout);
+
+                break;
+
+            case 1:
+                asyncCache.put(key, String.valueOf(key));
+                asyncCache.future().get(timeout);
+
+                break;
+
+            case 2:
+                asyncCache.invoke(key, new TestCacheEntryProcessor());
+                asyncCache.future().get(timeout);
+
+                break;
+
+            case 3:
+                asyncCache.remove(key);
+                asyncCache.future().get(timeout);
+
+                break;
+
+            default:
+                throw new IllegalStateException("Got invalid operation number: " + opNum);
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} NOTE(review): "reties" is probably a typo, but it must match the configured cache name. */
+    @Override protected String cacheName() {
+        return "atomic-reties";
+    }
+
+    /**
+     * Entry processor that leaves the entry untouched and returns the constant {@code "key"}. */
+    private static class TestCacheEntryProcessor implements CacheEntryProcessor<Integer, String, String> {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} */
+        @Override public String process(MutableEntry<Integer, String> entry,
+            Object... arguments) throws EntryProcessorException {
+            return "key";
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
new file mode 100644
index 0000000..83fc58f
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.mxbean.IgniteMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.IgniteCacheAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+import org.yardstickframework.BenchmarkUtils.ProcessExecutionResult;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Ignite benchmark that performs long running failover tasks.
+ */
+public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheAbstractBenchmark<K, V> {
+    /** Ensures the servers restarter thread is started only once across all benchmark instances. */
+    private static final AtomicBoolean restarterStarted = new AtomicBoolean();
+
+    /** Async view of the benchmark cache, created in {@link #setUp}. */
+    protected IgniteCache<K, V> asyncCache;
+
+    /** Set when the first exception is handled in {@link #onException}, so debug info is dumped only once. */
+    private final AtomicBoolean firtsExProcessed = new AtomicBoolean();
+
+    /** {@inheritDoc} Also creates the async view of the cache that subclasses operate on. */
+    @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        asyncCache = cache.withAsync();
+    }
+
+    /** {@inheritDoc} For member id 0, starts a daemon thread that periodically kills and restarts servers. */
+    @Override public void onWarmupFinished() {
+        if (cfg.memberId() == 0 && restarterStarted.compareAndSet(false, true)) {
+            Thread restarterThread = new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+                        println("Servers restarter started on driver: "
+                            + IgniteFailoverAbstractBenchmark.this.getClass().getSimpleName());
+
+                        Ignite ignite = ignite();
+
+                        // Read servers configs from cache to local map.
+                        IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite.
+                            getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().
+                                setName("serversConfigs"));
+
+                        final Map<Integer, BenchmarkConfiguration> srvsCfgs = new HashMap<>();
+
+                        for (Cache.Entry<Integer, BenchmarkConfiguration> e : srvsCfgsCache) {
+                            println("Read entry from 'serversConfigs' cache : " + e);
+
+                            srvsCfgs.put(e.getKey(), e.getValue());
+                        }
+                        // Sanity check: a stored configuration is expected for every server node.
+                        assert ignite.cluster().forServers().nodes().size() == srvsCfgs.size();
+
+                        final int backupsCnt = args.backups();
+
+                        assert backupsCnt >= 1 : "Backups: " + backupsCnt;
+
+                        final boolean isDebug = ignite.log().isDebugEnabled();
+
+                        // Main logic.
+                        while (!Thread.currentThread().isInterrupted()) {
+                            Thread.sleep(args.restartDelay() * 1000);
+                            // NOTE(review): presumably yields 1..backupsCnt - confirm nextRandom(min, max) bounds.
+                            int numNodesToRestart = nextRandom(1, backupsCnt + 1);
+
+                            List<Integer> ids = new ArrayList<>();
+
+                            ids.addAll(srvsCfgs.keySet());
+
+                            Collections.shuffle(ids);
+
+                            println("Waiting for partitioned map exchage of all nodes");
+                            // Broadcast the wait task and block until it completes or the timeout expires.
+                            IgniteCompute asyncCompute = ignite.compute().withAsync();
+
+                            asyncCompute.broadcast(new AwaitPartitionMapExchangeTask());
+
+                            asyncCompute.future().get(args.cacheOperationTimeoutMillis());
+
+                            println("Start servers restarting [numNodesToRestart=" + numNodesToRestart
+                                + ", shuffledIds=" + ids + "]");
+                            // Kill the first numNodesToRestart servers of the shuffled list.
+                            for (int i = 0; i < numNodesToRestart; i++) {
+                                Integer id = ids.get(i);
+
+                                BenchmarkConfiguration bc = srvsCfgs.get(id);
+
+                                ProcessExecutionResult res = BenchmarkUtils.kill9Server(bc, isDebug);
+
+                                println("Server with id " + id + " has been killed."
+                                    + (isDebug ? " Process execution result:\n" + res : ""));
+                            }
+                            // Keep the killed servers down for restartSleep seconds before starting them again.
+                            Thread.sleep(args.restartSleep() * 1000);
+                            // Start the same servers again.
+                            for (int i = 0; i < numNodesToRestart; i++) {
+                                Integer id = ids.get(i);
+
+                                BenchmarkConfiguration bc = srvsCfgs.get(id);
+
+                                ProcessExecutionResult res = BenchmarkUtils.startServer(bc, isDebug);
+
+                                println("Server with id " + id + " has been started."
+                                    + (isDebug ? " Process execution result:\n" + res : ""));
+                            }
+                        }
+                    }
+                    catch (Throwable e) {
+                        println("Got exception: " + e);
+                        e.printStackTrace();
+
+                        U.dumpThreads(null);
+
+                        if (e instanceof Error)
+                            throw (Error)e;
+                    }
+                }
+            }, "servers-restarter");
+
+            restarterThread.setDaemon(true);
+            restarterThread.start();
+        }
+    }
+
+    /**
+     * Polls every 100 ms until all partitions of all DHT caches known to the node are in {@code OWNING} state.
+     *
+     * @param ignite Ignite node to check; it is cast to {@link IgniteKernal}.
+     * @throws Exception If failed or the waiting thread was interrupted.
+     */
+    @SuppressWarnings("BusyWait")
+    protected static void awaitPartitionMapExchange(Ignite ignite) throws Exception {
+        IgniteLogger log = ignite.log();
+
+        log.info("Waiting for finishing of a partition exchange on node: " + ignite);
+
+        IgniteKernal kernal = (IgniteKernal)ignite;
+
+        while (true) {
+            boolean partitionsExchangeFinished = true;
+
+            for (IgniteInternalCache<?, ?> cache : kernal.cachesx(null)) {
+                log.info("Checking cache: " + cache.name());
+
+                GridCacheAdapter<?, ?> c = kernal.internalCache(cache.name());
+                // A non-DHT cache has no partition map to check: skip it, but keep checking the other caches.
+                if (!(c instanceof GridDhtCacheAdapter))
+                    continue;
+
+                GridDhtCacheAdapter<?, ?> dht = (GridDhtCacheAdapter<?, ?>)c;
+
+                GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true);
+
+                for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+                    log.info("Checking node: " + e.getKey());
+
+                    for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) {
+                        if (e1.getValue() != GridDhtPartitionState.OWNING) {
+                            log.info("Undesired state [id=" + e1.getKey() + ", state=" + e1.getValue() + ']');
+
+                            partitionsExchangeFinished = false;
+
+                            break;
+                        }
+                    }
+                    // One non-OWNING partition is enough to fail this pass; stop scanning nodes.
+                    if (!partitionsExchangeFinished)
+                        break;
+                }
+
+                if (!partitionsExchangeFinished)
+                    break;
+            }
+
+            if (partitionsExchangeFinished)
+                return;
+
+            Thread.sleep(100);
+        }
+    }
+
+    /** {@inheritDoc} On the first exception only, dumps threads and debug info locally and on all servers. */
+    @Override public void onException(Throwable e) {
+        // Process only the first exception to prevent printing a full thread dump multiple times.
+        if (firtsExProcessed.compareAndSet(false, true)) {
+            // Debug info on current client.
+            println("Full thread dump of the current node below.");
+
+            U.dumpThreads(null);
+
+            println("");
+
+            ((IgniteMXBean)ignite()).dumpDebugInfo();
+
+            // Debug info on servers.
+            Ignite ignite = ignite();
+
+            ClusterGroup srvs = ignite.cluster().forServers();
+
+            IgniteCompute asyncCompute = ignite.compute(srvs).withAsync();
+
+            asyncCompute.broadcast(new ThreadDumpPrinterTask(ignite.cluster().localNode().id(), e));
+            asyncCompute.future().get(10_000); // Wait for the servers with a timeout of 10_000.
+        }
+    }
+
+    /**
+     * @return Name of the cache the benchmark operates on; {@link #cache()} looks the cache up by this name.
+     */
+    protected abstract String cacheName();
+
+    /** {@inheritDoc} Resolves the cache by {@link #cacheName()}. */
+    @Override protected IgniteCache<K, V> cache() {
+        return ignite().cache(cacheName());
+    }
+
+    /**
+     * Task that prints a thread dump and Ignite debug info on the node where it runs. */
+    private static class ThreadDumpPrinterTask implements IgniteRunnable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Ignite instance, annotated for injection via {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Id of the driver node that reported the exception. */
+        private final UUID id;
+
+        /** Exception reported by the driver. */
+        private final Throwable e;
+
+        /**
+         * @param id Benchmark node id.
+         * @param e Exception.
+         */
+        ThreadDumpPrinterTask(UUID id, Throwable e) {
+            this.id = id;
+            this.e = e;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            println("Driver finished with exception [driverNodeId=" + id + ", e=" + e + "]");
+            println("Full thread dump of the current server node below.");
+
+            U.dumpThreads(null);
+
+            println("");
+
+            ((IgniteMXBean)ignite).dumpDebugInfo();
+        }
+    }
+
+    /**
+     * Task that runs {@link #awaitPartitionMapExchange(Ignite)} on the node where it is executed. */
+    private static class AwaitPartitionMapExchangeTask implements IgniteRunnable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Ignite instance, annotated for injection via {@link IgniteInstanceResource}. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                awaitPartitionMapExchange(ignite);
+            }
+            catch (Exception e) {
+                throw new IgniteException(e); // run() declares no checked exceptions, so wrap.
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
new file mode 100644
index 0000000..29405de
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.yardstick.IgniteNode;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Ignite failover node that, on start, stores its benchmark configuration in the {@code serversConfigs} cache.
+ */
+public class IgniteFailoverNode extends IgniteNode {
+    /** {@inheritDoc} */
+    @Override public void start(BenchmarkConfiguration cfg) throws Exception {
+        super.start(cfg);
+
+        // Record how this JVM was launched, then put the configuration into the "serversConfigs" cache.
+        RuntimeMXBean mxBean = ManagementFactory.getRuntimeMXBean();
+
+        List<String> jvmOpts = mxBean.getInputArguments();
+
+        StringBuilder jvmOptsStr = new StringBuilder();
+
+        for (String opt : jvmOpts)
+            jvmOptsStr.append(opt).append(' ');
+
+        cfg.customProperties().put("JVM_OPTS", jvmOptsStr.toString());
+        cfg.customProperties().put("PROPS_ENV", System.getenv("PROPS_ENV"));
+        cfg.customProperties().put("CLASSPATH", mxBean.getClassPath());
+        cfg.customProperties().put("JAVA", System.getenv("JAVA"));
+
+        IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite().
+            getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().setName("serversConfigs"));
+
+        srvsCfgsCache.put(cfg.memberId(), cfg);
+
+        println("Put at cache [" + cfg.memberId() + "=" + cfg + "]");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
new file mode 100644
index 0000000..f8a1689
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client
+ * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the
+ * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared
+ * to the values in the cache.
+ */
+public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** Expected value per key, maintained locally alongside the cache updates made by {@link #test(Map)}. */
+    private final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();
+
+    /** Fair lock: {@link #test(Map)} updates under the read lock, the validator thread checks under the write lock. */
+    private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+    /** Failure caught by the validator thread; rethrown from {@link #test(Map)} once set. */
+    private volatile Exception ex;
+
+    /** {@inheritDoc} Also starts a daemon thread that periodically compares cache values with the local map. */
+    @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        Thread thread = new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    final int timeout = args.cacheOperationTimeoutMillis();
+                    final int keysCnt = args.keysCount();
+
+                    while (!Thread.currentThread().isInterrupted()) {
+                        Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000);
+
+                        rwl.writeLock().lock();
+
+                        try {
+                            println("Start cache validation.");
+
+                            long startTime = U.currentTimeMillis();
+
+                            Map<String, Long> notEqualsCacheVals = new HashMap<>();
+                            Map<String, Long> notEqualsLocMapVals = new HashMap<>();
+
+                            for (int k = 0; k < args.range(); k++) {
+                                if (k % 10_000 == 0)
+                                    println("Start validation for keys like 'key-" + k + "-*'");
+
+                                for (int i = 0; i < keysCnt; i++) {
+                                    String key = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+                                    asyncCache.get(key);
+                                    Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+                                    AtomicLong aVal = map.get(key);
+                                    Long mapVal = aVal != null ? aVal.get() : null;
+
+                                    if (!Objects.equals(cacheVal, mapVal)) {
+                                        notEqualsCacheVals.put(key, cacheVal);
+                                        notEqualsLocMapVals.put(key, mapVal);
+                                    }
+                                }
+                            }
+
+                            assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
+                                "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";
+
+                            if (!notEqualsCacheVals.isEmpty()) {
+                                // Print all useful information and finish.
+                                for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
+                                    String key = eLocMap.getKey();
+                                    Long mapVal = eLocMap.getValue();
+                                    Long cacheVal = notEqualsCacheVals.get(key);
+
+                                    println(cfg, "Got different values [key='" + key
+                                        + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
+                                }
+
+                                println(cfg, "Local driver map contant:\n " + map);
+
+                                println(cfg, "Cache content:");
+
+                                for (int k2 = 0; k2 < args.range(); k2++) {
+                                    for (int i2 = 0; i2 < keysCnt; i2++) {
+                                        String key2 = "key-" + k2 + "-" + cfg.memberId() + "-" + i2;
+
+                                        asyncCache.get(key2);
+                                        Long val = asyncCache.<Long>future().get(timeout);
+
+                                        if (val != null)
+                                            println(cfg, "Entry [key=" + key2 + ", val=" + val + "]");
+                                    }
+                                }
+
+                                throw new IllegalStateException("Cache and local map are in inconsistent state.");
+                            }
+
+                            println("Cache validation successfully finished in "
+                                + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+                        }
+                        finally {
+                            rwl.writeLock().unlock();
+                        }
+                    }
+                }
+                catch (Throwable e) {
+                    ex = new Exception(e);
+
+                    println("Got exception: " + e);
+
+                    e.printStackTrace();
+
+                    if (e instanceof Error)
+                        throw (Error)e;
+                }
+            }
+        }, "cache-" + cacheName() + "-validator");
+
+        thread.setDaemon(true);
+
+        thread.start();
+    }
+
+    /** {@inheritDoc} Invokes the increment processor for each generated key and mirrors it in the local map. */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        final String[] keys = new String[args.keysCount()];
+
+        assert keys.length > 0 : "Count of keys: " + keys.length;
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+        for (String key : keys) {
+            rwl.readLock().lock();
+
+            try {
+                if (ex != null)
+                    throw ex;
+
+                asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+                asyncCache.future().get(args.cacheOperationTimeoutMillis());
+
+                AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0));
+
+                if (prevVal != null)
+                    prevVal.incrementAndGet();
+            }
+            finally {
+                rwl.readLock().unlock();
+            }
+        }
+
+        if (ex != null)
+            throw ex;
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-invoke-retry";
+    }
+
+    /** Entry processor that sets a missing value to 0 and otherwise increments the value by 1.
+     */
+    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} Returns the value that was stored in the entry. */
+        @Override public Long process(MutableEntry<String, Long> entry,
+            Object... arguments) throws EntryProcessorException {
+            long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1;
+
+            entry.setValue(newVal);
+
+            return newVal;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..4cbcf67
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark; differs from the parent class only in the value of {@link #cacheName()}.
+ * <p>
+ * Each client maintains a local map that it updates together with cache.
+ * Client invokes an increment closure for all generated keys and atomically increments value for corresponding
+ * keys in the local map. To validate cache contents, all writes from the client are stopped, values in
+ * the local map are compared to the values in the cache.
+ */
+public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-invoke-retry";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
new file mode 100644
index 0000000..7fa2d1a
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write invoke failover benchmark; differs from the parent only in the value of {@link #cacheName()}.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master',
+ * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-write-invoke";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
new file mode 100644
index 0000000..bdecca7
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write read failover benchmark; differs from the parent only in the value of {@link #cacheName()}.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1',
+ * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark {
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-offheap-write-read";
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
new file mode 100644
index 0000000..1a8ee14
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master',
+ * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** {@inheritDoc} Picks the read or the invoke scenario at random and runs it via {@link #doInTransaction}. */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+        final String[] keys = new String[args.keysCount()];
+
+        final String masterKey = "key-" + k + "-master";
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + i;
+
+        final int scenario = nextRandom(2);
+
+        return doInTransaction(ignite(), new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                final int timeout = args.cacheOperationTimeoutMillis();
+
+                switch (scenario) {
+                    case 0: // Read scenario.
+                        Map<String, Long> map = new HashMap<>();
+
+                        asyncCache.get(masterKey);
+                        Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+                        map.put(masterKey, cacheVal);
+
+                        for (String key : keys) {
+                            asyncCache.get(key);
+                            cacheVal = asyncCache.<Long>future().get(timeout);
+
+                            map.put(key, cacheVal);
+                        }
+
+                        Set<Long> values = new HashSet<>(map.values());
+
+                        if (values.size() != 1) {
+                            // Print all useful information and finish.
+                            println(cfg, "Got different values for keys [map=" + map + "]");
+
+                            println(cfg, "Cache content:");
+
+                            for (int k = 0; k < args.range(); k++) {
+                                for (int i = 0; i < args.keysCount(); i++) {
+                                    String key = "key-" + k + "-" + i;
+
+                                    asyncCache.get(key);
+                                    Long val = asyncCache.<Long>future().get(timeout);
+
+                                    if (val != null)
+                                        println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+                                }
+                            }
+
+                            throw new IllegalStateException("Found different values for keys (see above information).");
+                        }
+
+                        break;
+                    case 1: // Invoke scenario.
+                        asyncCache.get(masterKey);
+                        Long val = asyncCache.<Long>future().get(timeout);
+
+                        asyncCache.put(masterKey, val == null ? 0 : val + 1);
+                        asyncCache.future().get(timeout);
+
+                        for (String key : keys) {
+                            asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+                            asyncCache.future().get(timeout);
+                        }
+
+                        break;
+                }
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param ignite Ignite instance used to start the pessimistic repeatable read transaction.
+     * @param clo Closure run inside the transaction; run again after a topology or rollback exception.
+     * @return Result of closure execution.
+     * @throws Exception If the closure or the commit fails with an exception that is not retried.
+     */
+    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        while (true) {
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.call();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof ClusterTopologyException) {
+                    ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+                    topEx.retryReadyFuture().get();
+                }
+                else
+                    throw e;
+            }
+            catch (ClusterTopologyException e) {
+                e.retryReadyFuture().get();
+            }
+            catch (TransactionRollbackException ignore) {
+                // Safe to retry right away.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-write-invoke";
+    }
+
+    /** Entry processor that sets a missing value to 0 and otherwise increments the value by 1.
+     */
+    private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Void> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0;
+
+        /** {@inheritDoc} Always returns {@code null}. */
+        @Override public Void process(MutableEntry<String, Long> entry,
+            Object... arguments) throws EntryProcessorException {
+            entry.setValue(entry.getValue() == null ? 0 : entry.getValue() + 1);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
new file mode 100644
index 0000000..c962749
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1',
+ * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        final int k = nextRandom(args.range());
+
+        assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+        final String[] keys = new String[args.keysCount()];
+
+        for (int i = 0; i < keys.length; i++)
+            keys[i] = "key-" + k + "-" + i;
+
+        return doInTransaction(ignite(), new Callable<Boolean>() {
+            @Override public Boolean call() throws Exception {
+                Map<String, Long> map = new HashMap<>();
+
+                final int timeout = args.cacheOperationTimeoutMillis();
+
+                for (String key : keys) {
+                    asyncCache.get(key);
+                    Long val = asyncCache.<Long>future().get(timeout);
+
+                    map.put(key, val);
+                }
+
+                Set<Long> values = new HashSet<>(map.values());
+
+                if (values.size() != 1) {
+                    // Print all usefull information and finish.
+                    println(cfg, "Got different values for keys [map=" + map + "]");
+
+                    println(cfg, "Cache content:");
+
+                    for (int k = 0; k < args.range(); k++) {
+                        for (int i = 0; i < args.keysCount(); i++) {
+                            String key = "key-" + k + "-" + i;
+
+                            asyncCache.get(key);
+                            Long val = asyncCache.<Long>future().get(timeout);
+
+                            if (val != null)
+                                println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+                        }
+                    }
+
+                    throw new IllegalStateException("Found different values for keys (see above information).");
+                }
+
+                final Long oldVal = map.get(keys[0]);
+
+                final Long newVal = oldVal == null ? 0 : oldVal + 1;
+
+                for (String key : keys) {
+                    asyncCache.put(key, newVal);
+                    asyncCache.future().get(timeout);
+                }
+
+                return true;
+            }
+        });
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     * @param clo Closure.
+     * @return Result of closure execution.
+     * @throws Exception
+     */
+    public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+        while (true) {
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.call();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (CacheException e) {
+                if (e.getCause() instanceof ClusterTopologyException) {
+                    ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+                    topEx.retryReadyFuture().get();
+                }
+                else
+                    throw e;
+            }
+            catch (ClusterTopologyException e) {
+                e.retryReadyFuture().get();
+            }
+            catch (TransactionRollbackException ignore) {
+                // Safe to retry right away.
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String cacheName() {
+        return "tx-write-read";
+    }
+}