You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/17 22:46:37 UTC

[hudi] branch master updated: [HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 28d6bfef58c [HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool
28d6bfef58c is described below

commit 28d6bfef58cd983bfaefb82323137e851d5f630a
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Fri Feb 17 17:46:30 2023 -0500

    [HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool
    
    Currently we maintain a separate Metrics instance per table (and another one for the metadata table). As a result, we also create a separate PushGateway client instance for each table, which means we have one connection pool per Hudi table. This leads to many open/waiting/blocked TCP connections on the push gateway, possibly causing it to crash.
    
    In this PR, we ensure that a single PushGateway client instance is used per hostname and port, so that we maintain only one connection pool per Push Gateway server instance.
---
 .../metrics/prometheus/PushGatewayReporter.java    | 27 ++++++++++++++++------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
index 5f82b6679ff..98fd1e8f124 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
@@ -35,14 +35,18 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 public class PushGatewayReporter extends ScheduledReporter {
 
   private static final Logger LOG = LogManager.getLogger(PushGatewayReporter.class);
+  // Ensures that we maintain a single PushGw client (single connection pool) per Push Gw Server instance.
+  private static final Map<String, PushGateway> PUSH_GATEWAY_PER_HOSTNAME = new ConcurrentHashMap<>();
 
-  private final PushGateway pushGateway;
+  private final PushGateway pushGatewayClient;
   private final DropwizardExports metricExports;
   private final CollectorRegistry collectorRegistry;
   private final String jobName;
@@ -61,20 +65,29 @@ public class PushGatewayReporter extends ScheduledReporter {
     this.deleteShutdown = deleteShutdown;
     collectorRegistry = new CollectorRegistry();
     metricExports = new DropwizardExports(registry);
-    pushGateway = createPushGatewayClient(serverHost, serverPort);
+    pushGatewayClient = createPushGatewayClient(serverHost, serverPort);
     metricExports.register(collectorRegistry);
   }
 
-  private PushGateway createPushGatewayClient(String serverHost, int serverPort) {
+  private synchronized PushGateway createPushGatewayClient(String serverHost, int serverPort) {
+    final String serverUrl = String.format("%s:%s", serverHost, serverPort);
+    if (PUSH_GATEWAY_PER_HOSTNAME.containsKey(serverUrl)) {
+      return PUSH_GATEWAY_PER_HOSTNAME.get(serverUrl);
+    }
+
+    PushGateway pushGateway;
     if (serverPort == 443) {
       try {
-        return new PushGateway(new URL("https://" + serverHost + ":" + serverPort));
+        pushGateway = new PushGateway(new URL("https://" + serverUrl));
       } catch (MalformedURLException e) {
         e.printStackTrace();
         throw new IllegalArgumentException("Malformed pushgateway host: " + serverHost);
       }
+    } else {
+      pushGateway = new PushGateway(serverUrl);
     }
-    return new PushGateway(serverHost + ":" + serverPort);
+    PUSH_GATEWAY_PER_HOSTNAME.put(serverUrl, pushGateway);
+    return pushGateway;
   }
 
   @Override
@@ -84,7 +97,7 @@ public class PushGatewayReporter extends ScheduledReporter {
                      SortedMap<String, Meter> meters,
                      SortedMap<String, Timer> timers) {
     try {
-      pushGateway.pushAdd(collectorRegistry, jobName);
+      pushGatewayClient.pushAdd(collectorRegistry, jobName);
     } catch (IOException e) {
       LOG.warn("Can't push monitoring information to pushGateway", e);
     }
@@ -101,7 +114,7 @@ public class PushGatewayReporter extends ScheduledReporter {
     try {
       if (deleteShutdown) {
         collectorRegistry.unregister(metricExports);
-        pushGateway.delete(jobName);
+        pushGatewayClient.delete(jobName);
       }
     } catch (IOException e) {
       LOG.warn("Failed to delete metrics from pushGateway with jobName {" + jobName + "}", e);