You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/05/08 21:24:30 UTC
[2/2] incubator-tinkerpop git commit: Replaced "clients" setting with
"parallelism" in profiler.
Replaced "clients" setting with "parallelism" in profiler.
Improved output layout a bit so that columns line up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0b994621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0b994621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0b994621
Branch: refs/heads/master
Commit: 0b9946218b72b92330208236604a8438d2df4dd7
Parents: da1bf8c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri May 8 15:22:48 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri May 8 15:23:54 2015 -0400
----------------------------------------------------------------------
.../driver/util/ProfilingApplication.java | 127 +++++++------------
1 file changed, 46 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b994621/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index bda2584..1c51f21 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.driver.util;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
@@ -27,19 +28,14 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
@@ -49,94 +45,63 @@ public class ProfilingApplication {
private final Cluster cluster;
private final int requests;
- private final int clients;
private final String executionName;
private final ExecutorService executor;
- public ProfilingApplication(final String executionName, final Cluster cluster, final int clients, final int requests, final ExecutorService executor) {
+ public ProfilingApplication(final String executionName, final Cluster cluster, final int requests, final ExecutorService executor) {
this.executionName = executionName;
this.cluster = cluster;
- this.clients = clients;
this.requests = requests;
this.executor = executor;
}
public long execute() throws Exception {
- final Map<Thread, Long> rps = new ConcurrentHashMap<>();
final AtomicInteger tooSlow = new AtomicInteger(0);
- // let all the clients fully init before starting to send messages
- final CyclicBarrier barrier = new CyclicBarrier(clients);
-
- final List<Thread> threads = IntStream.range(0, clients).mapToObj(t -> new Thread(() -> {
- final Client client = cluster.connect();
- final String executionId = "[" + executionName + "-" + (t + 1) + "]";
- try {
- final CountDownLatch latch = new CountDownLatch(requests);
- client.init();
- barrier.await();
-
- // timer starts after init of all clients
- final long start = System.nanoTime();
- IntStream.range(0, requests).forEach(i ->
- client.submitAsync("1+1").thenAcceptAsync(r -> {
- try {
- r.all().get(125, TimeUnit.MILLISECONDS);
- } catch (TimeoutException ex) {
- tooSlow.incrementAndGet();
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- latch.countDown();
- }
- }, executor)
- );
-
- // finish once all requests are accounted for
- latch.await();
-
- final long end = System.nanoTime();
- final long total = end - start;
- final double totalSeconds = total / 1000000000d;
- final long requestCount = requests;
- final long reqSec = Math.round(requestCount / totalSeconds);
- rps.put(Thread.currentThread(), reqSec);
-
- System.out.println(String.format(executionId + " clients: %s | requests: %s | time(s): %s | req/sec: %s | too slow: %s", clients, requestCount, totalSeconds, reqSec, tooSlow.get()));
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new RuntimeException(ex);
- } finally {
- if (client != null) client.close();
- }
- }, "benchmark-client-" + executionName + "-" + (t + 1))).collect(Collectors.toList());
-
- threads.forEach(t -> {
- try {
- t.start();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new RuntimeException(ex);
- }
- });
-
- threads.forEach(t -> {
- try {
- t.join();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new RuntimeException(ex);
- }
- });
-
- return rps.values().stream().collect(Collectors.averagingLong(l -> l)).longValue();
+ final Client client = cluster.connect();
+ final String executionId = "[" + executionName + "]";
+ try {
+ final CountDownLatch latch = new CountDownLatch(requests);
+ client.init();
+
+ final long start = System.nanoTime();
+ IntStream.range(0, requests).forEach(i ->
+ client.submitAsync("1+1").thenAcceptAsync(r -> {
+ try {
+ r.all().get(125, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException ex) {
+ tooSlow.incrementAndGet();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ }, executor)
+ );
+
+ // finish once all requests are accounted for
+ latch.await();
+
+ final long end = System.nanoTime();
+ final long total = end - start;
+ final double totalSeconds = total / 1000000000d;
+ final long requestCount = requests;
+ final long reqSec = Math.round(requestCount / totalSeconds);
+ System.out.println(String.format(StringUtils.rightPad(executionId, 10) + " requests: %s | time(s): %s | req/sec: %s | too slow: %s", requestCount, StringUtils.rightPad(String.valueOf(totalSeconds), 14), StringUtils.rightPad(String.valueOf(reqSec), 7), tooSlow.get()));
+ return reqSec;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new RuntimeException(ex);
+ } finally {
+ if (client != null) client.close();
+ }
}
public static void main(final String[] args) {
final Map<String,Object> options = ElementHelper.asMap(args);
final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
- final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "4").toString());
+ final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "2").toString());
final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
@@ -146,7 +111,7 @@ public class ProfilingApplication {
final int timeout = Integer.parseInt(options.getOrDefault("timeout", "1200000").toString());
final int warmups = Integer.parseInt(options.getOrDefault("warmups", "5").toString());
final int executions = Integer.parseInt(options.getOrDefault("executions", "10").toString());
- final int clients = Integer.parseInt(options.getOrDefault("clients", "1").toString());
+ final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "2").toString());
final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString());
final int minConnectionPoolSize = Integer.parseInt(options.getOrDefault("minConnectionPoolSize", "256").toString());
final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString());
@@ -163,14 +128,14 @@ public class ProfilingApplication {
.maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
.minInProcessPerConnection(minInProcessPerConnection)
.maxInProcessPerConnection(maxInProcessPerConnection)
- .nioPoolSize(clients)
+ .nioPoolSize(nioPoolSize)
.workerPoolSize(workerPoolSize).create();
final Object fileName = options.get("store");
final File f = null == fileName ? null : new File(fileName.toString());
if (f != null && f.length() == 0) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
- writer.println("clients\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
+ writer.println("nioPoolSize\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
}
}
@@ -178,7 +143,7 @@ public class ProfilingApplication {
final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
System.out.println("---------------------------WARMUP CYCLE---------------------------");
for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
- final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, clients, 1000, executor).execute();
+ final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor).execute();
meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps);
TimeUnit.SECONDS.sleep(1); // pause between executions
}
@@ -191,7 +156,7 @@ public class ProfilingApplication {
final long start = System.nanoTime();
System.out.println("----------------------------TEST CYCLE----------------------------");
for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
- totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, clients, requests, executor).execute();
+ totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor).execute();
exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
TimeUnit.SECONDS.sleep(1); // pause between executions
}
@@ -201,7 +166,7 @@ public class ProfilingApplication {
System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
if (f != null) {
try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
- writer.println(String.join("\t", String.valueOf(clients), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
+ writer.println(String.join("\t", String.valueOf(nioPoolSize), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
}
}