You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/06 08:04:28 UTC

[13/21] ignite git commit: ignite-2.1

ignite-2.1


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ad42f620
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ad42f620
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ad42f620

Branch: refs/heads/ignite-gg-12306-1
Commit: ad42f6205b3956dca0ee54e85ce385e6591ec7a9
Parents: 7504b38
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jul 5 17:41:30 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jul 5 17:41:30 2017 +0300

----------------------------------------------------------------------
 .../yardstick/IgniteBenchmarkArguments.java     |  46 ++++
 .../cache/IgniteStreamerBenchmark.java          | 234 +++++++++++++++++++
 2 files changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ad42f620/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
index d3b860c..5ec6c54 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/IgniteBenchmarkArguments.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.yardstick;
 
 import com.beust.jcommander.Parameter;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
@@ -28,6 +29,7 @@ import org.apache.ignite.transactions.TransactionIsolation;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.ignite.yardstick.cache.IgniteStreamerBenchmark;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -222,6 +224,22 @@ public class IgniteBenchmarkArguments {
     @Parameter(names = {"-pds", "--persistentStore"}, description = "Persistent store flag")
     private boolean persistentStoreEnabled;
 
+    /** */
+    @Parameter(names = {"-stcp", "--streamerCachesPrefix"}, description = "Cache name prefix for streamer benchmark")
+    private String streamerCachesPrefix = "streamer";
+
+    /** */
+    @Parameter(names = {"-stci", "--streamerCachesIndex"}, description = "First cache index for streamer benchmark")
+    private int streamerCacheIndex;
+
+    /** */
+    @Parameter(names = {"-stcc", "--streamerConcCaches"}, description = "Number of concurrently loaded caches for streamer benchmark")
+    private int streamerConcurrentCaches = 1;
+
+    /** */
+    @Parameter(names = {"-stbs", "--streamerBufSize"}, description = "Data streamer buffer size")
+    private int streamerBufSize = IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE;
+
     /**
      * @return {@code True} if need set {@link PersistentStoreConfiguration}.
      */
@@ -552,6 +570,34 @@ public class IgniteBenchmarkArguments {
             "-txc=" + txConcurrency + "-rd=" + restartDelay + "-rs=" + restartSleep;
     }
 
+    /**
+     * @return Cache name prefix for caches to be used in {@link IgniteStreamerBenchmark}.
+     */
+    public String streamerCachesPrefix() {
+        return streamerCachesPrefix;
+    }
+
+    /**
+     * @return First cache index for {@link IgniteStreamerBenchmark}.
+     */
+    public int streamerCacheIndex() {
+        return streamerCacheIndex;
+    }
+
+    /**
+     * @return Number of concurrently loaded caches for {@link IgniteStreamerBenchmark}.
+     */
+    public int streamerConcurrentCaches() {
+        return streamerConcurrentCaches;
+    }
+
+    /**
+     * @return Streamer buffer size {@link IgniteStreamerBenchmark} (see {@link IgniteDataStreamer#perNodeBufferSize()}.
+     */
+    public int streamerBufferSize() {
+        return streamerBufSize;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(IgniteBenchmarkArguments.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ad42f620/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
new file mode 100644
index 0000000..9e253e1
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteStreamerBenchmark.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.apache.ignite.yardstick.cache.model.SampleValue;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ *
+ */
+public class IgniteStreamerBenchmark extends IgniteAbstractBenchmark {
+    /** */
+    private List<String> cacheNames;
+
+    /** */
+    private ExecutorService executor;
+
+    /** */
+    private int entries;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        entries = args.range();
+
+        if (entries <= 0)
+            throw new IllegalArgumentException("Invalid number of entries: " + entries);
+
+        if (cfg.threads() != 1)
+            throw new IllegalArgumentException("IgniteStreamerBenchmark should be run with single thread. " +
+                "Internally it starts multiple threads.");
+
+        String cacheNamePrefix = args.streamerCachesPrefix();
+
+        if (cacheNamePrefix == null || cacheNamePrefix.isEmpty())
+            throw new IllegalArgumentException("Streamer caches prefix not set.");
+
+        List<String> caches = new ArrayList<>();
+
+        for (String cacheName : ignite().cacheNames()) {
+            if (cacheName.startsWith(cacheNamePrefix))
+                caches.add(cacheName);
+        }
+
+        if (caches.isEmpty())
+            throw new IllegalArgumentException("Failed to find for IgniteStreamerBenchmark caches " +
+                "starting with '" + cacheNamePrefix + "'");
+
+        BenchmarkUtils.println("Found " + caches.size() + " caches for IgniteStreamerBenchmark: " + caches);
+
+        if (args.streamerCacheIndex() >= caches.size()) {
+            throw new IllegalArgumentException("Invalid streamer cache index: " + args.streamerCacheIndex() +
+                ", there are only " + caches.size() + " caches.");
+        }
+
+        if (args.streamerCacheIndex() + args.streamerConcurrentCaches() > caches.size()) {
+            throw new IllegalArgumentException("There are no enough caches [cacheIndex=" + args.streamerCacheIndex() +
+                ", concurrentCaches=" + args.streamerConcurrentCaches() +
+                ", totalCaches=" + caches.size() + ']');
+        }
+
+        Collections.sort(caches);
+
+        cacheNames = new ArrayList<>(caches.subList(args.streamerCacheIndex(),
+            args.streamerCacheIndex() + args.streamerConcurrentCaches()));
+
+        executor = Executors.newFixedThreadPool(args.streamerConcurrentCaches());
+
+        BenchmarkUtils.println("IgniteStreamerBenchmark start [cacheIndex=" + args.streamerCacheIndex() +
+            ", concurrentCaches=" + args.streamerConcurrentCaches() +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() +
+            ", cachesToUse=" + cacheNames + ']');
+
+        if (cfg.warmup() > 0) {
+            BenchmarkUtils.println("IgniteStreamerBenchmark start warmup [warmupTimeMillis=" + cfg.warmup() + ']');
+
+            final long warmupEnd = System.currentTimeMillis() + cfg.warmup();
+
+            final AtomicBoolean stop = new AtomicBoolean();
+
+            try {
+                List<Future<Void>> futs = new ArrayList<>();
+
+                for (final String cacheName : cacheNames) {
+                    futs.add(executor.submit(new Callable<Void>() {
+                        @Override public Void call() throws Exception {
+                            Thread.currentThread().setName("streamer-" + cacheName);
+
+                            BenchmarkUtils.println("IgniteStreamerBenchmark start warmup for cache " +
+                                "[name=" + cacheName + ']');
+
+                            final int KEYS = Math.min(100_000, entries);
+
+                            int key = 1;
+
+                            try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+                                streamer.perNodeBufferSize(args.streamerBufferSize());
+
+                                while (System.currentTimeMillis() < warmupEnd && !stop.get()) {
+                                    for (int i = 0; i < 10; i++) {
+                                        streamer.addData(-key++, new SampleValue(key));
+
+                                        if (key >= KEYS)
+                                            key = 1;
+                                    }
+
+                                    streamer.flush();
+                                }
+                            }
+
+                            BenchmarkUtils.println("IgniteStreamerBenchmark finished warmup for cache " +
+                                "[name=" + cacheName + ']');
+
+                            return null;
+                        }
+                    }));
+                }
+
+                for (Future<Void> fut : futs)
+                    fut.get();
+            }
+            finally {
+                stop.set(true);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> map) throws Exception {
+        BenchmarkUtils.println("IgniteStreamerBenchmark start test.");
+
+        long start = System.currentTimeMillis();
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        try {
+            List<Future<Void>> futs = new ArrayList<>();
+
+            for (final String cacheName : cacheNames) {
+                futs.add(executor.submit(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        Thread.currentThread().setName("streamer-" + cacheName);
+
+                        long start = System.currentTimeMillis();
+
+                        BenchmarkUtils.println("IgniteStreamerBenchmark start load cache [name=" + cacheName + ']');
+
+                        try (IgniteDataStreamer<Object, Object> streamer = ignite().dataStreamer(cacheName)) {
+                            for (int i = 0; i < entries; i++) {
+                                streamer.addData(i, new SampleValue(i));
+
+                                if (i > 0 && i % 1000 == 0) {
+                                    if (stop.get())
+                                        break;
+
+                                    if (i % 100_000 == 0) {
+                                        BenchmarkUtils.println("IgniteStreamerBenchmark cache load progress [name=" + cacheName +
+                                            ", entries=" + i +
+                                            ", timeMillis=" + (System.currentTimeMillis() - start) + ']');
+                                    }
+                                }
+                            }
+                        }
+
+                        long time = System.currentTimeMillis() - start;
+
+                        BenchmarkUtils.println("IgniteStreamerBenchmark finished load cache [name=" + cacheName +
+                            ", entries=" + entries +
+                            ", bufferSize=" + args.streamerBufferSize() +
+                            ", totalTimeMillis=" + time + ']');
+
+                        return null;
+                    }
+                }));
+            }
+
+            for (Future<Void> fut : futs)
+                fut.get();
+        }
+        finally {
+            stop.set(true);
+        }
+
+        long time = System.currentTimeMillis() - start;
+
+        BenchmarkUtils.println("IgniteStreamerBenchmark finished [totalTimeMillis=" + time +
+            ", entries=" + entries +
+            ", bufferSize=" + args.streamerBufferSize() + ']');
+
+        for (String cacheName : cacheNames) {
+            BenchmarkUtils.println("Cache size [cacheName=" + cacheName +
+                ", size=" + ignite().cache(cacheName).size() + ']');
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void tearDown() throws Exception {
+        if (executor != null)
+            executor.shutdown();
+
+        super.tearDown();
+    }
+}