You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2019/12/06 14:27:29 UTC

[tinkerpop] branch driver-35 updated (c5308a9 -> 8c30224)

This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a change to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git.


    from c5308a9  Removed some code deprecation
     new f82fd4c  Control the number of requests in flight
     new 8c30224  Minor modifications while profiling.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/tinkerpop/gremlin/driver/Client.java    | 17 ++++++++------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  2 +-
 .../gremlin/driver/ConnectionPoolImpl.java         | 12 ++++++----
 .../driver/util/ConfigurationEvaluator.java        |  7 +++---
 .../gremlin/driver/util/ProfilingApplication.java  | 27 ++++++++++++++++------
 5 files changed, 42 insertions(+), 23 deletions(-)


[tinkerpop] 01/02: Control the number of requests in flight

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit f82fd4c3a0d7e330d7bddea6cfe5ef9a92dd21e7
Author: stephen <sp...@gmail.com>
AuthorDate: Thu Dec 5 13:09:52 2019 -0500

    Control the number of requests in flight
    
    Bind it to the maximum connection pool size so that requests don't just time out right away
---
 .../gremlin/driver/util/ProfilingApplication.java       | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index 5838ab4..bf675e5 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.driver.Channelizer;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Connection;
 import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 
@@ -94,11 +95,24 @@ public class ProfilingApplication {
         final String executionId = "[" + executionName + "]";
         try {
             final CountDownLatch latch = new CountDownLatch(requests);
+            final AtomicInteger inFlight = new AtomicInteger(0);
             client.init();
 
             final long start = System.nanoTime();
             IntStream.range(0, requests).forEach(i -> {
                 final String s = exercise ? chooseScript() : script;
+
+                // control number of requests in flight for testing purposes or else we'll timeout without enough
+                // connections to service all this stuff.
+                while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) {
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(10L);
+                    } catch (Exception ex) {
+                        // wait wait wait
+                    }
+                }
+
+                inFlight.incrementAndGet();
                 client.submitAsync(s).thenAcceptAsync(r -> {
                     try {
                         r.all().get(tooSlowThreshold, TimeUnit.MILLISECONDS);
@@ -107,6 +121,7 @@ public class ProfilingApplication {
                     } catch (Exception ex) {
                         ex.printStackTrace();
                     } finally {
+                        inFlight.decrementAndGet();
                         latch.countDown();
                     }
                 }, executor);
@@ -149,7 +164,7 @@ public class ProfilingApplication {
         final int nioPoolSize = Integer.parseInt(options.getOrDefault("nioPoolSize", "1").toString());
         final int requests = Integer.parseInt(options.getOrDefault("requests", "10000").toString());
         final int maxConnectionPoolSize = Integer.parseInt(options.getOrDefault("maxConnectionPoolSize", "256").toString());
-        final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", "3000").toString());
+        final int maxWaitForConnection = Integer.parseInt(options.getOrDefault("maxWaitForConnection", Connection.DEFAULT_MAX_WAIT_FOR_CONNECTION).toString());
         final int workerPoolSize = Integer.parseInt(options.getOrDefault("workerPoolSize", "2").toString());
         final int tooSlowThreshold = Integer.parseInt(options.getOrDefault("tooSlowThreshold", "125").toString());
         final String channelizer = options.getOrDefault("channelizer", Channelizer.WebSocketChannelizer.class.getName()).toString();


[tinkerpop] 02/02: Minor modifications while profiling.

Posted by sp...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

spmallette pushed a commit to branch driver-35
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 8c30224617d7059f4e43f30f7d9baf6c2b55b2b2
Author: stephen <sp...@gmail.com>
AuthorDate: Fri Dec 6 09:26:44 2019 -0500

    Minor modifications while profiling.
    
    No changes of consequence really.
---
 .../apache/tinkerpop/gremlin/driver/Client.java    | 17 ++++++++-------
 .../apache/tinkerpop/gremlin/driver/Cluster.java   |  2 +-
 .../gremlin/driver/ConnectionPoolImpl.java         | 12 +++++++----
 .../driver/util/ConfigurationEvaluator.java        |  7 +++----
 .../gremlin/driver/util/ProfilingApplication.java  | 24 ++++++++++------------
 5 files changed, 33 insertions(+), 29 deletions(-)

diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
index e2ea17b..987948e 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java
@@ -199,7 +199,8 @@ public abstract class Client {
         if (initialized)
             return this;
 
-        logger.debug("Initializing client on cluster [{}]", cluster);
+        if (logger.isDebugEnabled())
+            logger.debug("Initializing client on cluster [{}]", cluster);
 
         cluster.init();
         initializeImplementation();
@@ -337,11 +338,11 @@ public abstract class Client {
         // need to call buildMessage() right away to get client specific configurations, that way request specific
         // ones can override as needed
         final RequestMessage.Builder request = buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
-                .add(Tokens.ARGS_GREMLIN, gremlin)
-                .add(Tokens.ARGS_BATCH_SIZE, batchSize);
+                .addArg(Tokens.ARGS_GREMLIN, gremlin)
+                .addArg(Tokens.ARGS_BATCH_SIZE, batchSize);
 
         // apply settings if they were made available
-        options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+        options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout));
         options.getParameters().ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, params));
         options.getAliases().ifPresent(aliases -> request.addArg(Tokens.ARGS_ALIASES, aliases));
         options.getOverrideRequestId().ifPresent(request::overrideRequestId);
@@ -379,7 +380,9 @@ public abstract class Client {
                         }
                     } else {
                         conn.write(msg, future);
-                        logger.debug("Submitted {} to - {}", msg, conn);
+
+                        if (logger.isDebugEnabled())
+                            logger.debug("Submitted {} to - {}", msg, conn);
                     }
                 }, this.cluster.executor());
         return future;
