You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2020/08/14 20:00:08 UTC
[lucene-solr] branch master updated: SOLR prometheus: simplify concurrent collection (#1723)
This is an automated email from the ASF dual-hosted git repository.
dsmiley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new e6a11f8 SOLR prometheus: simplify concurrent collection (#1723)
e6a11f8 is described below
commit e6a11f8c3a2f5442e7deeaf6c7f40da0f5676721
Author: David Smiley <ds...@apache.org>
AuthorDate: Fri Aug 14 15:59:40 2020 -0400
SOLR prometheus: simplify concurrent collection (#1723)
No semantic difference in behavior.
---
.../collector/MetricsCollectorFactory.java | 6 +-
.../collector/SchedulerMetricsCollector.java | 52 +++++++--------
.../org/apache/solr/prometheus/scraper/Async.java | 61 -----------------
.../solr/prometheus/scraper/SolrCloudScraper.java | 4 +-
.../solr/prometheus/scraper/SolrScraper.java | 43 +++++++-----
.../prometheus/scraper/SolrStandaloneScraper.java | 4 +-
.../apache/solr/prometheus/scraper/AsyncTest.java | 78 ----------------------
7 files changed, 59 insertions(+), 189 deletions(-)
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
index 1ad98d1..fdf8c8e 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
@@ -18,7 +18,7 @@
package org.apache.solr.prometheus.collector;
import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -29,12 +29,12 @@ import org.apache.solr.prometheus.scraper.SolrScraper;
public class MetricsCollectorFactory {
private final MetricsConfiguration metricsConfiguration;
- private final Executor executor;
+ private final ExecutorService executor;
private final int refreshInSeconds;
private final SolrScraper solrScraper;
public MetricsCollectorFactory(
- Executor executor,
+ ExecutorService executor,
int refreshInSeconds,
SolrScraper solrScraper,
MetricsConfiguration metricsConfiguration) {
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
index 53b0aa1..62763df 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
@@ -19,20 +19,20 @@ package org.apache.solr.prometheus.collector;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import io.prometheus.client.Collector;
import io.prometheus.client.Histogram;
import org.apache.solr.prometheus.exporter.SolrExporter;
-import org.apache.solr.prometheus.scraper.Async;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public class SchedulerMetricsCollector implements Closeable {
1,
new SolrNamedThreadFactory("scheduled-metrics-collector"));
- private final Executor executor;
+ private final ExecutorService executor;
private final List<Observer> observers = new CopyOnWriteArrayList<>();
@@ -63,7 +63,7 @@ public class SchedulerMetricsCollector implements Closeable {
.register(SolrExporter.defaultRegistry);
public SchedulerMetricsCollector(
- Executor executor,
+ ExecutorService executor,
int duration,
TimeUnit timeUnit,
List<MetricCollector> metricCollectors) {
@@ -83,31 +83,27 @@ public class SchedulerMetricsCollector implements Closeable {
try (Histogram.Timer timer = metricsCollectionTime.startTimer()) {
log.info("Beginning metrics collection");
- List<CompletableFuture<MetricSamples>> futures = new ArrayList<>();
-
- for (MetricCollector metricsCollector : metricCollectors) {
- futures.add(CompletableFuture.supplyAsync(() -> {
- try {
- return metricsCollector.collect();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, executor));
+ final List<Future<MetricSamples>> futures = executor.invokeAll(
+ metricCollectors.stream()
+ .map(metricCollector -> (Callable<MetricSamples>) metricCollector::collect)
+ .collect(Collectors.toList())
+ );
+ MetricSamples metricSamples = new MetricSamples();
+ for (Future<MetricSamples> future : futures) {
+ try {
+ metricSamples.addAll(future.get());
+ } catch (ExecutionException e) {
+ log.error("Error occurred during metrics collection", e.getCause());//logok
+ // continue any ways; do not fail
+ }
}
- try {
- CompletableFuture<List<MetricSamples>> sampleFuture = Async.waitForAllSuccessfulResponses(futures);
- List<MetricSamples> samples = sampleFuture.get();
-
- MetricSamples metricSamples = new MetricSamples();
- samples.forEach(metricSamples::addAll);
+ notifyObservers(metricSamples.asList());
- notifyObservers(metricSamples.asList());
-
- log.info("Completed metrics collection");
- } catch (InterruptedException | ExecutionException e) {
- log.error("Error while waiting for metric collection to complete", e);
- }
+ log.info("Completed metrics collection");
+ } catch (InterruptedException e) {
+ log.warn("Interrupted waiting for metric collection to complete", e);
+ Thread.currentThread().interrupt();
}
}
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java
deleted file mode 100644
index 2b8c763..0000000
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.prometheus.scraper;
-
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Async {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @SuppressWarnings({"rawtypes"})
- public static <T> CompletableFuture<List<T>> waitForAllSuccessfulResponses(List<CompletableFuture<T>> futures) {
- CompletableFuture<Void> completed = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
-
- return completed.thenApply(values -> {
- return futures.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
- }
- ).exceptionally(error -> {
- futures.stream()
- .filter(CompletableFuture::isCompletedExceptionally)
- .forEach(future -> {
- try {
- future.get();
- } catch (Exception exception) {
- log.warn("Error occurred during metrics collection", exception);
- }
- });
-
- return futures.stream()
- .filter(future -> !(future.isCompletedExceptionally() || future.isCancelled()))
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
- }
- );
- }
-
-
-}
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
index 896ea27..e4b98e7 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
@@ -21,7 +21,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -44,7 +44,7 @@ public class SolrCloudScraper extends SolrScraper {
private Cache<String, HttpSolrClient> hostClientCache = CacheBuilder.newBuilder().build();
- public SolrCloudScraper(CloudSolrClient solrClient, Executor executor, SolrClientFactory solrClientFactory) {
+ public SolrCloudScraper(CloudSolrClient solrClient, ExecutorService executor, SolrClientFactory solrClientFactory) {
super(executor);
this.solrClient = solrClient;
this.solrClientFactory = solrClientFactory;
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
index bbbfc20..c1ee6aa 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
@@ -21,12 +21,11 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Pair;
import org.apache.solr.prometheus.collector.MetricSamples;
import org.apache.solr.prometheus.exporter.MetricsQuery;
import org.apache.solr.prometheus.exporter.SolrExporter;
@@ -59,7 +57,7 @@ public abstract class SolrScraper implements Closeable {
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected final Executor executor;
+ protected final ExecutorService executor;
public abstract Map<String, MetricSamples> metricsForAllHosts(MetricsQuery query) throws IOException;
@@ -69,7 +67,7 @@ public abstract class SolrScraper implements Closeable {
public abstract MetricSamples search(MetricsQuery query) throws IOException;
public abstract MetricSamples collections(MetricsQuery metricsQuery) throws IOException;
- public SolrScraper(Executor executor) {
+ public SolrScraper(ExecutorService executor) {
this.executor = executor;
}
@@ -77,17 +75,32 @@ public abstract class SolrScraper implements Closeable {
Collection<String> items,
Function<String, MetricSamples> samplesCallable) throws IOException {
- List<CompletableFuture<Pair<String, MetricSamples>>> futures = items.stream()
- .map(item -> CompletableFuture.supplyAsync(() -> new Pair<>(item, samplesCallable.apply(item)), executor))
- .collect(Collectors.toList());
-
- Future<List<Pair<String, MetricSamples>>> allComplete = Async.waitForAllSuccessfulResponses(futures);
+ Map<String, MetricSamples> result = new HashMap<>(); // sync on this when adding to it below
try {
- return allComplete.get().stream().collect(Collectors.toMap(Pair::first, Pair::second));
- } catch (InterruptedException | ExecutionException e) {
- throw new IOException(e);
+ // invoke each samplesCallable with each item and putting the results in the above "result" map.
+ executor.invokeAll(
+ items.stream()
+ .map(item -> (Callable<MetricSamples>) () -> {
+ try {
+ final MetricSamples samples = samplesCallable.apply(item);
+ synchronized (result) {
+ result.put(item, samples);
+ }
+ } catch (Exception e) {
+ // do NOT totally fail; just log and move on
+ log.warn("Error occurred during metrics collection", e);
+ }
+ return null;//not used
+ })
+ .collect(Collectors.toList())
+ );
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
+
+ return result;
}
protected MetricSamples request(SolrClient client, MetricsQuery query) throws IOException {
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
index 8c1ee78..4bd8370 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.solr.client.solrj.SolrServerException;
@@ -38,7 +38,7 @@ public class SolrStandaloneScraper extends SolrScraper {
private final HttpSolrClient solrClient;
- public SolrStandaloneScraper(HttpSolrClient solrClient, Executor executor) {
+ public SolrStandaloneScraper(HttpSolrClient solrClient, ExecutorService executor) {
super(executor);
this.solrClient = solrClient;
}
diff --git a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java b/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java
deleted file mode 100644
index 0959bd4..0000000
--- a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.prometheus.scraper;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AsyncTest {
-
- private CompletableFuture<Integer> failedFuture() {
- CompletableFuture<Integer> result = new CompletableFuture<>();
- result.completeExceptionally(new RuntimeException("Some error"));
- return result;
- }
-
- @Test
- public void getAllResults() throws Exception {
- List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
-
- CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(
- expectedValues.stream()
- .map(CompletableFuture::completedFuture)
- .collect(Collectors.toList()));
-
- List<Integer> actualValues = results.get();
-
- Collections.sort(expectedValues);
- Collections.sort(actualValues);
-
- assertEquals(expectedValues, actualValues);
- }
-
- @Test
- public void ignoresFailures() throws Exception {
- CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Arrays.asList(
- CompletableFuture.completedFuture(1),
- failedFuture()
- ));
-
- List<Integer> values = results.get();
-
- assertEquals(Collections.singletonList(1), values);
- }
-
- @Test
- public void allFuturesFail() throws Exception {
- CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Collections.singletonList(
- failedFuture()
- ));
-
- List<Integer> values = results.get();
-
- assertTrue(values.isEmpty());
- }
-}
\ No newline at end of file