You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/16 08:38:23 UTC
[ambari-metrics] branch master updated: AMBARI-25549: NegativeArraySizeException thrown when invoking CurrentCollectorHost (#57)
This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-metrics.git
The following commit(s) were added to refs/heads/master by this push:
new dd01477 AMBARI-25549: NegativeArraySizeException thrown when invoking CurrentCollectorHost (#57)
dd01477 is described below
commit dd0147782f26a6a1ba4ce13181a2ee3d95320afa
Author: lucasbak <lu...@gmail.com>
AuthorDate: Wed Nov 16 09:38:18 2022 +0100
AMBARI-25549: NegativeArraySizeException thrown when invoking CurrentCollectorHost (#57)
---
.../sink/timeline/AbstractTimelineMetricsSink.java | 178 ++++++++++++---------
...etricSinkWriteShardHostnameHashingStrategy.java | 5 +-
.../timeline/AbstractTimelineMetricSinkTest.java | 66 ++++++++
3 files changed, 169 insertions(+), 80 deletions(-)
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index d77f60c..ee036ca 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -119,6 +119,8 @@ public abstract class AbstractTimelineMetricsSink {
// well as timed refresh
protected Supplier<String> targetCollectorHostSupplier;
+ private final CollectorShardSupplier collectorShardSupplier = new CollectorShardSupplier();
+
protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>();
private volatile boolean isInitializedForHA = false;
@@ -148,6 +150,20 @@ public abstract class AbstractTimelineMetricsSink {
.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
}
+ private final class CollectorShardSupplier implements Supplier<String> { // delegate passed to Suppliers.memoizeWithExpiration; its single instance is also the monitor for this class's synchronized blocks
+ @Override
+ public String get() {
+ // The shardExpired flag tells apart the two ways this get() is reached:
+ // - via findPreferredCollectHost, which clears the flag and has just refreshed the collector
+ // hosts itself, so no refresh is needed here; OR
+ // - via expiry of the memoized value, where the configured collectors are polled again so that allKnownLiveCollectors is brought up to date.
+ if (shardExpired) {
+ refreshCollectorsFromConfigured(getConfiguredCollectorHosts());
+ }
+ return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); // the strategy picks from a snapshot copy of the live set
+ }
+ }
+
public AbstractTimelineMetricsSink() {
LOG = LogFactory.getLog(this.getClass());
}
@@ -272,18 +288,22 @@ public abstract class AbstractTimelineMetricsSink {
protected String getCurrentCollectorHost() {
String collectorHost;
- // Get cached target
- if (targetCollectorHostSupplier != null) {
- collectorHost = targetCollectorHostSupplier.get();
- // Last X attempts have failed - force refresh
- if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
- LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors.");
- allKnownLiveCollectors.remove(collectorHost);
- targetCollectorHostSupplier = null;
+ /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same
+ object under the hood to provide thread safety. */
+ synchronized (collectorShardSupplier) {
+ // Get cached target
+ if (targetCollectorHostSupplier != null) {
+ collectorHost = targetCollectorHostSupplier.get();
+ // Last X attempts have failed - force refresh
+ if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
+ LOG.debug("Removing collector " + collectorHost + " from allKnownLiveCollectors.");
+ allKnownLiveCollectors.remove(collectorHost);
+ targetCollectorHostSupplier = null;
+ collectorHost = findPreferredCollectHost();
+ }
+ } else {
collectorHost = findPreferredCollectHost();
}
- } else {
- collectorHost = findPreferredCollectHost();
}
if (collectorHost == null) {
@@ -360,11 +380,15 @@ public abstract class AbstractTimelineMetricsSink {
*
* @return the app cookie manager
*/
- public synchronized AppCookieManager getAppCookieManager() {
- if (appCookieManager == null) {
- appCookieManager = new AppCookieManager();
+ public AppCookieManager getAppCookieManager() {
+ /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same
+ object under the hood to provide thread safety. */
+ synchronized (collectorShardSupplier) {
+ if (appCookieManager == null) {
+ appCookieManager = new AppCookieManager();
+ }
+ return appCookieManager;
}
- return appCookieManager;
}
/**
@@ -513,72 +537,59 @@ public abstract class AbstractTimelineMetricsSink {
*
* @return String Collector hostname
*/
- protected synchronized String findPreferredCollectHost() {
- if (!isInitializedForHA) {
- init();
- }
-
- shardExpired = false;
- // Auto expire and re-calculate after 1 hour
- if (targetCollectorHostSupplier != null) {
- String targetCollector = targetCollectorHostSupplier.get();
- if (targetCollector != null) {
- return targetCollector;
+ protected String findPreferredCollectHost() {
+ /* Using collectorShardSupplier as sync_object since the Supplier.get() is using the same
+ object under the hood to provide thread safety. */
+ synchronized (collectorShardSupplier) {
+ if (!isInitializedForHA) {
+ init();
}
- }
- // Reach out to all configured collectors before Zookeeper
- Collection<String> collectorHosts = getConfiguredCollectorHosts();
- refreshCollectorsFromConfigured(collectorHosts);
-
- // Lookup Zookeeper for live hosts - max 10 seconds wait time
- long currentTime = System.currentTimeMillis();
- if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null
- && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) {
-
- LOG.debug("No live collectors from configuration. Requesting zookeeper...");
- allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
- boolean noNewCollectorFromZk = true;
- for (String collectorHostFromZk : allKnownLiveCollectors) {
- if (!collectorHosts.contains(collectorHostFromZk)) {
- noNewCollectorFromZk = false;
- break;
+ shardExpired = false;
+ // Auto expire and re-calculate after 1 hour
+ if (targetCollectorHostSupplier != null) {
+ String targetCollector = targetCollectorHostSupplier.get();
+ if (targetCollector != null) {
+ return targetCollector;
}
}
- if (noNewCollectorFromZk) {
- LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis");
- lastFailedZkRequestTime = System.currentTimeMillis();
- }
- }
- if (allKnownLiveCollectors.size() != 0) {
- targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(
- new Supplier<String>() {
- @Override
- public String get() {
- //shardExpired flag is used to determine if the Supplier.get() is invoked through the
- // findPreferredCollectHost method (No need to refresh collector hosts
- // OR
- // through Expiry (Refresh needed to pick up dead collectors that might have not become alive).
- if (shardExpired) {
- refreshCollectorsFromConfigured(getConfiguredCollectorHosts());
- }
- return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors));
+ // Reach out to all configured collectors before Zookeeper
+ Collection<String> collectorHosts = getConfiguredCollectorHosts();
+ refreshCollectorsFromConfigured(collectorHosts);
+
+ // Lookup Zookeeper for live hosts - max 10 seconds wait time
+ long currentTime = System.currentTimeMillis();
+ if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null
+ && (currentTime - lastFailedZkRequestTime) > zookeeperBackoffTimeMillis) {
+
+ LOG.debug("No live collectors from configuration. Requesting zookeeper...");
+ allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
+ boolean noNewCollectorFromZk = true;
+ for (String collectorHostFromZk : allKnownLiveCollectors) {
+ if (!collectorHosts.contains(collectorHostFromZk)) {
+ noNewCollectorFromZk = false;
+ break;
}
- }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
- rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
- - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
- + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES,
- TimeUnit.MINUTES
- );
-
- String collectorHost = targetCollectorHostSupplier.get();
+ }
+ if (noNewCollectorFromZk) {
+ LOG.debug("No new collector was found from Zookeeper. Will not request zookeeper for " + zookeeperBackoffTimeMillis + " millis");
+ lastFailedZkRequestTime = System.currentTimeMillis();
+ }
+ }
+
+ if (allKnownLiveCollectors.size() != 0) {
+ SupplierExpiry expiry = getSupplierExpiry();
+ targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(collectorShardSupplier, expiry.duration, expiry.timeUnit);
+
+ String collectorHost = targetCollectorHostSupplier.get();
+ shardExpired = true;
+ return collectorHost;
+ }
+ LOG.debug("Couldn't find any live collectors. Returning null");
shardExpired = true;
- return collectorHost;
+ return null;
}
- LOG.debug("Couldn't find any live collectors. Returning null");
- shardExpired = true;
- return null;
}
private void refreshCollectorsFromConfigured(Collection<String> collectorHosts) {
@@ -591,14 +602,12 @@ public abstract class AbstractTimelineMetricsSink {
try {
Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort());
// Update live Hosts - current host will already be a part of this
- for (String host : liveHosts) {
- allKnownLiveCollectors.add(host);
- }
+ allKnownLiveCollectors.addAll(liveHosts);
break; // Found at least 1 live collector
} catch (MetricCollectorUnavailableException e) {
LOG.debug("Collector " + hostStr + " is not longer live. Removing " +
"it from list of know live collector hosts : " + allKnownLiveCollectors);
- allKnownLiveCollectors.remove(hostStr);
+ boolean res = allKnownLiveCollectors.remove(hostStr);
}
}
}
@@ -711,6 +720,23 @@ public abstract class AbstractTimelineMetricsSink {
return metricsPostCache;
}
+ class SupplierExpiry { // value holder for the expiry handed to Suppliers.memoizeWithExpiration
+ final long duration; // length of the expiry window, measured in timeUnit
+ final TimeUnit timeUnit; // unit in which duration is expressed
+
+ public SupplierExpiry(long duration, TimeUnit timeUnit) {
+ this.duration = duration;
+ this.timeUnit = timeUnit;
+ }
+ }
+ protected SupplierExpiry getSupplierExpiry() { // overridable so a subclass can supply a different expiry for the memoized collector host
+ // Picks a duration in the inclusive range [MIN, MAX] minutes via rand.nextInt(max - min + 1) + min (noted by the author as 60 to 75 minutes); assumes rand behaves like java.util.Random - TODO confirm
+ long duration = rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
+ - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
+ + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES;
+ return new SupplierExpiry(duration, TimeUnit.MINUTES);
+ }
+
/**
* Get a pre-formatted URI for the collector
*/
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
index c79bbfb..23bf68c 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
@@ -36,10 +36,7 @@ public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWr
@Override
public String findCollectorShard(List<String> collectorHosts) {
- if(collectorHosts.size() ==0) {
- LOG.warn("No metrics collectors alive, please check collector status!");
- return "";
- }
+ if (collectorHosts.isEmpty()) return null;
long index = hostnameHash % collectorHosts.size();
index = index < 0 ? index + collectorHosts.size() : index;
String collectorHost = collectorHosts.get((int) index);
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
index f3f8a62..d494a8e 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java
@@ -30,7 +30,13 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.expect;
@@ -204,6 +210,41 @@ public class AbstractTimelineMetricSinkTest {
verifyAll();
}
+ /**
+ * Calls getCurrentCollectorHost() from many pooled threads against a sink whose collector
+ * discovery fails intermittently and whose memoized host expires after 128 ms. The test fails
+ * if any call throws, if the workers do not finish within 30 seconds, or if the wait is
+ * interrupted.
+ */
+ @Test
+ public void testGetCurrentCollectorHostMultiThreaded() {
+ final int threadPoolSize = 32;
+ final int numberOfTasks = threadPoolSize * 8;
+ final AtomicBoolean stop = new AtomicBoolean(false);
+ final TestTimelineMetricsThreadingSink sink = new TestTimelineMetricsThreadingSink();
+ final ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
+
+ try {
+ for (int submitted = 0; submitted < numberOfTasks && !stop.get(); submitted++) {
+ executorService.execute(() -> {
+ try {
+ sink.getCurrentCollectorHost();
+ } catch (Exception e) {
+ e.printStackTrace();
+ // Only raise the flag here: shutting the pool down from a worker would make the
+ // submitting thread's next execute() throw RejectedExecutionException.
+ stop.set(true);
+ }
+ });
+ }
+
+ // awaitTermination() returns early only after a shutdown request; without this call the
+ // test blocked for the full 30 seconds even when every task had already finished.
+ executorService.shutdown();
+ boolean finished = executorService.awaitTermination(30, TimeUnit.SECONDS);
+ if (stop.get()) {
+ Assert.fail("Unexpected exception(s) has been thrown! See the stack traces above!");
+ }
+ if (!finished) {
+ Assert.fail("Worker threads did not finish within 30 seconds");
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status and report it instead of letting the test pass silently.
+ Thread.currentThread().interrupt();
+ Assert.fail("Interrupted while waiting for worker threads");
+ } finally {
+ // Harmless after a completed shutdown; stops leftover workers on failure or timeout.
+ executorService.shutdownNow();
+ }
+ }
+
private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
@Override
protected String getCollectorUri(String host) {
@@ -255,4 +296,29 @@ public class AbstractTimelineMetricSinkTest {
return "http";
}
}
+
+ private class TestTimelineMetricsThreadingSink extends TestTimelineMetricsSink { // sink stub with intermittently failing collector discovery and a 128 ms host-cache expiry, used by testGetCurrentCollectorHostMultiThreaded
+ private final AtomicInteger counter = new AtomicInteger(0); // number of discovery calls made so far, shared by all threads
+ private final Collection<String> hosts = Collections.singletonList("host1"); // returned both as the configured hosts and as the live hosts
+
+ @Override
+ protected Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException {
+ failedCollectorConnectionsCounter.set(4); // NOTE(review): presumably above RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER so getCurrentCollectorHost keeps taking its failover branch - confirm against the constant
+ int c = counter.getAndIncrement();
+ if (c % 2 == 0 || c % 3 == 0) { // calls whose index is a multiple of 2 or 3 simulate an unavailable collector
+ throw new MetricCollectorUnavailableException("MetricCollectorUnavailable");
+ }
+ return hosts;
+ }
+
+ @Override
+ protected Collection<String> getConfiguredCollectorHosts() {
+ return hosts;
+ }
+
+ @Override
+ protected SupplierExpiry getSupplierExpiry() {
+ return new SupplierExpiry(128, TimeUnit.MILLISECONDS); // fixed 128 ms expiry in place of the base class's randomized minutes-scale value
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org