You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/11/10 09:40:54 UTC
[03/37] ignite git commit: ignite-1758 Fixed client reconnect issues
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
index ea9531a..1c258a4 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryPutBenchmark.java
@@ -29,7 +29,7 @@ import org.yardstickframework.BenchmarkConfiguration;
/**
* Ignite benchmark that performs put and query operations.
*/
-public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark {
+public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark<Integer, Object> {
/** {@inheritDoc} */
@Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
super.setUp(cfg);
@@ -81,4 +81,4 @@ public class IgniteSqlQueryPutBenchmark extends IgniteCacheAbstractBenchmark {
@Override protected IgniteCache<Integer, Object> cache() {
return ignite().cache("query");
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
new file mode 100644
index 0000000..c0567ef
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicInvokeRetryBenchmark.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client
+ * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the
+ * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared
+ * to the values in the cache.
+ */
+public class IgniteAtomicInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Set> {
+ /** */
+ private final ConcurrentMap<String, AtomicLong> nextValMap = new ConcurrentHashMap<>();
+
+ /** */
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+ /** */
+ private volatile Exception ex;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ Thread thread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ final int timeout = args.cacheOperationTimeoutMillis();
+ final int range = args.range();
+
+ while (!Thread.currentThread().isInterrupted()) {
+ Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000);
+
+ rwl.writeLock().lock();
+
+ try {
+ println("Start cache validation.");
+
+ long startTime = U.currentTimeMillis();
+
+ Map<String, Set> badCacheEntries = new HashMap<>();
+
+ for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet()) {
+ String key = e.getKey();
+
+ asyncCache.get(key);
+ Set set = asyncCache.<Set>future().get(timeout);
+
+ if (set == null || e.getValue() == null || !Objects.equals(e.getValue().get(), (long)set.size()))
+ badCacheEntries.put(key, set);
+ }
+
+ if (!badCacheEntries.isEmpty()) {
+ // Print all usefull information and finish.
+ for (Map.Entry<String, Set> e : badCacheEntries.entrySet()) {
+ String key = e.getKey();
+
+ println("Got unexpected set size [key='" + key + "', expSize=" + nextValMap.get(key)
+ + ", cacheVal=" + e.getValue() + "]");
+ }
+
+ println("Next values map contant:");
+ for (Map.Entry<String, AtomicLong> e : nextValMap.entrySet())
+ println("Map Entry [key=" + e.getKey() + ", val=" + e.getValue() + "]");
+
+ println("Cache content:");
+
+ for (int k2 = 0; k2 < range; k2++) {
+ String key2 = "key-" + k2;
+
+ asyncCache.get(key2);
+ Object val = asyncCache.future().get(timeout);
+
+ if (val != null)
+ println("Cache Entry [key=" + key2 + ", val=" + val + "]");
+
+ }
+
+ throw new IllegalStateException("Cache and local map are in inconsistent state " +
+ "[badKeys=" + badCacheEntries.keySet() + ']');
+ }
+
+ println("Clearing all data.");
+
+ asyncCache.removeAll();
+ asyncCache.future().get(timeout);
+
+ nextValMap.clear();
+
+ println("Cache validation successfully finished in "
+ + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+ }
+ finally {
+ rwl.writeLock().unlock();
+ }
+ }
+ }
+ catch (Throwable e) {
+ ex = new Exception(e);
+
+ println("Got exception: " + e);
+
+ e.printStackTrace();
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }, "cache-" + cacheName() + "-validator");
+
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ String key = "key-" + k;
+
+ rwl.readLock().lock();
+
+ try {
+ if (ex != null)
+ throw ex;
+
+ AtomicLong nextAtomicVal = nextValMap.putIfAbsent(key, new AtomicLong(1));
+
+ Long nextVal = 1L;
+
+ if (nextAtomicVal != null)
+ nextVal = nextAtomicVal.incrementAndGet();
+
+ asyncCache.invoke(key, new AddInSetEntryProcessor(), nextVal);
+ asyncCache.future().get(args.cacheOperationTimeoutMillis());
+ }
+ finally {
+ rwl.readLock().unlock();
+ }
+
+ if (ex != null)
+ throw ex;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "atomic-invoke-retry";
+ }
+
+ /**
+ */
+ private static class AddInSetEntryProcessor implements CacheEntryProcessor<String, Set, Object> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<String, Set> entry,
+ Object... arguments) throws EntryProcessorException {
+ assert !F.isEmpty(arguments);
+
+ Object val = arguments[0];
+
+ Set set;
+
+ if (!entry.exists())
+ set = new HashSet<>();
+ else
+ set = entry.getValue();
+
+ set.add(val);
+
+ entry.setValue(set);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..c8b0b1d
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client
+ * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the
+ * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared
+ * to the values in the cache.
+ */
+public class IgniteAtomicOffHeapInvokeRetryBenchmark extends IgniteAtomicInvokeRetryBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "atomic-offheap-invoke-retry";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
new file mode 100644
index 0000000..ebb9eac
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicOffHeapRetriesBenchmark.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Atomic retries failover benchmark.
+ * <p>
+ * Client generates continuous load to the cluster (random get, put, invoke, remove
+ * operations).
+ */
+public class IgniteAtomicOffHeapRetriesBenchmark extends IgniteAtomicRetriesBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "atomic-offheap-reties";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
new file mode 100644
index 0000000..4e60698
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteAtomicRetriesBenchmark.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.Map;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+
+/**
+ * Atomic retries failover benchmark.
+ * <p>
+ * Client generates continuous load to the cluster (random get, put, invoke, remove
+ * operations).
+ */
+public class IgniteAtomicRetriesBenchmark extends IgniteFailoverAbstractBenchmark<Integer, String> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int key = nextRandom(args.range());
+
+ int opNum = nextRandom(4);
+
+ final int timeout = args.cacheOperationTimeoutMillis();
+
+ switch (opNum) {
+ case 0:
+ asyncCache.get(key);
+ asyncCache.future().get(timeout);
+
+ break;
+
+ case 1:
+ asyncCache.put(key, String.valueOf(key));
+ asyncCache.future().get(timeout);
+
+ break;
+
+ case 2:
+ asyncCache.invoke(key, new TestCacheEntryProcessor());
+ asyncCache.future().get(timeout);
+
+ break;
+
+ case 3:
+ asyncCache.remove(key);
+ asyncCache.future().get(timeout);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Got invalid operation number: " + opNum);
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "atomic-reties";
+ }
+
+ /**
+ */
+ private static class TestCacheEntryProcessor implements CacheEntryProcessor<Integer, String, String> {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public String process(MutableEntry<Integer, String> entry,
+ Object... arguments) throws EntryProcessorException {
+ return "key";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
new file mode 100644
index 0000000..83fc58f
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverAbstractBenchmark.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.mxbean.IgniteMXBean;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.yardstick.cache.IgniteCacheAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+import org.yardstickframework.BenchmarkUtils.ProcessExecutionResult;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Ignite benchmark that performs long running failover tasks.
+ */
+public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheAbstractBenchmark<K, V> {
+ /** */
+ private static final AtomicBoolean restarterStarted = new AtomicBoolean();
+
+ /** Async Cache. */
+ protected IgniteCache<K, V> asyncCache;
+
+ /** */
+ private final AtomicBoolean firtsExProcessed = new AtomicBoolean();
+
+ /** {@inheritDoc} */
+ @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ asyncCache = cache.withAsync();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onWarmupFinished() {
+ if (cfg.memberId() == 0 && restarterStarted.compareAndSet(false, true)) {
+ Thread restarterThread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ println("Servers restarter started on driver: "
+ + IgniteFailoverAbstractBenchmark.this.getClass().getSimpleName());
+
+ Ignite ignite = ignite();
+
+ // Read servers configs from cache to local map.
+ IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite.
+ getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().
+ setName("serversConfigs"));
+
+ final Map<Integer, BenchmarkConfiguration> srvsCfgs = new HashMap<>();
+
+ for (Cache.Entry<Integer, BenchmarkConfiguration> e : srvsCfgsCache) {
+ println("Read entry from 'serversConfigs' cache : " + e);
+
+ srvsCfgs.put(e.getKey(), e.getValue());
+ }
+
+ assert ignite.cluster().forServers().nodes().size() == srvsCfgs.size();
+
+ final int backupsCnt = args.backups();
+
+ assert backupsCnt >= 1 : "Backups: " + backupsCnt;
+
+ final boolean isDebug = ignite.log().isDebugEnabled();
+
+ // Main logic.
+ while (!Thread.currentThread().isInterrupted()) {
+ Thread.sleep(args.restartDelay() * 1000);
+
+ int numNodesToRestart = nextRandom(1, backupsCnt + 1);
+
+ List<Integer> ids = new ArrayList<>();
+
+ ids.addAll(srvsCfgs.keySet());
+
+ Collections.shuffle(ids);
+
+ println("Waiting for partitioned map exchage of all nodes");
+
+ IgniteCompute asyncCompute = ignite.compute().withAsync();
+
+ asyncCompute.broadcast(new AwaitPartitionMapExchangeTask());
+
+ asyncCompute.future().get(args.cacheOperationTimeoutMillis());
+
+ println("Start servers restarting [numNodesToRestart=" + numNodesToRestart
+ + ", shuffledIds=" + ids + "]");
+
+ for (int i = 0; i < numNodesToRestart; i++) {
+ Integer id = ids.get(i);
+
+ BenchmarkConfiguration bc = srvsCfgs.get(id);
+
+ ProcessExecutionResult res = BenchmarkUtils.kill9Server(bc, isDebug);
+
+ println("Server with id " + id + " has been killed."
+ + (isDebug ? " Process execution result:\n" + res : ""));
+ }
+
+ Thread.sleep(args.restartSleep() * 1000);
+
+ for (int i = 0; i < numNodesToRestart; i++) {
+ Integer id = ids.get(i);
+
+ BenchmarkConfiguration bc = srvsCfgs.get(id);
+
+ ProcessExecutionResult res = BenchmarkUtils.startServer(bc, isDebug);
+
+ println("Server with id " + id + " has been started."
+ + (isDebug ? " Process execution result:\n" + res : ""));
+ }
+ }
+ }
+ catch (Throwable e) {
+ println("Got exception: " + e);
+ e.printStackTrace();
+
+ U.dumpThreads(null);
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }, "servers-restarter");
+
+ restarterThread.setDaemon(true);
+ restarterThread.start();
+ }
+ }
+
+ /**
+ * Awaits for partitiona map exchage.
+ *
+ * @param ignite Ignite.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("BusyWait")
+ protected static void awaitPartitionMapExchange(Ignite ignite) throws Exception {
+ IgniteLogger log = ignite.log();
+
+ log.info("Waiting for finishing of a partition exchange on node: " + ignite);
+
+ IgniteKernal kernal = (IgniteKernal)ignite;
+
+ while (true) {
+ boolean partitionsExchangeFinished = true;
+
+ for (IgniteInternalCache<?, ?> cache : kernal.cachesx(null)) {
+ log.info("Checking cache: " + cache.name());
+
+ GridCacheAdapter<?, ?> c = kernal.internalCache(cache.name());
+
+ if (!(c instanceof GridDhtCacheAdapter))
+ break;
+
+ GridDhtCacheAdapter<?, ?> dht = (GridDhtCacheAdapter<?, ?>)c;
+
+ GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true);
+
+ for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
+ log.info("Checking node: " + e.getKey());
+
+ for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) {
+ if (e1.getValue() != GridDhtPartitionState.OWNING) {
+ log.info("Undesired state [id=" + e1.getKey() + ", state=" + e1.getValue() + ']');
+
+ partitionsExchangeFinished = false;
+
+ break;
+ }
+ }
+
+ if (!partitionsExchangeFinished)
+ break;
+ }
+
+ if (!partitionsExchangeFinished)
+ break;
+ }
+
+ if (partitionsExchangeFinished)
+ return;
+
+ Thread.sleep(100);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onException(Throwable e) {
+ // Proceess only the first exception to prevent a multiple printing of a full thread dump.
+ if (firtsExProcessed.compareAndSet(false, true)) {
+ // Debug info on current client.
+ println("Full thread dump of the current node below.");
+
+ U.dumpThreads(null);
+
+ println("");
+
+ ((IgniteMXBean)ignite()).dumpDebugInfo();
+
+ // Debug info on servers.
+ Ignite ignite = ignite();
+
+ ClusterGroup srvs = ignite.cluster().forServers();
+
+ IgniteCompute asyncCompute = ignite.compute(srvs).withAsync();
+
+ asyncCompute.broadcast(new ThreadDumpPrinterTask(ignite.cluster().localNode().id(), e));
+ asyncCompute.future().get(10_000);
+ }
+ }
+
+ /**
+ * @return Cache name.
+ */
+ protected abstract String cacheName();
+
+ /** {@inheritDoc} */
+ @Override protected IgniteCache<K, V> cache() {
+ return ignite().cache(cacheName());
+ }
+
+ /**
+ */
+ private static class ThreadDumpPrinterTask implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ private final UUID id;
+
+ /** */
+ private final Throwable e;
+
+ /**
+ * @param id Benchmark node id.
+ * @param e Exception.
+ */
+ ThreadDumpPrinterTask(UUID id, Throwable e) {
+ this.id = id;
+ this.e = e;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ println("Driver finished with exception [driverNodeId=" + id + ", e=" + e + "]");
+ println("Full thread dump of the current server node below.");
+
+ U.dumpThreads(null);
+
+ println("");
+
+ ((IgniteMXBean)ignite).dumpDebugInfo();
+ }
+ }
+
+ /**
+ */
+ private static class AwaitPartitionMapExchangeTask implements IgniteRunnable {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ awaitPartitionMapExchange(ignite);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
new file mode 100644
index 0000000..29405de
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteFailoverNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.List;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.yardstick.IgniteNode;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Ignite failover node.
+ */
+public class IgniteFailoverNode extends IgniteNode {
+ /** {@inheritDoc} */
+ @Override public void start(BenchmarkConfiguration cfg) throws Exception {
+ super.start(cfg);
+
+ // Put server configuration at special cache.
+ RuntimeMXBean mxBean = ManagementFactory.getRuntimeMXBean();
+
+ List<String> jvmOpts = mxBean.getInputArguments();
+
+ StringBuilder jvmOptsStr = new StringBuilder();
+
+ for (String opt : jvmOpts)
+ jvmOptsStr.append(opt).append(' ');
+
+ cfg.customProperties().put("JVM_OPTS", jvmOptsStr.toString());
+ cfg.customProperties().put("PROPS_ENV", System.getenv("PROPS_ENV"));
+ cfg.customProperties().put("CLASSPATH", mxBean.getClassPath());
+ cfg.customProperties().put("JAVA", System.getenv("JAVA"));
+
+ IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite().
+ getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().setName("serversConfigs"));
+
+ srvsCfgsCache.put(cfg.memberId(), cfg);
+
+ println("Put at cache [" + cfg.memberId() + "=" + cfg + "]");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
new file mode 100644
index 0000000..f8a1689
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalInvokeRetryBenchmark.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.yardstickframework.BenchmarkConfiguration;
+
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Invoke retry failover benchmark. <p> Each client maintains a local map that it updates together with cache. Client
+ * invokes an increment closure for all generated keys and atomically increments value for corresponding keys in the
+ * local map. To validate cache contents, all writes from the client are stopped, values in the local map are compared
+ * to the values in the cache.
+ */
+public class IgniteTransactionalInvokeRetryBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** */
+ private final ConcurrentMap<String, AtomicLong> map = new ConcurrentHashMap<>();
+
+ /** */
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+
+ /** */
+ private volatile Exception ex;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ Thread thread = new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ final int timeout = args.cacheOperationTimeoutMillis();
+ final int keysCnt = args.keysCount();
+
+ while (!Thread.currentThread().isInterrupted()) {
+ Thread.sleep(args.cacheConsistencyCheckingPeriod() * 1000);
+
+ rwl.writeLock().lock();
+
+ try {
+ println("Start cache validation.");
+
+ long startTime = U.currentTimeMillis();
+
+ Map<String, Long> notEqualsCacheVals = new HashMap<>();
+ Map<String, Long> notEqualsLocMapVals = new HashMap<>();
+
+ for (int k = 0; k < args.range(); k++) {
+ if (k % 10_000 == 0)
+ println("Start validation for keys like 'key-" + k + "-*'");
+
+ for (int i = 0; i < keysCnt; i++) {
+ String key = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+ asyncCache.get(key);
+ Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+ AtomicLong aVal = map.get(key);
+ Long mapVal = aVal != null ? aVal.get() : null;
+
+ if (!Objects.equals(cacheVal, mapVal)) {
+ notEqualsCacheVals.put(key, cacheVal);
+ notEqualsLocMapVals.put(key, mapVal);
+ }
+ }
+ }
+
+ assert notEqualsCacheVals.size() == notEqualsLocMapVals.size() : "Invalid state " +
+ "[cacheMapVals=" + notEqualsCacheVals + ", mapVals=" + notEqualsLocMapVals + "]";
+
+ if (!notEqualsCacheVals.isEmpty()) {
+ // Print all usefull information and finish.
+ for (Map.Entry<String, Long> eLocMap : notEqualsLocMapVals.entrySet()) {
+ String key = eLocMap.getKey();
+ Long mapVal = eLocMap.getValue();
+ Long cacheVal = notEqualsCacheVals.get(key);
+
+ println(cfg, "Got different values [key='" + key
+ + "', cacheVal=" + cacheVal + ", localMapVal=" + mapVal + "]");
+ }
+
+ println(cfg, "Local driver map contant:\n " + map);
+
+ println(cfg, "Cache content:");
+
+ for (int k2 = 0; k2 < args.range(); k2++) {
+ for (int i2 = 0; i2 < keysCnt; i2++) {
+ String key2 = "key-" + k2 + "-" + cfg.memberId() + "-" + i2;
+
+ asyncCache.get(key2);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key2 + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Cache and local map are in inconsistent state.");
+ }
+
+ println("Cache validation successfully finished in "
+ + (U.currentTimeMillis() - startTime) / 1000 + " sec.");
+ }
+ finally {
+ rwl.writeLock().unlock();
+ }
+ }
+ }
+ catch (Throwable e) {
+ ex = new Exception(e);
+
+ println("Got exception: " + e);
+
+ e.printStackTrace();
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+ }
+ }, "cache-" + cacheName() + "-validator");
+
+ thread.setDaemon(true);
+
+ thread.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ final String[] keys = new String[args.keysCount()];
+
+ assert keys.length > 0 : "Count of keys: " + keys.length;
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + cfg.memberId() + "-" + i;
+
+ for (String key : keys) {
+ rwl.readLock().lock();
+
+ try {
+ if (ex != null)
+ throw ex;
+
+ asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+ asyncCache.future().get(args.cacheOperationTimeoutMillis());
+
+ AtomicLong prevVal = map.putIfAbsent(key, new AtomicLong(0));
+
+ if (prevVal != null)
+ prevVal.incrementAndGet();
+ }
+ finally {
+ rwl.readLock().unlock();
+ }
+ }
+
+ if (ex != null)
+ throw ex;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-invoke-retry";
+ }
+
+ /**
+ */
+ private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Long> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<String, Long> entry,
+ Object... arguments) throws EntryProcessorException {
+ long newVal = entry.getValue() == null ? 0 : entry.getValue() + 1;
+
+ entry.setValue(newVal);
+
+ return newVal;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
new file mode 100644
index 0000000..4cbcf67
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapInvokeRetryBenchmark.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Invoke retry failover benchmark.
+ * <p>
+ * Each client maintains a local map that it updates together with cache.
+ * Client invokes an increment closure for all generated keys and atomically increments value for corresponding
+ * keys in the local map. To validate cache contents, all writes from the client are stopped, values in
+ * the local map are compared to the values in the cache.
+ */
+public class IgniteTransactionalOffHeapInvokeRetryBenchmark extends IgniteTransactionalInvokeRetryBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-invoke-retry";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
new file mode 100644
index 0000000..7fa2d1a
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteInvokeBenchmark.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master',
+ * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalOffHeapWriteInvokeBenchmark extends IgniteTransactionalWriteInvokeBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-write-invoke";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
new file mode 100644
index 0000000..bdecca7
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalOffHeapWriteReadBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1',
+ * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalOffHeapWriteReadBenchmark extends IgniteTransactionalWriteReadBenchmark {
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-offheap-write-read";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
new file mode 100644
index 0000000..1a8ee14
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteInvokeBenchmark.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write invoke failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + 'master',
+ * 'key-' + K + '-1', 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction
+ * and randomly chooses between read and write scenarios:
+ * <ul>
+ * <li>Reads value associated with the master key and child keys. Values must be equal.</li>
+ * <li>Reads value associated with the master key, increments it by 1 and puts the value, then invokes increment
+ * closure on child keys. No validation is performed.</li>
+ * </ul>
+ */
+public class IgniteTransactionalWriteInvokeBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+ final String[] keys = new String[args.keysCount()];
+
+ final String masterKey = "key-" + k + "-master";
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ final int scenario = nextRandom(2);
+
+ return doInTransaction(ignite(), new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ final int timeout = args.cacheOperationTimeoutMillis();
+
+ switch (scenario) {
+ case 0: // Read scenario.
+ Map<String, Long> map = new HashMap<>();
+
+ asyncCache.get(masterKey);
+ Long cacheVal = asyncCache.<Long>future().get(timeout);
+
+ map.put(masterKey, cacheVal);
+
+ for (String key : keys) {
+ asyncCache.get(key);
+ cacheVal = asyncCache.<Long>future().get(timeout);
+
+ map.put(key, cacheVal);
+ }
+
+ Set<Long> values = new HashSet<>(map.values());
+
+ if (values.size() != 1) {
+ // Print all usefull information and finish.
+ println(cfg, "Got different values for keys [map=" + map + "]");
+
+ println(cfg, "Cache content:");
+
+ for (int k = 0; k < args.range(); k++) {
+ for (int i = 0; i < args.keysCount(); i++) {
+ String key = "key-" + k + "-" + i;
+
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Found different values for keys (see above information).");
+ }
+
+ break;
+ case 1: // Invoke scenario.
+ asyncCache.get(masterKey);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ asyncCache.put(masterKey, val == null ? 0 : val + 1);
+ asyncCache.future().get(timeout);
+
+ for (String key : keys) {
+ asyncCache.invoke(key, new IncrementCacheEntryProcessor());
+ asyncCache.future().get(timeout);
+ }
+
+ break;
+ }
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param clo Closure.
+ * @return Result of closure execution.
+ * @throws Exception
+ */
+ public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ e.retryReadyFuture().get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-write-invoke";
+ }
+
+ /**
+ */
+ private static class IncrementCacheEntryProcessor implements CacheEntryProcessor<String, Long, Void> {
+ /** */
+ private static final long serialVersionUID = 0;
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Long> entry,
+ Object... arguments) throws EntryProcessorException {
+ entry.setValue(entry.getValue() == null ? 0 : entry.getValue() + 1);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2501c3a5/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
new file mode 100644
index 0000000..c962749
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/failover/IgniteTransactionalWriteReadBenchmark.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.yardstickframework.BenchmarkUtils.println;
+
+/**
+ * Transactional write read failover benchmark.
+ * <p>
+ * Each client generates a random integer K in a limited range and creates keys in the form 'key-' + K + '-1',
+ * 'key-' + K + '-2', ... Then client starts a pessimistic repeatable read transaction, reads value associated with
+ * each key. Values must be equal. Client increments value by 1, commits the transaction.
+ */
+public class IgniteTransactionalWriteReadBenchmark extends IgniteFailoverAbstractBenchmark<String, Long> {
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+ final int k = nextRandom(args.range());
+
+ assert args.keysCount() > 0 : "Count of keys: " + args.keysCount();
+
+ final String[] keys = new String[args.keysCount()];
+
+ for (int i = 0; i < keys.length; i++)
+ keys[i] = "key-" + k + "-" + i;
+
+ return doInTransaction(ignite(), new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ Map<String, Long> map = new HashMap<>();
+
+ final int timeout = args.cacheOperationTimeoutMillis();
+
+ for (String key : keys) {
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ map.put(key, val);
+ }
+
+ Set<Long> values = new HashSet<>(map.values());
+
+ if (values.size() != 1) {
+ // Print all usefull information and finish.
+ println(cfg, "Got different values for keys [map=" + map + "]");
+
+ println(cfg, "Cache content:");
+
+ for (int k = 0; k < args.range(); k++) {
+ for (int i = 0; i < args.keysCount(); i++) {
+ String key = "key-" + k + "-" + i;
+
+ asyncCache.get(key);
+ Long val = asyncCache.<Long>future().get(timeout);
+
+ if (val != null)
+ println(cfg, "Entry [key=" + key + ", val=" + val + "]");
+ }
+ }
+
+ throw new IllegalStateException("Found different values for keys (see above information).");
+ }
+
+ final Long oldVal = map.get(keys[0]);
+
+ final Long newVal = oldVal == null ? 0 : oldVal + 1;
+
+ for (String key : keys) {
+ asyncCache.put(key, newVal);
+ asyncCache.future().get(timeout);
+ }
+
+ return true;
+ }
+ });
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param clo Closure.
+ * @return Result of closure execution.
+ * @throws Exception
+ */
+ public static <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ e.retryReadyFuture().get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String cacheName() {
+ return "tx-write-read";
+ }
+}