You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/10 12:35:11 UTC

[tinkerpop] 11/18: Minor modifications while profiling.

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 6984ac740a214951062ab4953109caac85a69092
Author: stephen <sp...@gmail.com>
AuthorDate: Fri Dec 6 09:26:44 2019 -0500

    Minor modifications while profiling.
    
    No changes of consequence really.
---
 .../apache/tinkerpop/gremlin/driver/Client.java    | 17 ++++++++-------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  2 +-
 .../gremlin/driver/ConnectionPoolImpl.java         | 12 +++++++----
 .../driver/util/ConfigurationEvaluator.java        |  7 +++----
 .../gremlin/driver/util/ProfilingApplication.java  | 24 ++++++++++------------
 5 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index e2ea17b..987948e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -199,7 +199,8 @@ public abstract class Client {
         if (initialized)
             return this;
 
-        logger.debug("Initializing client on cluster [{}]", cluster);
+        if (logger.isDebugEnabled())
+            logger.debug("Initializing client on cluster [{}]", cluster);
 
         cluster.init();
         initializeImplementation();
@@ -337,11 +338,11 @@ public abstract class Client {
         // need to call buildMessage() right away to get client specific configurations, that way request specific
         // ones can override as needed
         final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
-                .add(Tokens.ARGS_GREMLIN, gremlin)
-                .add(Tokens.ARGS_BATCH_SIZE, batchSize);
+                .addArg(Tokens.ARGS_GREMLIN, gremlin)
+                .addArg(Tokens.ARGS_BATCH_SIZE, batchSize);
 
         // apply settings if they were made available
-        options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+        options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout));
         options.getParameters().ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, params));
         options.getAliases().ifPresent(aliases -> request.addArg(Tokens.ARGS_ALIASES, aliases));
         options.getOverrideRequestId().ifPresent(request::overrideRequestId);
@@ -379,7 +380,9 @@ public abstract class Client {
                         }
                     } else {
                         conn.write(msg, future);
-                        logger.debug("Submitted {} to - {}", msg, conn);
+
+                        if (logger.isDebugEnabled())
+                            logger.debug("Submitted {} to - {}", msg, conn);
                     }
                 }, this.cluster.executor());
         return future;
@@ -589,8 +592,8 @@ public abstract class Client {
                                                                                   .addArg(Tokens.ARGS_GREMLIN, bytecode));
 
                 // apply settings if they were made available
-                options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
-                options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+                options.getBatchSize().ifPresent(batchSize -> request.addArg(Tokens.ARGS_BATCH_SIZE, batchSize));
+                options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout));
 
                 return submitAsync(request.create());
             } catch (Exception ex) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 6d72a32..7763cb0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -923,7 +923,7 @@ public final class Cluster {
                     new BasicThreadFactory.Builder().namingPattern("gremlin-driver-worker-%d").build());
             this.executor.setRemoveOnCancelPolicy(true);
 
-            validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, builder.validationRequest);
+            validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, builder.validationRequest);
         }
 
         private void validateBuilder(final Builder builder) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
