You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/07/20 12:31:32 UTC

[27/27] ignite git commit: ignite-5658 review

ignite-5658 review


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9936dd85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9936dd85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9936dd85

Branch: refs/heads/ignite-5658
Commit: 9936dd850e1caa022064cef16e680e32f1f2007d
Parents: 269ca02
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu Jul 20 15:30:52 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Jul 20 15:30:52 2017 +0300

----------------------------------------------------------------------
 .../yardstick/IgniteBenchmarkArguments.java     |  11 +
 .../IgniteSingleCacheStreamerBenchmark.java     | 209 +++++++++++++++++++
 .../cache/IgniteStreamerBenchmark.java          |  13 +-
 3 files changed, 231 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index 594fa1f..355b1b0 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -252,6 +252,10 @@ public class IgniteBenchmarkArguments {
     @Parameter(names = {"-stbs", "--streamerBufSize"}, description = "Data streamer buffer size")
     private int streamerBufSize = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE;
 
+    /** Data streamer maximum number of parallel operations per node ({@code -stpo}). */
+    @Parameter(names = {"-stpo", "--streamerParallelOps"}, description = "Data streamer max parallel ops")
+    private int streamerPerNodeParallelOps = IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS;
+
     /**
      * @return {@code True} if need set {@link PersistentStoreConfiguration}.
      */
@@ -631,6 +635,13 @@ public class IgniteBenchmarkArguments {
         return streamerBufSize;
     }
 
+    /**
+     * @return Maximum number of parallel data streamer operations per node.
+     */
+    public int getStreamerPerNodeParallelOps() {
+        return this.streamerPerNodeParallelOps;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
new file mode 100644
index 0000000..1a35c5d
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSingleCacheStreamerBenchmark.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ * Benchmark that loads {@code args.range()} entries into a single cache through one shared
+ * {@link IgniteDataStreamer} fed concurrently by one loader task per available processor.
+ * Requires a single yardstick thread, exactly one selected cache and no warmup.
+ */
+public class IgniteSingleCacheStreamerBenchmark extends IgniteAbstractBenchmark {
+    /** Selected cache names; exactly one element after a successful {@link #setUp}. */
+    private List<String> cacheNames;
+
+    /** Pool that runs the loader tasks. */
+    private ExecutorService executor;
+
+    /** Total number of entries to load. */
+    private int entries;
+
+    /** Number of loader tasks, equal to the size of {@link #executor}. */
+    private int workers;
+
+    /**
+     * Validates the configuration, selects the target cache and creates the loader pool.
+     *
+     * @param cfg Benchmark configuration.
+     * @throws IllegalArgumentException If the configuration does not suit this benchmark.
+     * @throws Exception If the parent setup fails.
+     */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        entries = args.range();
+
+        if (entries <= 0)
+            throw new IllegalArgumentException("Invalid number of entries: " + entries);
+
+        if (cfg.threads() != 1)
+            throw new IllegalArgumentException("IgniteStreamerBenchmark should be run with single thread. " +
+                "Internally it starts multiple threads.");
+
+        // Rejected before the pool exists so that a failed setup leaves no idle threads behind.
+        if (cfg.warmup() > 0)
+            throw new IllegalArgumentException("IgniteSingleCacheStreamerBenchmark can run only without warmup " +
+                "[warmup=" + cfg.warmup() + ']');
+
+        String cacheNamePrefix = args.streamerCachesPrefix();
+
+        if (cacheNamePrefix == null || cacheNamePrefix.isEmpty())
+            throw new IllegalArgumentException("Streamer caches prefix not set.");
+
+        List<String> caches = new ArrayList<>();
+
+        for (String cacheName : ignite().cacheNames()) {
+            if (cacheName.startsWith(cacheNamePrefix))
+                caches.add(cacheName);
+        }
+
+        if (caches.isEmpty())
+            throw new IllegalArgumentException("Failed to find for IgniteStreamerBenchmark caches " +
+                "starting with '" + cacheNamePrefix + "'");
+
+        BenchmarkUtils.println("Found " + caches.size() + " caches for IgniteStreamerBenchmark: " + caches);
+
+        int cacheIdx = args.streamerCacheIndex();
+        int concurrentCaches = args.streamerConcurrentCaches();
+
+        // A negative index is rejected here instead of surfacing later as a subList() failure.
+        if (cacheIdx < 0 || cacheIdx >= caches.size()) {
+            throw new IllegalArgumentException("Invalid streamer cache index: " + cacheIdx +
+                ", there are only " + caches.size() + " caches.");
+        }
+
+        if (cacheIdx + concurrentCaches > caches.size()) {
+            throw new IllegalArgumentException("There are no enough caches [cacheIndex=" + cacheIdx +
+                ", concurrentCaches=" + concurrentCaches +
+                ", totalCaches=" + caches.size() + ']');
+        }
+
+        Collections.sort(caches);
+
+        cacheNames = new ArrayList<>(caches.subList(cacheIdx, cacheIdx + concurrentCaches));
+
+        // "!= 1" also rejects an empty selection, which would otherwise fail in test() on get(0).
+        if (cacheNames.size() != 1)
+            throw new IllegalArgumentException("IgniteSingleCacheStreamerBenchmark can run only with single cache " +
+                "[cacheNames=" + cacheNames + ']');
+
+        // Read once so that the pool size and the number of key range parts always agree.
+        workers = Runtime.getRuntime().availableProcessors();
+        executor = Executors.newFixedThreadPool(workers);
+
+        BenchmarkUtils.println("IgniteStreamerBenchmark start [cacheIndex=" + cacheIdx +
+            ", concurrentCaches=" + concurrentCaches +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() +
+            ", cachesToUse=" + cacheNames + ']');
+    }
+
+    /**
+     * Streams all entries into the selected cache, then prints the elapsed time and the cache size.
+     *
+     * @param map Benchmark context, not used by this benchmark.
+     * @return {@code false} in all cases.
+     * @throws Exception If a loader task fails.
+     */
+    @Override public boolean test(Map<Object, Object> map) throws Exception {
+        BenchmarkUtils.println("IgniteStreamerBenchmark start test.");
+
+        long start = System.currentTimeMillis();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+        final AtomicInteger nextPart = new AtomicInteger();
+        final String cacheName = cacheNames.get(0);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+            streamer.perNodeParallelOperations(args.getStreamerPerNodeParallelOps());
+            streamer.perNodeBufferSize(args.streamerBufferSize());
+
+            List<Future<Void>> futs = new ArrayList<>(workers);
+
+            try {
+                for (int i = 0; i < workers; i++) {
+                    futs.add(executor.submit(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            // Each task claims the next unassigned part of the key range when it starts.
+                            loadPart(streamer, cacheName, nextPart.getAndIncrement(), stop);
+
+                            return null;
+                        }
+                    }));
+                }
+
+                for (Future<Void> fut : futs)
+                    fut.get();
+            }
+            finally {
+                // Set before the streamer closes so that tasks still running after a failure stop adding data.
+                stop.set(true);
+            }
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        BenchmarkUtils.println("IgniteStreamerBenchmark finished [totalTimeMillis=" + time +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() + ']');
+
+        BenchmarkUtils.println("Cache size [cacheName=" + cacheName +
+            ", size=" + ignite().cache(cacheName).size() + ']');
+
+        return false;
+    }
+
+    /**
+     * Streams one contiguous part of the key range {@code [0, entries)}. Part bounds are
+     * {@code entries * part / workers}, so all parts together cover every key exactly once
+     * even when {@code entries} is not a multiple of {@code workers}.
+     *
+     * @param streamer Shared streamer.
+     * @param cacheName Cache name, used for logging only.
+     * @param part Zero-based part index.
+     * @param stop Flag polled every 1000 entries; loading is abandoned once it is set.
+     */
+    private void loadPart(IgniteDataStreamer<Object, Object> streamer, String cacheName, int part,
+        AtomicBoolean stop) {
+        // Long arithmetic keeps entries * part from overflowing int for large ranges.
+        int min = (int)((long)entries * part / workers);
+        int max = (int)((long)entries * (part + 1) / workers);
+        int delta = max - min;
+
+        long start = System.currentTimeMillis();
+
+        BenchmarkUtils.println("IgniteStreamerBenchmark start load cache " +
+            "[name=" + cacheName +
+            ", min=" + min +
+            ", max=" + max + ']');
+
+        for (int i = 0; i < delta; i++) {
+            streamer.addData(min + i, new SampleValue(min + i));
+
+            if (i > 0 && i % 1000 == 0) {
+                if (stop.get())
+                    break;
+
+                if (i % 100_000 == 0) {
+                    BenchmarkUtils.println("IgniteStreamerBenchmark cache load progress " +
+                        "[name=" + cacheName +
+                        ", entries=" + i +
+                        ", delta=" + delta +
+                        ", timeMillis=" + (System.currentTimeMillis() - start) + ']');
+                }
+            }
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        BenchmarkUtils.println("Thread finished loading cache [name=" + cacheName +
+            ", min=" + min +
+            ", max=" + max +
+            ", bufferSize=" + args.streamerBufferSize() +
+            ", totalTimeMillis=" + time + ']');
+    }
+
+    /**
+     * Stops the loader pool, waits for its tasks to terminate and then runs the parent tear-down.
+     *
+     * @throws Exception If waiting is interrupted or the parent tear-down fails.
+     */
+    @Override public void tearDown() throws Exception {
+        if (executor != null) {
+            executor.shutdownNow();
+
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        }
+
+        super.tearDown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9936dd85/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
index 9e253e1..9914eac 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
@@ -175,6 +176,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
                         BenchmarkUtils.println("IgniteStreamerBenchmark start load cache [name=" + cacheName + ']');
 
                         try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+                            streamer.perNodeParallelOperations(Runtime.getRuntime().availableProcessors() * 4);
+                            streamer.perNodeBufferSize(args.streamerBufferSize());
+
+                            BenchmarkUtils.println("Data streamer: " + streamer);
+
                             for (int i = 0; i < entries; i++) {
                                 streamer.addData(i, new SampleValue(i));
 
@@ -226,8 +232,11 @@ public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
 
     /** {@inheritDoc} */
     @Override public void tearDown() throws Exception {
-        if (executor != null)
-            executor.shutdown();
+        if (executor != null) {
+            executor.shutdownNow();
+
+            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        }
 
         super.tearDown();
     }