@@ -589,8 +592,8 @@ public abstract class Client {
                                                                                   .addArg(Tokens.ARGS_GREMLIN, bytecode));
 
                 // apply settings if they were made available
-                options.getBatchSize().ifPresent(batchSize -> request.add(Tokens.ARGS_BATCH_SIZE, batchSize));
-                options.getTimeout().ifPresent(timeout -> request.add(Tokens.ARGS_EVAL_TIMEOUT, timeout));
+                options.getBatchSize().ifPresent(batchSize -> request.addArg(Tokens.ARGS_BATCH_SIZE, batchSize));
+                options.getTimeout().ifPresent(timeout -> request.addArg(Tokens.ARGS_EVAL_TIMEOUT, timeout));
 
                 return submitAsync(request.create());
             } catch (Exception ex) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
index 6d72a32..7763cb0 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -923,7 +923,7 @@ public final class Cluster {
                     new BasicThreadFactory.Builder().namingPattern("gremlin-driver-worker-%d").build());
             this.executor.setRemoveOnCancelPolicy(true);
 
-            validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).add(Tokens.ARGS_GREMLIN, builder.validationRequest);
+            validationRequest = () -> RequestMessage.build(Tokens.OPS_EVAL).addArg(Tokens.ARGS_GREMLIN, builder.validationRequest);
         }
 
         private void validateBuilder(final Builder builder) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