index 5f0ccae..32bf7d8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
@@ -106,19 +106,22 @@ public class ConnectionPoolImpl implements ConnectionPool {
             public void channelReleased(final Channel ch) {
                 // Note: Any operation performed here might have direct impact on the performance of the
                 // client since, this method is called with every new request.
-                logger.debug("Channel released: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel released: {}", ch);
             }
 
             @Override
             public void channelAcquired(final Channel ch) {
                 // Note: Any operation performed here might have direct impact on the performance of the
                 // client since, this method is called with every new request.
-                logger.debug("Channel acquired: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel acquired: {}", ch);
             }
 
             @Override
             public void channelCreated(final Channel ch) {
-                logger.debug("Channel created: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel created: {}", ch);
                 // Guaranteed that it is a socket channel because we set b.channel as SocketChannel
                 final SocketChannel sch = (SocketChannel) ch;
                 ((Channelizer.AbstractChannelizer) channelizer).initChannel(sch);
@@ -127,7 +130,8 @@ public class ConnectionPoolImpl implements ConnectionPool {
 
         this.channelPool = createChannelPool(b, cluster.connectionPoolSettings(), handler);
 
-        logger.debug("Initialized {} successfully.", this);
+        if (logger.isDebugEnabled())
+            logger.debug("Initialized {} successfully.", this);
     }
 
     private FixedChannelPool createChannelPool(final Bootstrap b,
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
index d145592..25affa1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
@@ -34,21 +34,20 @@ public class ConfigurationEvaluator {
 
     private final List<Integer> workerPoolSizeRange = Arrays.asList(1,2,3,4,8,16,32);
     private final List<Integer> nioPoolSizeRange = Arrays.asList(1,2,4);
-    private final List<Integer> parallelismSizeRange = Arrays.asList(1,2,4,8,16,32);
+    private final List<Integer> maxConnectionPoolSizeRange = Arrays.asList(64, 128, 256, 512, 1024);
 
     public Stream<String[]> generate(final String [] args) throws Exception {
         final Set<String> configsTried = new HashSet<>();
 
         // get ready for the some serious brute-force action here
         for (int ir = 0; ir < nioPoolSizeRange.size(); ir++) {
-            for (int is = 0; is < parallelismSizeRange.size(); is++) {
+            for (int is = 0; is < maxConnectionPoolSizeRange.size(); is++) {
                 for (int it = 0; it < workerPoolSizeRange.size(); it++) {
                     final String s = String.join(",", String.valueOf(ir), String.valueOf(is), String.valueOf(it));
                     if (!configsTried.contains(s)) {
                         final Object[] argsToProfiler =
                                 Stream.of("nioPoolSize", nioPoolSizeRange.get(ir).toString(),
-                                          "parallelism", parallelismSizeRange.get(is).toString(),
-                                          "maxConnectionPoolSize", "15000",
+                                          "maxConnectionPoolSize", maxConnectionPoolSizeRange.get(is).toString(),
                                           "workerPoolSize", workerPoolSizeRange.get(it).toString(),
                                           "noExit", Boolean.TRUE.toString()).toArray();
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index bf675e5..ed9b6d7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
 
 /**
  * An internal application used to test out configuration parameters for Gremlin Driver against Gremlin Server.
@@ -96,22 +95,21 @@ public class ProfilingApplication {
         try {
             final CountDownLatch latch = new CountDownLatch(requests);
             final AtomicInteger inFlight = new AtomicInteger(0);
+            final AtomicInteger count = new AtomicInteger(0);
             client.init();
 
             final long start = System.nanoTime();
-            IntStream.range(0, requests).forEach(i -> {
-                final String s = exercise ? chooseScript() : script;
 
+            while (count.get() < requests) {
                 // control number of requests in flight for testing purposes or else we'll timeout without enough
                 // connections to service all this stuff.
-                while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) {
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(10L);
-                    } catch (Exception ex) {
-                        // wait wait wait
-                    }
+                if (inFlight.intValue() >= cluster.maxConnectionPoolSize()) {
+                    continue;
                 }
 
+                final String s = exercise ? chooseScript() : script;
+
+                count.incrementAndGet();
                 inFlight.incrementAndGet();
                 client.submitAsync(s).thenAcceptAsync(r -> {
                     try {
@@ -125,7 +123,7 @@ public class ProfilingApplication {
                         latch.countDown();
                     }
                 }, executor);
-            });
+            }
 
             // finish once all requests are accounted for
             latch.await();
@@ -152,9 +150,9 @@ public class ProfilingApplication {
     public static void main(final String[] args) {
         final Map<String,Object> options = ElementHelper.asMap(args);
         final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
-        final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString());
+        final int profilerPoolSize = Integer.parseInt(options.getOrDefault("profiler-pool", "2").toString());
         final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
-        final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
+        final ExecutorService executor = Executors.newFixedThreadPool(profilerPoolSize, threadFactory);
 
         final String host = options.getOrDefault("host", "localhost").toString();
         final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "200").toString());
@@ -237,7 +235,7 @@ public class ProfilingApplication {
             System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
             if (f != null) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
+                    writer.println(String.join("\t", String.valueOf(profilerPoolSize), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
                 }
             }