You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/20 12:31:32 UTC
[27/27] ignite git commit: ignite-5658 review
ignite-5658 review
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9936dd85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9936dd85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9936dd85
Branch: refs/heads/ignite-5658
Commit: 9936dd850e1caa022064cef16e680e32f1f2007d
Parents: 269ca02
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Jul 20 15:30:52 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Jul 20 15:30:52 2017 +0300
----------------------------------------------------------------------
.../yardstick/IgniteBenchmarkArguments.java | 11 +
.../IgniteSingleCacheStreamerBenchmark.java | 209 +++++++++++++++++++
.../cache/IgniteStreamerBenchmark.java | 13 +-
3 files changed, 231 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 594fa1f..355b1b0 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -252,6 +252,10 @@ public class IgniteBenchmarkArguments {
@Parameter(names = {"-stbs", "--streamerBufSize"}, description = "Data streamer buffer size")
private int streamerBufSize = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE;
+ /** Data streamer per-node parallel operations limit ({@code -stpo}); defaults to {@link IgniteDataStreamer#DFLT_MAX_PARALLEL_OPS}. */
+ @Parameter(names = {"-stpo", "--streamerParallelOps"}, description = "Data streamer max parallel ops")
+ private int streamerPerNodeParallelOps = IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS;
+
/**
* @return {@code True} if need set {@link PersistentStoreConfiguration}.
*/
@@ -631,6 +635,13 @@ public class IgniteBenchmarkArguments {
return streamerBufSize;
}
+ /**
+ * @return Value of the {@code -stpo} / {@code --streamerParallelOps} option (data streamer max parallel ops).
+ */
+ public int getStreamerPerNodeParallelOps() {
+ return this.streamerPerNodeParallelOps;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
new file mode 100644
index 0000000..1a35c5d
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ * Benchmark that loads {@code args.range()} entries into a single cache through one
+ * {@link IgniteDataStreamer} instance shared by several loader threads.
+ * <p>
+ * The benchmark has to be configured with a single benchmark thread, without warmup and with exactly
+ * one cache selected; {@link #setUp(BenchmarkConfiguration)} rejects any other configuration.
+ */
+public class IgniteSingleCacheStreamerBenchmark extends IgniteAbstractBenchmark {
+    /** Benchmark name used in log and error messages. */
+    private static final String NAME = "IgniteSingleCacheStreamerBenchmark";
+
+    /** Caches selected for loading; {@link #setUp(BenchmarkConfiguration)} guarantees exactly one name. */
+    private List<String> cacheNames;
+
+    /** Pool that runs the loader tasks. */
+    private ExecutorService executor;
+
+    /** Total number of entries to load. */
+    private int entries;
+
+    /** Number of loader tasks; also the size of {@link #executor}. */
+    private int loaders;
+
+    /**
+     * Validates the configuration, selects the cache to load and creates the loader pool.
+     *
+     * @param cfg Benchmark configuration.
+     * @throws IllegalArgumentException If the range, thread count, warmup, caches prefix, cache index
+     *      or number of concurrent caches is not acceptable for this benchmark.
+     * @throws Exception If the parent set up fails.
+     */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        entries = args.range();
+
+        if (entries <= 0)
+            throw new IllegalArgumentException("Invalid number of entries: " + entries);
+
+        if (cfg.threads() != 1)
+            throw new IllegalArgumentException(NAME + " should be run with single thread. " +
+                "Internally it starts multiple threads.");
+
+        // Checked before the pool is created so that a rejected configuration leaves no threads behind.
+        if (cfg.warmup() > 0)
+            throw new IllegalArgumentException(NAME + " can run only without warmup " +
+                "[warmup=" + cfg.warmup() + ']');
+
+        String cacheNamePrefix = args.streamerCachesPrefix();
+
+        if (cacheNamePrefix == null || cacheNamePrefix.isEmpty())
+            throw new IllegalArgumentException("Streamer caches prefix not set.");
+
+        List<String> caches = new ArrayList<>();
+
+        for (String cacheName : ignite().cacheNames()) {
+            if (cacheName.startsWith(cacheNamePrefix))
+                caches.add(cacheName);
+        }
+
+        if (caches.isEmpty())
+            throw new IllegalArgumentException("Failed to find for " + NAME + " caches " +
+                "starting with '" + cacheNamePrefix + "'");
+
+        BenchmarkUtils.println("Found " + caches.size() + " caches for " + NAME + ": " + caches);
+
+        int cacheIdx = args.streamerCacheIndex();
+        int concurrentCaches = args.streamerConcurrentCaches();
+
+        // A negative index would otherwise surface later as an exception from subList().
+        if (cacheIdx < 0 || cacheIdx >= caches.size()) {
+            throw new IllegalArgumentException("Invalid streamer cache index: " + cacheIdx +
+                ", there are only " + caches.size() + " caches.");
+        }
+
+        if (cacheIdx + concurrentCaches > caches.size()) {
+            throw new IllegalArgumentException("There are no enough caches [cacheIndex=" + cacheIdx +
+                ", concurrentCaches=" + concurrentCaches +
+                ", totalCaches=" + caches.size() + ']');
+        }
+
+        Collections.sort(caches);
+
+        cacheNames = new ArrayList<>(caches.subList(cacheIdx, cacheIdx + concurrentCaches));
+
+        // Exactly one cache is required: test() reads cacheNames.get(0), so an empty list must be rejected too.
+        if (cacheNames.size() != 1)
+            throw new IllegalArgumentException(NAME + " can run only with single cache " +
+                "[cacheNames=" + cacheNames + ']');
+
+        // Read once so that the pool size and the number of submitted tasks always agree.
+        loaders = Runtime.getRuntime().availableProcessors();
+
+        executor = Executors.newFixedThreadPool(loaders);
+
+        BenchmarkUtils.println(NAME + " start [cacheIndex=" + cacheIdx +
+            ", concurrentCaches=" + concurrentCaches +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() +
+            ", cachesToUse=" + cacheNames + ']');
+    }
+
+    /**
+     * Loads all entries into the selected cache, splitting the key range {@code [0, entries)} between
+     * the loader tasks, then prints the elapsed time and the resulting cache size.
+     *
+     * @param map Benchmark context (not used).
+     * @return Always {@code false}; presumably this tells the framework not to invoke the test again
+     *      (to be confirmed against the Yardstick driver contract).
+     * @throws Exception If any of the loader tasks failed.
+     */
+    @Override public boolean test(Map<Object, Object> map) throws Exception {
+        BenchmarkUtils.println(NAME + " start test.");
+
+        long start = System.currentTimeMillis();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final String cacheName = cacheNames.get(0);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+            streamer.perNodeParallelOperations(args.getStreamerPerNodeParallelOps());
+            streamer.perNodeBufferSize(args.streamerBufferSize());
+
+            List<Future<Void>> futs = new ArrayList<>(loaders);
+
+            final AtomicInteger loaderIdx = new AtomicInteger();
+
+            for (int i = 0; i < loaders; i++) {
+                futs.add(executor.submit(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        int idx = loaderIdx.getAndIncrement();
+
+                        // Proportional bounds tile [0, entries) exactly, so the remainder of
+                        // entries / loaders is not lost; long arithmetic avoids int overflow.
+                        int min = (int)((long)entries * idx / loaders);
+                        int max = (int)((long)entries * (idx + 1) / loaders);
+
+                        loadRange(streamer, cacheName, min, max, stop);
+
+                        return null;
+                    }
+                }));
+            }
+
+            try {
+                for (Future<Void> fut : futs)
+                    fut.get();
+            }
+            finally {
+                // Raised inside the try-with-resources body: a finally clause attached to the
+                // try-with-resources statement itself would run only after the streamer is closed.
+                stop.set(true);
+            }
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        BenchmarkUtils.println(NAME + " finished [totalTimeMillis=" + time +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() + ']');
+
+        BenchmarkUtils.println("Cache size [cacheName=" + cacheName +
+            ", size=" + ignite().cache(cacheName).size() + ']');
+
+        return false;
+    }
+
+    /**
+     * Streams the keys of {@code [min, max)} with {@link SampleValue} values, logging the progress.
+     *
+     * @param streamer Streamer shared by all loaders.
+     * @param cacheName Name of the loaded cache (used for logging only).
+     * @param min First key to load, inclusive.
+     * @param max Last key to load, exclusive.
+     * @param stop Flag that makes the loader give up early once it is raised.
+     */
+    private void loadRange(IgniteDataStreamer<Object, Object> streamer, String cacheName, int min, int max,
+        AtomicBoolean stop) {
+        long start = System.currentTimeMillis();
+
+        BenchmarkUtils.println(NAME + " start load cache " +
+            "[name=" + cacheName +
+            ", min=" + min +
+            ", max=" + max + ']');
+
+        int delta = max - min;
+
+        for (int i = 0; i < delta; i++) {
+            streamer.addData(min + i, new SampleValue(min + i));
+
+            // The stop flag is polled only once per 1000 entries.
+            if (i > 0 && i % 1000 == 0) {
+                if (stop.get())
+                    break;
+
+                if (i % 100_000 == 0) {
+                    BenchmarkUtils.println(NAME + " cache load progress " +
+                        "[name=" + cacheName +
+                        ", entries=" + i +
+                        ", delta=" + delta +
+                        ", timeMillis=" + (System.currentTimeMillis() - start) + ']');
+                }
+            }
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        BenchmarkUtils.println("Thread finished loading cache [name=" + cacheName +
+            ", min=" + min +
+            ", max=" + max +
+            ", bufferSize=" + args.streamerBufferSize() +
+            ", totalTimeMillis=" + time + ']');
+    }
+
+    /**
+     * Stops the loader pool, waits for its threads to finish and tears down the parent benchmark.
+     *
+     * @throws Exception If waiting for the pool is interrupted or the parent tear down fails.
+     */
+    @Override public void tearDown() throws Exception {
+        try {
+            if (executor != null) {
+                executor.shutdownNow();
+
+                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            }
+        }
+        finally {
+            // The parent tear down must run even if the wait above is interrupted.
+            super.tearDown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
index 9e253e1..9914eac 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
@@ -175,6 +176,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
BenchmarkUtils.println("IgniteStreamerBenchmark start load cache [name=" + cacheName + ']');
try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+ streamer.perNodeParallelOperations(Runtime.getRuntime().availableProcessors() * 4);
+ streamer.perNodeBufferSize(args.streamerBufferSize());
+
+ BenchmarkUtils.println("Data streamer: " + streamer);
+
for (int i = 0; i < entries; i++) {
streamer.addData(i, new SampleValue(i));
@@ -226,8 +232,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
/** {@inheritDoc} */
@Override public void tearDown() throws Exception {
- if (executor != null)
- executor.shutdown();
+ if (executor != null) {
+ executor.shutdownNow();
+
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
super.tearDown();
}