You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/09/24 01:28:14 UTC

[GitHub] [cassandra] dcapwell commented on a change in pull request #719: Add metric for client concurrent byte throttle

dcapwell commented on a change in pull request #719:
URL: https://github.com/apache/cassandra/pull/719#discussion_r493984932



##########
File path: src/java/org/apache/cassandra/transport/Server.java
##########
@@ -404,6 +421,28 @@ public void release()
             if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
                 requestPayloadInFlightPerEndpoint.remove(endpoint, this);
         }
+
+        /**
+         * This will recompute the ip usage histo on each query of the snapshot when requested instead of trying to keep
+         * a histogram up to date with each request
+         */
+        public static class IpUsageReservoir implements Reservoir

Review comment:
       feel that this would be best as a private class in `ClientMetrics` as its not reusable and only to mock out the histogram state.

##########
File path: test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
##########
@@ -69,137 +82,115 @@ public void dropCreatedTable()
         }
     }
 
+    private SimpleClient client() throws IOException
+    {
+        return new SimpleClient(nativeAddr.getHostAddress(),
+                                nativePort,
+                                ProtocolVersion.V5,
+                                true,
+                                new EncryptionOptions())
+               .connect(false, false, true);
+    }
+
     @Test
     public void testQueryExecutionWithThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
-            client.connect(false, false, true);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
             client.connect(false, false, false);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
 
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
             queryMessage = new QueryMessage("SELECT * FROM atable",
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
+    }
+
+    @Test
+    public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
+    {
+        try (SimpleClient client = client())
         {
-            client.close();
+            final QueryMessage create = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                         V5_DEFAULT_OPTIONS);
+            client.execute(create);
+
+            final QueryMessage queryMessage = new QueryMessage("SELECT * FROM atable",
+                                                               V5_DEFAULT_OPTIONS);
+
+            Assert.assertEquals(0L, Server.EndpointPayloadTracker.getCurrentGlobalUsage());
+            AtomicBoolean running = new AtomicBoolean(true);
+            // run query serially on repeat
+            new Thread(() ->
+                       {
+                           while (running.get())
+                           {
+                               client.execute(queryMessage);
+                           }
+                       }).start();
+
+            // checking metric may occur inbetween running of query, so check multiple times for up to 2 seconds
+            long start = System.currentTimeMillis();
+            while (running.get() && System.currentTimeMillis() - start < 2000)
+            {
+                if (Server.EndpointPayloadTracker.getCurrentGlobalUsage() > 0)
+                {
+                    running.set(false);
+                }
+            }
+
+            // if this isnt false it never saw the usage go above zero
+            Assert.assertFalse(running.get());
+
+            // set to false to ensure stopping the background thread
+            running.set(false);

Review comment:
       unreachable as this is true or the above fails; best to add in the finally.

##########
File path: doc/source/operating/metrics.rst
##########
@@ -621,13 +621,16 @@ Reported name format:
 **JMX MBean**
     ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
 
-============================== =============================== ===========
-Name                           Type                            Description
-============================== =============================== ===========
-connectedNativeClients         Gauge<Integer>                  Number of clients connected to this nodes native protocol server
-connections                    Gauge<List<Map<String, String>> List of all connections and their state information
-connectedNativeClientsByUser   Gauge<Map<String, Int>          Number of connnective native clients by username
-============================== =============================== ===========
+============================== ================================ ===========
+Name                           Type                             Description
+============================== ================================ ===========
+ConnectedNativeClients         Gauge<Integer>                   Number of clients connected to this nodes native protocol server

Review comment:
       for me: this was changed by Caleb in a recent JIRA (both names are supported), thanks for updating the docs

##########
File path: test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
##########
@@ -69,137 +82,115 @@ public void dropCreatedTable()
         }
     }
 
+    private SimpleClient client() throws IOException
+    {
+        return new SimpleClient(nativeAddr.getHostAddress(),
+                                nativePort,
+                                ProtocolVersion.V5,
+                                true,
+                                new EncryptionOptions())
+               .connect(false, false, true);
+    }
+
     @Test
     public void testQueryExecutionWithThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
-            client.connect(false, false, true);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
             client.connect(false, false, false);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
 
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
             queryMessage = new QueryMessage("SELECT * FROM atable",
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
+    }
+
+    @Test
+    public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
+    {
+        try (SimpleClient client = client())
         {
-            client.close();
+            final QueryMessage create = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                         V5_DEFAULT_OPTIONS);
+            client.execute(create);
+
+            final QueryMessage queryMessage = new QueryMessage("SELECT * FROM atable",
+                                                               V5_DEFAULT_OPTIONS);
+
+            Assert.assertEquals(0L, Server.EndpointPayloadTracker.getCurrentGlobalUsage());
+            AtomicBoolean running = new AtomicBoolean(true);
+            // run query serially on repeat
+            new Thread(() ->
+                       {
+                           while (running.get())
+                           {
+                               client.execute(queryMessage);
+                           }
+                       }).start();
+
+            // checking metric may occur inbetween running of query, so check multiple times for up to 2 seconds
+            long start = System.currentTimeMillis();
+            while (running.get() && System.currentTimeMillis() - start < 2000)

Review comment:
       I am concerned that this test may become flaky as its very specific to timing.  I think byte buddy would be needed to control the timing to avoid flakiness sadly =(




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org