You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/05/08 21:24:29 UTC

[1/2] incubator-tinkerpop git commit: Add a parallelism setting to size the threadpool that sends out messages over a client.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 8f798f5cd -> 0b9946218


Add a parallelism setting to size the thread pool that sends out messages over a client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/da1bf8c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/da1bf8c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/da1bf8c1

Branch: refs/heads/master
Commit: da1bf8c1c4a96033285cce391804babac45ef0a3
Parents: 8f798f5
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri May 8 14:58:52 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri May 8 14:58:52 2015 -0400

----------------------------------------------------------------------
 .../driver/util/ProfilingApplication.java       | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/da1bf8c1/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index fc2c6ef..bda2584 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver.util;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
@@ -31,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -50,13 +52,14 @@ public class ProfilingApplication {
     private final int clients;
     private final String executionName;
 
-    private static final ExecutorService executor = Executors.newFixedThreadPool(8);
+    private final ExecutorService executor;
 
-    public ProfilingApplication(final String executionName, final Cluster cluster, final int clients, final int requests) {
+    public ProfilingApplication(final String executionName, final Cluster cluster, final int clients, final int requests, final ExecutorService executor) {
         this.executionName = executionName;
         this.cluster = cluster;
         this.clients = clients;
         this.requests = requests;
+        this.executor = executor;
     }
 
     public long execute() throws Exception {
@@ -133,6 +136,9 @@ public class ProfilingApplication {
     public static void main(final String[] args) {
         final Map<String,Object> options = ElementHelper.asMap(args);
         final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
+        final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "4").toString());
+        final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
+        final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
 
         try {
             final String host = options.getOrDefault("host", "localhost").toString();
@@ -164,7 +170,7 @@ public class ProfilingApplication {
             final File f = null == fileName ? null : new File(fileName.toString());
             if (f != null && f.length() == 0) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println("clients\tminConnectionPoolSize\tmaxConnectionPoolSize\tmaxSimultaneousUsagePerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
+                    writer.println("clients\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
                 }
             }
 
@@ -172,7 +178,7 @@ public class ProfilingApplication {
             final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
             System.out.println("---------------------------WARMUP CYCLE---------------------------");
             for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
-                final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, clients, 1000).execute();
+                final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, clients, 1000, executor).execute();
                 meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps);
                 TimeUnit.SECONDS.sleep(1); // pause between executions
             }
@@ -185,7 +191,7 @@ public class ProfilingApplication {
                 final long start = System.nanoTime();
                 System.out.println("----------------------------TEST CYCLE----------------------------");
                 for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
-                    totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, clients, requests).execute();
+                    totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, clients, requests, executor).execute();
                     exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                     TimeUnit.SECONDS.sleep(1); // pause between executions
                 }
@@ -195,7 +201,7 @@ public class ProfilingApplication {
             System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
             if (f != null) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println(String.join("\t", String.valueOf(clients), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
+                    writer.println(String.join("\t", String.valueOf(clients), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
                 }
             }
 
@@ -203,6 +209,8 @@ public class ProfilingApplication {
         } catch (Exception ex) {
             ex.printStackTrace();
             if (!noExit) System.exit(1);
+        } finally {
+            executor.shutdown();
         }
     }
 }


[2/2] incubator-tinkerpop git commit: Replaced "clients" setting with "parallelism" in profiler.

Posted by sp...@apache.org.
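Replaced the "clients" setting with "parallelism" in the profiler: each execution now uses a single client, and the option that sizes the NIO pool is now named "nioPoolSize".

Improved the output layout a bit so that the columns line up.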


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0b994621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0b994621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0b994621

Branch: refs/heads/master
Commit: 0b9946218b72b92330208236604a8438d2df4dd7
Parents: da1bf8c
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Fri May 8 15:22:48 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Fri May 8 15:23:54 2015 -0400

----------------------------------------------------------------------
 .../driver/util/ProfilingApplication.java       | 127 +++++++------------
 1 file changed, 46 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0b994621/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
----------------------------------------------------------------------
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index bda2584..1c51f21 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.driver.util;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
@@ -27,19 +28,14 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.PrintWriter;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
@@ -49,94 +45,63 @@ public class ProfilingApplication {
 
     private final Cluster cluster;
     private final int requests;
-    private final int clients;
     private final String executionName;
 
     private final ExecutorService executor;
 
-    public ProfilingApplication(final String executionName, final Cluster cluster, final int clients, final int requests, final ExecutorService executor) {
+    public ProfilingApplication(final String executionName, final Cluster cluster, final int requests, final ExecutorService executor) {
         this.executionName = executionName;
         this.cluster = cluster;
-        this.clients = clients;
         this.requests = requests;
         this.executor = executor;
     }
 
     public long execute() throws Exception {
-        final Map<Thread, Long> rps = new ConcurrentHashMap<>();
         final AtomicInteger tooSlow = new AtomicInteger(0);
 
-        // let all the clients fully init before starting to send messages
-        final CyclicBarrier barrier = new CyclicBarrier(clients);
-
-        final List<Thread> threads = IntStream.range(0, clients).mapToObj(t -> new Thread(() -> {
-            final Client client = cluster.connect();
-            final String executionId = "[" + executionName + "-" + (t + 1) + "]";
-            try {
-                final CountDownLatch latch = new CountDownLatch(requests);
-                client.init();
-                barrier.await();
-
-                // timer starts after init of all clients
-                final long start = System.nanoTime();
-                IntStream.range(0, requests).forEach(i ->
-                    client.submitAsync("1+1").thenAcceptAsync(r -> {
-                        try {
-                            r.all().get(125, TimeUnit.MILLISECONDS);
-                        } catch (TimeoutException ex) {
-                            tooSlow.incrementAndGet();
-                        } catch (Exception ex) {
-                            ex.printStackTrace();
-                        } finally {
-                            latch.countDown();
-                        }
-                    }, executor)
-                );
-
-                // finish once all requests are accounted for
-                latch.await();
-
-                final long end = System.nanoTime();
-                final long total = end - start;
-                final double totalSeconds = total / 1000000000d;
-                final long requestCount = requests;
-                final long reqSec = Math.round(requestCount / totalSeconds);
-                rps.put(Thread.currentThread(), reqSec);
-
-                System.out.println(String.format(executionId + " clients: %s | requests: %s | time(s): %s | req/sec: %s | too slow: %s", clients, requestCount, totalSeconds, reqSec, tooSlow.get()));
-            } catch (Exception ex) {
-                ex.printStackTrace();
-                throw new RuntimeException(ex);
-            } finally {
-                if (client != null) client.close();
-            }
-        }, "benchmark-client-" + executionName + "-" + (t + 1))).collect(Collectors.toList());
-
-        threads.forEach(t -> {
-            try {
-                t.start();
-            } catch (Exception ex) {
-                ex.printStackTrace();
-                throw new RuntimeException(ex);
-            }
-        });
-
-        threads.forEach(t -> {
-            try {
-                t.join();
-            } catch (Exception ex) {
-                ex.printStackTrace();
-                throw new RuntimeException(ex);
-            }
-        });
-
-        return rps.values().stream().collect(Collectors.averagingLong(l -> l)).longValue();
+        final Client client = cluster.connect();
+        final String executionId = "[" + executionName + "]";
+        try {
+            final CountDownLatch latch = new CountDownLatch(requests);
+            client.init();
+
+            final long start = System.nanoTime();
+            IntStream.range(0, requests).forEach(i ->
+                client.submitAsync("1+1").thenAcceptAsync(r -> {
+                    try {
+                        r.all().get(125, TimeUnit.MILLISECONDS);
+                    } catch (TimeoutException ex) {
+                        tooSlow.incrementAndGet();
+                    } catch (Exception ex) {
+                        ex.printStackTrace();
+                    } finally {
+                        latch.countDown();
+                    }
+                }, executor)
+            );
+
+            // finish once all requests are accounted for
+            latch.await();
+
+            final long end = System.nanoTime();
+            final long total = end - start;
+            final double totalSeconds = total / 1000000000d;
+            final long requestCount = requests;
+            final long reqSec = Math.round(requestCount / totalSeconds);
+            System.out.println(String.format(StringUtils.rightPad(executionId, 10) + " requests: %s | time(s): %s | req/sec: %s | too slow: %s", requestCount, StringUtils.rightPad(String.valueOf(totalSeconds), 14), StringUtils.rightPad(String.valueOf(reqSec), 7), tooSlow.get()));
+            return reqSec;
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            throw new RuntimeException(ex);
+        } finally {
+            if (client != null) client.close();
+        }
     }
 
     public static void main(final String[] args) {
         final Map<String,Object> options = ElementHelper.asMap(args);
         final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
-        final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "4").toString());
+        final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "2").toString());
         final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
         final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
 
@@ -146,7 +111,7 @@ public class ProfilingApplication {
             final int timeout = Integer.parseInt(options.getOrDefault("timeout", "1200000").toString());
             final int warmups = Integer.parseInt(options.getOrDefault("warmups", "5").toString());
             final int executions = Integer.parseInt(options.getOrDefault("executions", "10").toString());
-            final int clients = Integer.parseInt(options.getOrDefault("clients", "1").toString());
+            final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "2").toString());
             final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString());
             final int minConnectionPoolSize = Integer.parseInt(options.getOrDefault("minConnectionPoolSize", "256").toString());
             final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString());
@@ -163,14 +128,14 @@ public class ProfilingApplication {
                     .maxSimultaneousUsagePerConnection(maxSimultaneousUsagePerConnection)
                     .minInProcessPerConnection(minInProcessPerConnection)
                     .maxInProcessPerConnection(maxInProcessPerConnection)
-                    .nioPoolSize(clients)
+                    .nioPoolSize(nioPoolSize)
                     .workerPoolSize(workerPoolSize).create();
 
             final Object fileName = options.get("store");
             final File f = null == fileName ? null : new File(fileName.toString());
             if (f != null && f.length() == 0) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println("clients\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
+                    writer.println("nioPoolSize\tminConnectionPoolSize\tmaxConnectionPoolSize\tminSimultaneousUsagePerConnection\tmaxSimultaneousUsagePerConnection\tminInProcessPerConnection\tmaxInProcessPerConnection\tworkerPoolSize\trequestPerSecond");
                 }
             }
 
@@ -178,7 +143,7 @@ public class ProfilingApplication {
             final AtomicBoolean meetsRpsExpectation = new AtomicBoolean(true);
             System.out.println("---------------------------WARMUP CYCLE---------------------------");
             for (int ix = 0; ix < warmups && meetsRpsExpectation.get(); ix++) {
-                final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, clients, 1000, executor).execute();
+                final long averageRequestsPerSecond = new ProfilingApplication("warmup-" + (ix + 1), cluster, 1000, executor).execute();
                 meetsRpsExpectation.set(averageRequestsPerSecond > minExpectedRps);
                 TimeUnit.SECONDS.sleep(1); // pause between executions
             }
@@ -191,7 +156,7 @@ public class ProfilingApplication {
                 final long start = System.nanoTime();
                 System.out.println("----------------------------TEST CYCLE----------------------------");
                 for (int ix = 0; ix < executions && !exceededTimeout.get(); ix++) {
-                    totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, clients, requests, executor).execute();
+                    totalRequestsPerSecond += new ProfilingApplication("test-" + (ix + 1), cluster, requests, executor).execute();
                     exceededTimeout.set((System.nanoTime() - start) > TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                     TimeUnit.SECONDS.sleep(1); // pause between executions
                 }
@@ -201,7 +166,7 @@ public class ProfilingApplication {
             System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
             if (f != null) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println(String.join("\t", String.valueOf(clients), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
+                    writer.println(String.join("\t", String.valueOf(nioPoolSize), String.valueOf(minConnectionPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(minSimultaneousUsagePerConnection), String.valueOf(maxSimultaneousUsagePerConnection), String.valueOf(minInProcessPerConnection), String.valueOf(maxInProcessPerConnection), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
                 }
             }