You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/06 08:04:28 UTC
[13/21] ignite git commit: ignite-2.1
ignite-2.1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad42f620
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad42f620
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad42f620
Branch: refs/heads/ignite-gg-12306-1
Commit: ad42f6205b3956dca0ee54e85ce385e6591ec7a9
Parents: 7504b38
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 5 17:41:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 5 17:41:30 2017 +0300
----------------------------------------------------------------------
.../yardstick/IgniteBenchmarkArguments.java | 46 ++++
.../cache/IgniteStreamerBenchmark.java | 234 +++++++++++++++++++
2 files changed, 280 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad42f620/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index d3b860c..5ec6c54 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -18,6 +18,7 @@
package org.apache.ignite.yardstick;
import com.beust.jcommander.Parameter;
+import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
@@ -28,6 +29,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
import java.util.ArrayList;
import java.util.List;
+import org.apache.ignite.yardstick.cache.IgniteStreamerBenchmark;
import org.jetbrains.annotations.Nullable;
/**
@@ -222,6 +224,22 @@ public class IgniteBenchmarkArguments {
@Parameter(names = {"-pds", "--persistentStore"}, description = "Persistent store flag")
private boolean persistentStoreEnabled;
+ /** */
+ @Parameter(names = {"-stcp", "--streamerCachesPrefix"}, description = "Cache name prefix for streamer benchmark")
+ private String streamerCachesPrefix = "streamer";
+
+ /** */
+ @Parameter(names = {"-stci", "--streamerCachesIndex"}, description = "First cache index for streamer benchmark")
+ private int streamerCacheIndex;
+
+ /** */
+ @Parameter(names = {"-stcc", "--streamerConcCaches"}, description = "Number of concurrently loaded caches for streamer benchmark")
+ private int streamerConcurrentCaches = 1;
+
+ /** */
+ @Parameter(names = {"-stbs", "--streamerBufSize"}, description = "Data streamer buffer size")
+ private int streamerBufSize = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE;
+
/**
* @return {@code True} if need set {@link PersistentStoreConfiguration}.
*/
@@ -552,6 +570,34 @@ public class IgniteBenchmarkArguments {
"-txc=" + txConcurrency + "-rd=" + restartDelay + "-rs=" + restartSleep;
}
+ /**
+ * @return Cache name prefix for caches to be used in {@link IgniteStreamerBenchmark}.
+ */
+ public String streamerCachesPrefix() {
+ return streamerCachesPrefix;
+ }
+
+ /**
+ * @return First cache index for {@link IgniteStreamerBenchmark}.
+ */
+ public int streamerCacheIndex() {
+ return streamerCacheIndex;
+ }
+
+ /**
+ * @return Number of concurrently loaded caches for {@link IgniteStreamerBenchmark}.
+ */
+ public int streamerConcurrentCaches() {
+ return streamerConcurrentCaches;
+ }
+
+ /**
+ * @return Streamer buffer size {@link IgniteStreamerBenchmark} (see {@link IgniteDataStreamer#perNodeBufferSize()}.
+ */
+ public int streamerBufferSize() {
+ return streamerBufSize;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ad42f620/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
new file mode 100644
index 0000000..9e253e1
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ *
+ */
+public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
+ /** */
+ private List<String> cacheNames;
+
+ /** */
+ private ExecutorService executor;
+
+ /** */
+ private int entries;
+
+ /** {@inheritDoc} */
+ @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+ super.setUp(cfg);
+
+ entries = args.range();
+
+ if (entries <= 0)
+ throw new IllegalArgumentException("Invalid number of entries: " + entries);
+
+ if (cfg.threads() != 1)
+ throw new IllegalArgumentException("IgniteStreamerBenchmark should be run with single thread. " +
+ "Internally it starts multiple threads.");
+
+ String cacheNamePrefix = args.streamerCachesPrefix();
+
+ if (cacheNamePrefix == null || cacheNamePrefix.isEmpty())
+ throw new IllegalArgumentException("Streamer caches prefix not set.");
+
+ List<String> caches = new ArrayList<>();
+
+ for (String cacheName : ignite().cacheNames()) {
+ if (cacheName.startsWith(cacheNamePrefix))
+ caches.add(cacheName);
+ }
+
+ if (caches.isEmpty())
+ throw new IllegalArgumentException("Failed to find for IgniteStreamerBenchmark caches " +
+ "starting with '" + cacheNamePrefix + "'");
+
+ BenchmarkUtils.println("Found " + caches.size() + " caches for IgniteStreamerBenchmark: " + caches);
+
+ if (args.streamerCacheIndex() >= caches.size()) {
+ throw new IllegalArgumentException("Invalid streamer cache index: " + args.streamerCacheIndex() +
+ ", there are only " + caches.size() + " caches.");
+ }
+
+ if (args.streamerCacheIndex() + args.streamerConcurrentCaches() > caches.size()) {
+ throw new IllegalArgumentException("There are no enough caches [cacheIndex=" + args.streamerCacheIndex() +
+ ", concurrentCaches=" + args.streamerConcurrentCaches() +
+ ", totalCaches=" + caches.size() + ']');
+ }
+
+ Collections.sort(caches);
+
+ cacheNames = new ArrayList<>(caches.subList(args.streamerCacheIndex(),
+ args.streamerCacheIndex() + args.streamerConcurrentCaches()));
+
+ executor = Executors.newFixedThreadPool(args.streamerConcurrentCaches());
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark start [cacheIndex=" + args.streamerCacheIndex() +
+ ", concurrentCaches=" + args.streamerConcurrentCaches() +
+ ", entries=" + entries +
+ ", bufferSize=" + args.streamerBufferSize() +
+ ", cachesToUse=" + cacheNames + ']');
+
+ if (cfg.warmup() > 0) {
+ BenchmarkUtils.println("IgniteStreamerBenchmark start warmup [warmupTimeMillis=" + cfg.warmup() + ']');
+
+ final long warmupEnd = System.currentTimeMillis() + cfg.warmup();
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ try {
+ List<Future<Void>> futs = new ArrayList<>();
+
+ for (final String cacheName : cacheNames) {
+ futs.add(executor.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.currentThread().setName("streamer-" + cacheName);
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark start warmup for cache " +
+ "[name=" + cacheName + ']');
+
+ final int KEYS = Math.min(100_000, entries);
+
+ int key = 1;
+
+ try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+ streamer.perNodeBufferSize(args.streamerBufferSize());
+
+ while (System.currentTimeMillis() < warmupEnd && !stop.get()) {
+ for (int i = 0; i < 10; i++) {
+ streamer.addData(-key++, new SampleValue(key));
+
+ if (key >= KEYS)
+ key = 1;
+ }
+
+ streamer.flush();
+ }
+ }
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark finished warmup for cache " +
+ "[name=" + cacheName + ']');
+
+ return null;
+ }
+ }));
+ }
+
+ for (Future<Void> fut : futs)
+ fut.get();
+ }
+ finally {
+ stop.set(true);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Map<Object, Object> map) throws Exception {
+ BenchmarkUtils.println("IgniteStreamerBenchmark start test.");
+
+ long start = System.currentTimeMillis();
+
+ final AtomicBoolean stop = new AtomicBoolean();
+
+ try {
+ List<Future<Void>> futs = new ArrayList<>();
+
+ for (final String cacheName : cacheNames) {
+ futs.add(executor.submit(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Thread.currentThread().setName("streamer-" + cacheName);
+
+ long start = System.currentTimeMillis();
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark start load cache [name=" + cacheName + ']');
+
+ try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+ for (int i = 0; i < entries; i++) {
+ streamer.addData(i, new SampleValue(i));
+
+ if (i > 0 && i % 1000 == 0) {
+ if (stop.get())
+ break;
+
+ if (i % 100_000 == 0) {
+ BenchmarkUtils.println("IgniteStreamerBenchmark cache load progress [name=" + cacheName +
+ ", entries=" + i +
+ ", timeMillis=" + (System.currentTimeMillis() - start) + ']');
+ }
+ }
+ }
+ }
+
+ long time = System.currentTimeMillis() - start;
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark finished load cache [name=" + cacheName +
+ ", entries=" + entries +
+ ", bufferSize=" + args.streamerBufferSize() +
+ ", totalTimeMillis=" + time + ']');
+
+ return null;
+ }
+ }));
+ }
+
+ for (Future<Void> fut : futs)
+ fut.get();
+ }
+ finally {
+ stop.set(true);
+ }
+
+ long time = System.currentTimeMillis() - start;
+
+ BenchmarkUtils.println("IgniteStreamerBenchmark finished [totalTimeMillis=" + time +
+ ", entries=" + entries +
+ ", bufferSize=" + args.streamerBufferSize() + ']');
+
+ for (String cacheName : cacheNames) {
+ BenchmarkUtils.println("Cache size [cacheName=" + cacheName +
+ ", size=" + ignite().cache(cacheName).size() + ']');
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void tearDown() throws Exception {
+ if (executor != null)
+ executor.shutdown();
+
+ super.tearDown();
+ }
+}