You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/29 22:54:11 UTC

[GitHub] merlimat closed pull request #1472: Pulsar Proxy - Added Prometheus metrics for throttled connections and…

merlimat closed pull request #1472: Pulsar Proxy - Added Prometheus metrics for throttled connections and…
URL: https://github.com/apache/incubator-pulsar/pull/1472
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aad42dfaa..bbf1c4425 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -38,6 +38,7 @@
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
 public class LookupProxyHandler {
+    private final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
     private final ProxyService service;
     private final ProxyConnection proxyConnection;
     private final boolean connectWithTLS;
@@ -52,6 +53,14 @@
             .build("pulsar_proxy_partitions_metadata_requests", "Counter of partitions metadata requests").create()
             .register();
 
+    static final Counter rejectedLookupRequests = Counter.build("pulsar_proxy_rejected_lookup_requests",
+            "Counter of topic lookup requests rejected due to throttling").create().register();
+
+    static final Counter rejectedPartitionsMetadataRequests = Counter
+            .build("pulsar_proxy_rejected_partitions_metadata_requests",
+                    "Counter of partitions metadata requests rejected due to throttling")
+            .create().register();
+
     public LookupProxyHandler(ProxyService proxy, ProxyConnection proxyConnection) {
         this.service = proxy;
         this.proxyConnection = proxyConnection;
@@ -89,11 +98,13 @@ public void handleLookup(CommandLookupTopic lookup) {
             performLookup(clientRequestId, topic, serviceUrl, false, 10);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedLookupRequests.inc();
             if (log.isDebugEnabled()) {
-                log.debug("Request ID {} from {} rejected - Too many concurrent lookup requests.", clientRequestId, clientAddress);
+                log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
+                        throttlingErrorMessage);
             }
             proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
 
     }
@@ -137,7 +148,6 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
                     performLookup(clientRequestId, topic, brokerUrl, result.authoritative, numberOfRetries - 1);
                 } else {
                     // Reply the same address for both TLS non-TLS. The reason is that whether we use TLS
-                    // between proxy
                     // and broker is independent of whether the client itself uses TLS, but we need to force the
                     // client
                     // to use the appropriate target broker (and port) when it will connect back.
@@ -168,8 +178,13 @@ public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata part
             handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
             this.service.getLookupRequestSemaphore().release();
         } else {
+            rejectedPartitionsMetadataRequests.inc();
+            if (log.isDebugEnabled()) {
+                log.debug("PartitionMetaData Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
+                        throttlingErrorMessage);
+            }
             proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
-                    "Too many concurrent lookup requests", clientRequestId));
+                    throttlingErrorMessage, clientRequestId));
         }
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e65461a3b..d2c7f9709 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -40,8 +40,6 @@
 
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandler;
-import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
@@ -87,6 +85,10 @@
             .build("pulsar_proxy_new_connections", "Counter of connections being opened in the proxy").create()
             .register();
 
+    static final Counter rejectedConnections = Counter
+            .build("pulsar_proxy_rejected_connections", "Counter for connections rejected due to throttling").create()
+            .register();
+    
     public ProxyConnection(ProxyService proxyService) {
         super(30, TimeUnit.SECONDS);
         this.service = proxyService;
@@ -98,10 +100,8 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
         super.channelRegistered(ctx);
         activeConnections.inc();
         if (activeConnections.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("[{}] Too many connection opened {}", remoteAddress, activeConnections.get());
-            }
             ctx.close();
+            rejectedConnections.inc();
             return;
         }
     }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
index 008e751e3..21c88c6cc 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionThrottlingTest.java
@@ -74,6 +74,7 @@ public void testInboundConnection() throws Exception {
         PulsarClient client2 = PulsarClient.builder().serviceUrl("pulsar://localhost:" + proxyConfig.getServicePort())
                 .build();
         Producer<byte[]> producer2;
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 0.0d);
         try {
             producer2 = client2.newProducer().topic("persistent://sample/test/local/producer-topic-1").create();
             producer2.send("Message 1".getBytes());
@@ -81,6 +82,7 @@ public void testInboundConnection() throws Exception {
         } catch (Exception ex) {
             // OK
         }
+        Assert.assertEquals(ProxyConnection.rejectedConnections.get(), 1.0d);
     }
     
     private static final Logger LOG = LoggerFactory.getLogger(ProxyConnectionThrottlingTest.class);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
index c661caea4..4411f8097 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyLookupThrottlingTest.java
@@ -78,7 +78,7 @@ public void testLookup() throws Exception {
         } catch (Exception ex) {
             // Ignore
         }
-
+        Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d);
         proxyService.getLookupRequestSemaphore().release();
         try {
             Producer<byte[]> producer3 = client.newProducer().topic("persistent://sample/test/local/producer-topic")
@@ -86,6 +86,7 @@ public void testLookup() throws Exception {
         } catch (Exception ex) {
             Assert.fail("Should not have failed since can acquire LookupRequestSemaphore");
         }
+        Assert.assertEquals(LookupProxyHandler.rejectedPartitionsMetadataRequests.get(), 1.0d);
         client.close();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services