index 5f0ccae..32bf7d8 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolImpl.java
@@ -106,19 +106,22 @@ public class ConnectionPoolImpl implements ConnectionPool {
             public void channelReleased(final Channel ch) {
                 // Note: Any operation performed here might have direct impact on the performance of the
                 // client since, this method is called with every new request.
-                logger.debug("Channel released: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel released: {}", ch);
             }
 
             @Override
             public void channelAcquired(final Channel ch) {
                 // Note: Any operation performed here might have direct impact on the performance of the
                 // client since, this method is called with every new request.
-                logger.debug("Channel acquired: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel acquired: {}", ch);
             }
 
             @Override
             public void channelCreated(final Channel ch) {
-                logger.debug("Channel created: {}", ch);
+                if (logger.isDebugEnabled())
+                    logger.debug("Channel created: {}", ch);
                 // Guaranteed that it is a socket channel because we set b.channel as SocketChannel
                 final SocketChannel sch = (SocketChannel) ch;
                 ((Channelizer.AbstractChannelizer) channelizer).initChannel(sch);
@@ -127,7 +130,8 @@ public class ConnectionPoolImpl implements ConnectionPool {
 
         this.channelPool = createChannelPool(b, cluster.connectionPoolSettings(), handler);
 
-        logger.debug("Initialized {} successfully.", this);
+        if (logger.isDebugEnabled())
+            logger.debug("Initialized {} successfully.", this);
     }
 
     private FixedChannelPool createChannelPool(final Bootstrap b,
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
index d145592..25affa1 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ConfigurationEvaluator.java
@@ -34,21 +34,20 @@ public class ConfigurationEvaluator {
 
     private final List<Integer> workerPoolSizeRange = Arrays.asList(1,2,3,4,8,16,32);
     private final List<Integer> nioPoolSizeRange = Arrays.asList(1,2,4);
-    private final List<Integer> parallelismSizeRange = Arrays.asList(1,2,4,8,16,32);
+    private final List<Integer> maxConnectionPoolSizeRange = Arrays.asList(64, 128, 256, 512, 1024);
 
     public Stream<String[]> generate(final String [] args) throws Exception {
         final Set<String> configsTried = new HashSet<>();
 
         // get ready for the some serious brute-force action here
         for (int ir = 0; ir < nioPoolSizeRange.size(); ir++) {
-            for (int is = 0; is < parallelismSizeRange.size(); is++) {
+            for (int is = 0; is < maxConnectionPoolSizeRange.size(); is++) {
                 for (int it = 0; it < workerPoolSizeRange.size(); it++) {
                     final String s = String.join(",", String.valueOf(ir), String.valueOf(is), String.valueOf(it));
                     if (!configsTried.contains(s)) {
                         final Object[] argsToProfiler =
                                 Stream.of("nioPoolSize", nioPoolSizeRange.get(ir).toString(),
-                                          "parallelism", parallelismSizeRange.get(is).toString(),
-                                          "maxConnectionPoolSize", "15000",
+                                          "maxConnectionPoolSize", maxConnectionPoolSizeRange.get(is).toString(),
                                           "workerPoolSize", workerPoolSizeRange.get(it).toString(),
                                           "noExit", Boolean.TRUE.toString()).toArray();
 
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
index bf675e5..ed9b6d7 100644
--- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
+++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/util/ProfilingApplication.java
@@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
 
 /**
  * An internal application used to test out configuration parameters for Gremlin Driver against Gremlin Server.
@@ -96,22 +95,21 @@ public class ProfilingApplication {
         try {
             final CountDownLatch latch = new CountDownLatch(requests);
             final AtomicInteger inFlight = new AtomicInteger(0);
+            final AtomicInteger count = new AtomicInteger(0);
             client.init();
 
             final long start = System.nanoTime();
-            IntStream.range(0, requests).forEach(i -> {
-                final String s = exercise ? chooseScript() : script;
 
+            while (count.get() < requests) {
                 // control number of requests in flight for testing purposes or else we'll timeout without enough
                 // connections to service all this stuff.
-                while (inFlight.intValue() >= cluster.maxConnectionPoolSize()) {
-                    try {
-                        TimeUnit.MILLISECONDS.sleep(10L);
-                    } catch (Exception ex) {
-                        // wait wait wait
-                    }
+                if (inFlight.intValue() >= cluster.maxConnectionPoolSize()) {
+                    continue;
                 }
 
+                final String s = exercise ? chooseScript() : script;
+
+                count.incrementAndGet();
                 inFlight.incrementAndGet();
                 client.submitAsync(s).thenAcceptAsync(r -> {
                     try {
@@ -125,7 +123,7 @@ public class ProfilingApplication {
                         latch.countDown();
                     }
                 }, executor);
-            });
+            }
 
             // finish once all requests are accounted for
             latch.await();
@@ -152,9 +150,9 @@ public class ProfilingApplication {
     public static void main(final String[] args) {
         final Map<String,Object> options = ElementHelper.asMap(args);
         final boolean noExit = Boolean.parseBoolean(options.getOrDefault("noExit", "false").toString());
-        final int parallelism = Integer.parseInt(options.getOrDefault("parallelism", "16").toString());
+        final int profilerPoolSize = Integer.parseInt(options.getOrDefault("profiler-pool", "2").toString());
         final BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("profiler-%d").build();
-        final ExecutorService executor = Executors.newFixedThreadPool(parallelism, threadFactory);
+        final ExecutorService executor = Executors.newFixedThreadPool(profilerPoolSize, threadFactory);
 
         final String host = options.getOrDefault("host", "localhost").toString();
         final int minExpectedRps = Integer.parseInt(options.getOrDefault("minExpectedRps", "200").toString());
@@ -237,7 +235,7 @@ public class ProfilingApplication {
             System.out.println(String.format("avg req/sec: %s", averageRequestPerSecond));
             if (f != null) {
                 try (final PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(f, true)))) {
-                    writer.println(String.join("\t", String.valueOf(parallelism), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
+                    writer.println(String.join("\t", String.valueOf(profilerPoolSize), String.valueOf(nioPoolSize), String.valueOf(maxConnectionPoolSize), String.valueOf(workerPoolSize), String.valueOf(averageRequestPerSecond)));
                 }
             }