You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ds...@apache.org on 2020/08/14 20:00:08 UTC

[lucene-solr] branch master updated: SOLR prometheus: simplify concurrent collection (#1723)

This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new e6a11f8  SOLR prometheus: simplify concurrent collection (#1723)
e6a11f8 is described below

commit e6a11f8c3a2f5442e7deeaf6c7f40da0f5676721
Author: David Smiley <ds...@apache.org>
AuthorDate: Fri Aug 14 15:59:40 2020 -0400

    SOLR prometheus: simplify concurrent collection (#1723)
    
    No semantic difference in behavior.
---
 .../collector/MetricsCollectorFactory.java         |  6 +-
 .../collector/SchedulerMetricsCollector.java       | 52 +++++++--------
 .../org/apache/solr/prometheus/scraper/Async.java  | 61 -----------------
 .../solr/prometheus/scraper/SolrCloudScraper.java  |  4 +-
 .../solr/prometheus/scraper/SolrScraper.java       | 43 +++++++-----
 .../prometheus/scraper/SolrStandaloneScraper.java  |  4 +-
 .../apache/solr/prometheus/scraper/AsyncTest.java  | 78 ----------------------
 7 files changed, 59 insertions(+), 189 deletions(-)

diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
index 1ad98d1..fdf8c8e 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/MetricsCollectorFactory.java
@@ -18,7 +18,7 @@
 package org.apache.solr.prometheus.collector;
 
 import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -29,12 +29,12 @@ import org.apache.solr.prometheus.scraper.SolrScraper;
 public class MetricsCollectorFactory {
 
   private final MetricsConfiguration metricsConfiguration;
-  private final Executor executor;
+  private final ExecutorService executor;
   private final int refreshInSeconds;
   private final SolrScraper solrScraper;
 
   public MetricsCollectorFactory(
-      Executor executor,
+      ExecutorService executor,
       int refreshInSeconds,
       SolrScraper solrScraper,
       MetricsConfiguration metricsConfiguration) {
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
index 53b0aa1..62763df 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
@@ -19,20 +19,20 @@ package org.apache.solr.prometheus.collector;
 
 import java.io.Closeable;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.Histogram;
 import org.apache.solr.prometheus.exporter.SolrExporter;
-import org.apache.solr.prometheus.scraper.Async;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public class SchedulerMetricsCollector implements Closeable {
       1,
       new SolrNamedThreadFactory("scheduled-metrics-collector"));
 
-  private final Executor executor;
+  private final ExecutorService executor;
 
   private final List<Observer> observers = new CopyOnWriteArrayList<>();
 
@@ -63,7 +63,7 @@ public class SchedulerMetricsCollector implements Closeable {
       .register(SolrExporter.defaultRegistry);
 
   public SchedulerMetricsCollector(
-      Executor executor,
+      ExecutorService executor,
       int duration,
       TimeUnit timeUnit,
       List<MetricCollector> metricCollectors) {
@@ -83,31 +83,27 @@ public class SchedulerMetricsCollector implements Closeable {
     try (Histogram.Timer timer = metricsCollectionTime.startTimer()) {
       log.info("Beginning metrics collection");
 
-      List<CompletableFuture<MetricSamples>> futures = new ArrayList<>();
-
-      for (MetricCollector metricsCollector : metricCollectors) {
-        futures.add(CompletableFuture.supplyAsync(() -> {
-          try {
-            return metricsCollector.collect();
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }, executor));
+      final List<Future<MetricSamples>> futures = executor.invokeAll(
+          metricCollectors.stream()
+              .map(metricCollector -> (Callable<MetricSamples>) metricCollector::collect)
+              .collect(Collectors.toList())
+      );
+      MetricSamples metricSamples = new MetricSamples();
+      for (Future<MetricSamples> future : futures) {
+        try {
+          metricSamples.addAll(future.get());
+        } catch (ExecutionException e) {
+          log.error("Error occurred during metrics collection", e.getCause());//logok
+          // continue anyway; do not fail
+        }
       }
 
-      try {
-        CompletableFuture<List<MetricSamples>> sampleFuture = Async.waitForAllSuccessfulResponses(futures);
-        List<MetricSamples> samples = sampleFuture.get();
-
-        MetricSamples metricSamples = new MetricSamples();
-        samples.forEach(metricSamples::addAll);
+      notifyObservers(metricSamples.asList());
 
-        notifyObservers(metricSamples.asList());
-
-        log.info("Completed metrics collection");
-      } catch (InterruptedException | ExecutionException e) {
-        log.error("Error while waiting for metric collection to complete", e);
-      }
+      log.info("Completed metrics collection");
+    } catch (InterruptedException e) {
+      log.warn("Interrupted waiting for metric collection to complete", e);
+      Thread.currentThread().interrupt();
     }
 
   }
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java
deleted file mode 100644
index 2b8c763..0000000
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/Async.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.prometheus.scraper;
-
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class Async {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  @SuppressWarnings({"rawtypes"})
-  public static <T> CompletableFuture<List<T>> waitForAllSuccessfulResponses(List<CompletableFuture<T>> futures) {
-    CompletableFuture<Void> completed = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
-
-    return completed.thenApply(values -> {
-        return futures.stream()
-          .map(CompletableFuture::join)
-          .collect(Collectors.toList());
-      }
-    ).exceptionally(error -> {
-      futures.stream()
-          .filter(CompletableFuture::isCompletedExceptionally)
-          .forEach(future -> {
-            try {
-              future.get();
-            } catch (Exception exception) {
-              log.warn("Error occurred during metrics collection", exception);
-            }
-          });
-
-      return futures.stream()
-          .filter(future -> !(future.isCompletedExceptionally() || future.isCancelled()))
-          .map(CompletableFuture::join)
-          .collect(Collectors.toList());
-      }
-    );
-  }
-
-
-}
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
index 896ea27..e4b98e7 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrCloudScraper.java
@@ -21,7 +21,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -44,7 +44,7 @@ public class SolrCloudScraper extends SolrScraper {
 
   private Cache<String, HttpSolrClient> hostClientCache = CacheBuilder.newBuilder().build();
 
-  public SolrCloudScraper(CloudSolrClient solrClient, Executor executor, SolrClientFactory solrClientFactory) {
+  public SolrCloudScraper(CloudSolrClient solrClient, ExecutorService executor, SolrClientFactory solrClientFactory) {
     super(executor);
     this.solrClient = solrClient;
     this.solrClientFactory = solrClientFactory;
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
index bbbfc20..c1ee6aa 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrScraper.java
@@ -21,12 +21,11 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -42,7 +41,6 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Pair;
 import org.apache.solr.prometheus.collector.MetricSamples;
 import org.apache.solr.prometheus.exporter.MetricsQuery;
 import org.apache.solr.prometheus.exporter.SolrExporter;
@@ -59,7 +57,7 @@ public abstract class SolrScraper implements Closeable {
   protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  protected final Executor executor;
+  protected final ExecutorService executor;
 
   public abstract Map<String, MetricSamples> metricsForAllHosts(MetricsQuery query) throws IOException;
 
@@ -69,7 +67,7 @@ public abstract class SolrScraper implements Closeable {
   public abstract MetricSamples search(MetricsQuery query) throws IOException;
   public abstract MetricSamples collections(MetricsQuery metricsQuery) throws IOException;
 
-  public SolrScraper(Executor executor) {
+  public SolrScraper(ExecutorService executor) {
     this.executor = executor;
   }
 
@@ -77,17 +75,32 @@ public abstract class SolrScraper implements Closeable {
       Collection<String> items,
       Function<String, MetricSamples> samplesCallable) throws IOException {
 
-    List<CompletableFuture<Pair<String, MetricSamples>>> futures = items.stream()
-        .map(item -> CompletableFuture.supplyAsync(() -> new Pair<>(item, samplesCallable.apply(item)), executor))
-        .collect(Collectors.toList());
-
-    Future<List<Pair<String, MetricSamples>>> allComplete = Async.waitForAllSuccessfulResponses(futures);
+    Map<String, MetricSamples> result = new HashMap<>(); // sync on this when adding to it below
 
     try {
-      return allComplete.get().stream().collect(Collectors.toMap(Pair::first, Pair::second));
-    } catch (InterruptedException | ExecutionException e) {
-      throw new IOException(e);
+      // invoke samplesCallable on each item, putting the results in the above "result" map.
+      executor.invokeAll(
+          items.stream()
+              .map(item -> (Callable<MetricSamples>) () -> {
+                try {
+                  final MetricSamples samples = samplesCallable.apply(item);
+                  synchronized (result) {
+                    result.put(item, samples);
+                  }
+                } catch (Exception e) {
+                  // do NOT totally fail; just log and move on
+                  log.warn("Error occurred during metrics collection", e);
+                }
+                return null;//not used
+              })
+              .collect(Collectors.toList())
+      );
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
     }
+
+    return result;
   }
 
   protected MetricSamples request(SolrClient client, MetricsQuery query) throws IOException {
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
index 8c1ee78..4bd8370 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/scraper/SolrStandaloneScraper.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -38,7 +38,7 @@ public class SolrStandaloneScraper extends SolrScraper {
 
   private final HttpSolrClient solrClient;
 
-  public SolrStandaloneScraper(HttpSolrClient solrClient, Executor executor) {
+  public SolrStandaloneScraper(HttpSolrClient solrClient, ExecutorService executor) {
     super(executor);
     this.solrClient = solrClient;
   }
diff --git a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java b/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java
deleted file mode 100644
index 0959bd4..0000000
--- a/solr/contrib/prometheus-exporter/src/test/org/apache/solr/prometheus/scraper/AsyncTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.prometheus.scraper;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class AsyncTest {
-
-  private CompletableFuture<Integer> failedFuture() {
-    CompletableFuture<Integer> result = new CompletableFuture<>();
-    result.completeExceptionally(new RuntimeException("Some error"));
-    return result;
-  }
-
-  @Test
-  public void getAllResults() throws Exception {
-    List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
-
-    CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(
-        expectedValues.stream()
-            .map(CompletableFuture::completedFuture)
-            .collect(Collectors.toList()));
-
-    List<Integer> actualValues = results.get();
-
-    Collections.sort(expectedValues);
-    Collections.sort(actualValues);
-
-    assertEquals(expectedValues, actualValues);
-  }
-
-  @Test
-  public void ignoresFailures() throws Exception {
-    CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Arrays.asList(
-        CompletableFuture.completedFuture(1),
-        failedFuture()
-    ));
-
-    List<Integer> values = results.get();
-
-    assertEquals(Collections.singletonList(1), values);
-  }
-
-  @Test
-  public void allFuturesFail() throws Exception {
-    CompletableFuture<List<Integer>> results = Async.waitForAllSuccessfulResponses(Collections.singletonList(
-        failedFuture()
-    ));
-
-    List<Integer> values = results.get();
-
-    assertTrue(values.isEmpty());
-  }
-}
\ No newline at end of file