You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/02/17 22:46:37 UTC
[hudi] branch master updated: [HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 28d6bfef58c [HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool
28d6bfef58c is described below
commit 28d6bfef58cd983bfaefb82323137e851d5f630a
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Fri Feb 17 17:46:30 2023 -0500
[HUDI-5795] Ensure we only have a single push gw client and hence a single connection pool
Currently we maintain a single Metrics instance per table (and a separate one for the metadata table). That means we create a separate instance of PushGateway, the client for the push gateway, for each of them. This implies that we have a connection pool per Hudi table, which leads to a lot of open/waiting/blocked TCP connections on the push gateway, possibly causing it to crash.
In this PR, we ensure that we use a single PushGateway instance per hostname and port, so that we only maintain a single connection pool per push gateway server instance.
---
.../metrics/prometheus/PushGatewayReporter.java | 27 ++++++++++++++++------
1 file changed, 20 insertions(+), 7 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
index 5f82b6679ff..98fd1e8f124 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/prometheus/PushGatewayReporter.java
@@ -35,14 +35,18 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
+import java.util.Map;
import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class PushGatewayReporter extends ScheduledReporter {
private static final Logger LOG = LogManager.getLogger(PushGatewayReporter.class);
+ // Ensures that we maintain a single PushGw client (single connection pool) per Push Gw Server instance.
+ private static final Map<String, PushGateway> PUSH_GATEWAY_PER_HOSTNAME = new ConcurrentHashMap<>();
- private final PushGateway pushGateway;
+ private final PushGateway pushGatewayClient;
private final DropwizardExports metricExports;
private final CollectorRegistry collectorRegistry;
private final String jobName;
@@ -61,20 +65,29 @@ public class PushGatewayReporter extends ScheduledReporter {
this.deleteShutdown = deleteShutdown;
collectorRegistry = new CollectorRegistry();
metricExports = new DropwizardExports(registry);
- pushGateway = createPushGatewayClient(serverHost, serverPort);
+ pushGatewayClient = createPushGatewayClient(serverHost, serverPort);
metricExports.register(collectorRegistry);
}
- private PushGateway createPushGatewayClient(String serverHost, int serverPort) {
+ private synchronized PushGateway createPushGatewayClient(String serverHost, int serverPort) {
+ final String serverUrl = String.format("%s:%s", serverHost, serverPort);
+ if (PUSH_GATEWAY_PER_HOSTNAME.containsKey(serverUrl)) {
+ return PUSH_GATEWAY_PER_HOSTNAME.get(serverUrl);
+ }
+
+ PushGateway pushGateway;
if (serverPort == 443) {
try {
- return new PushGateway(new URL("https://" + serverHost + ":" + serverPort));
+ pushGateway = new PushGateway(new URL("https://" + serverUrl));
} catch (MalformedURLException e) {
e.printStackTrace();
throw new IllegalArgumentException("Malformed pushgateway host: " + serverHost);
}
+ } else {
+ pushGateway = new PushGateway(serverUrl);
}
- return new PushGateway(serverHost + ":" + serverPort);
+ PUSH_GATEWAY_PER_HOSTNAME.put(serverUrl, pushGateway);
+ return pushGateway;
}
@Override
@@ -84,7 +97,7 @@ public class PushGatewayReporter extends ScheduledReporter {
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
try {
- pushGateway.pushAdd(collectorRegistry, jobName);
+ pushGatewayClient.pushAdd(collectorRegistry, jobName);
} catch (IOException e) {
LOG.warn("Can't push monitoring information to pushGateway", e);
}
@@ -101,7 +114,7 @@ public class PushGatewayReporter extends ScheduledReporter {
try {
if (deleteShutdown) {
collectorRegistry.unregister(metricExports);
- pushGateway.delete(jobName);
+ pushGatewayClient.delete(jobName);
}
} catch (IOException e) {
LOG.warn("Failed to delete metrics from pushGateway with jobName {" + jobName + "}", e);