Posted to issues@lucene.apache.org by GitBox <gi...@apache.org> on 2020/09/09 04:24:57 UTC

[GitHub] [lucene-solr] rishisankar commented on a change in pull request #1770: SOLR-14763 SolrJ HTTP/2 Async API using CompletableFuture

rishisankar commented on a change in pull request #1770:
URL: https://github.com/apache/lucene-solr/pull/1770#discussion_r485329143



##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
##########
@@ -537,40 +541,100 @@ public void registerDocCollectionWatcher(String collection, DocCollectionWatcher
       }
     }
 
-    final NamedList<Throwable> exceptions = new NamedList<>();
-    @SuppressWarnings({"rawtypes"})
-    final NamedList<NamedList> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
+    final NamedList<NamedList<?>> shardResponses = new NamedList<>(routes.size()+1); // +1 for deleteQuery
 
     long start = System.nanoTime();
 
+    CompletableFuture<Void> updateFuture;
     if (parallelUpdates) {
-      final Map<String, Future<NamedList<?>>> responseFutures = new HashMap<>(routes.size());
-      for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
-        final String url = entry.getKey();
-        final LBSolrClient.Req lbRequest = entry.getValue();
-        try {
-          MDC.put("CloudSolrClient.url", url);
-          responseFutures.put(url, threadPool.submit(() -> {
-            return getLbClient().request(lbRequest).getResponse();
-          }));
-        } finally {
-          MDC.remove("CloudSolrClient.url");
+      updateFuture = doUpdatesWithExecutor(routes, shardResponses, isAsyncRequest);
+    } else {
+      updateFuture = doUpdatesWithoutExecutor(routes, shardResponses, isAsyncRequest);
+    }
+
+    CompletableFuture<NamedList<Object>> apiFuture = new CompletableFuture<>();
+    if (!isAsyncRequest) {
+      try {
+        updateFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof SolrServerException) {
+          throw (SolrServerException) cause;
+        } else if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new SolrServerException(cause);
         }
       }
+      doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest);
+    } else {
+      updateFuture.whenComplete((result, error) -> {
+        if (updateFuture.isCompletedExceptionally()) {
+          apiFuture.completeExceptionally(error);
+        } else {
+          doDeleteQuery(updateRequest, nonRoutableParams, routes, shardResponses, apiFuture, start, isAsyncRequest);
+        }
+      });
 
-      for (final Map.Entry<String, Future<NamedList<?>>> entry: responseFutures.entrySet()) {
-        final String url = entry.getKey();
-        final Future<NamedList<?>> responseFuture = entry.getValue();
-        try {
-          shardResponses.add(url, responseFuture.get());
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          exceptions.add(url, e.getCause());
+      apiFuture.exceptionally((error) -> {
+        if (apiFuture.isCancelled()) {
+          updateFuture.cancel(true);
         }
+        return null;
+      });
+    }
+
+    return apiFuture;
+  }
+
+  private CompletableFuture<Void> doUpdatesWithExecutor(final Map<String, ? extends LBSolrClient.Req> routes,
+                                                        NamedList<NamedList<?>> shardResponses,
+                                                        boolean isAsyncRequest) {
+    final NamedList<Throwable> exceptions = new NamedList<>();
+    final Map<String, CompletableFuture<NamedList<Object>>> responseFutures = new HashMap<>(routes.size());
+    for (final Map.Entry<String, ? extends LBSolrClient.Req> entry : routes.entrySet()) {
+      final String url = entry.getKey();
+      final LBSolrClient.Req lbRequest = entry.getValue();
+      try {
+        MDC.put("CloudSolrClient.url", url);
+        final CompletableFuture<NamedList<Object>> future = new CompletableFuture<>();
+        if (isAsyncRequest) {
+          CompletableFuture<LBSolrClient.Rsp> reqFuture = getLbClient().requestAsync(lbRequest);
+          reqFuture.whenComplete((result, error) -> {
+            if (!reqFuture.isCompletedExceptionally()) {
+              future.complete(result.getResponse());
+            } else {
+              future.completeExceptionally(error);
+            }
+          });
+        } else {

Review comment:
       I've added an implementation of `requestAsync` (in the SolrClient class) that takes an executor, so that the request can be cancelled. It currently accepts only an `ExecutorService`. I could switch the parameter to the more general `Executor`, but the drawback is that the async request would no longer be cancellable. Which do you think would be better?
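
The trade-off can be sketched with plain `java.util.concurrent` types. This is a minimal illustration, not the code in this PR: `supplyCancellable`, `supplyPlain`, and `interruptedOnCancel` are hypothetical names, and the sleeping task only stands in for a real request. The point is that `ExecutorService.submit` returns a `Future` whose `cancel(true)` can interrupt the worker thread, whereas `Executor.execute` returns `void`, and `CompletableFuture.cancel` on its own never interrupts anything.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class AsyncCancelSketch {

  /** Hypothetical helper: ExecutorService.submit returns a Future we can interrupt. */
  static <T> CompletableFuture<T> supplyCancellable(Callable<T> task, ExecutorService executor) {
    CompletableFuture<T> result = new CompletableFuture<>();
    Future<?> running = executor.submit(() -> runInto(task, result));
    // CompletableFuture.cancel never interrupts by itself, so forward it to the Future.
    result.whenComplete((value, error) -> {
      if (result.isCancelled()) {
        running.cancel(true);
      }
    });
    return result;
  }

  /** Hypothetical helper: Executor.execute returns void, so there is no handle to cancel. */
  static <T> CompletableFuture<T> supplyPlain(Callable<T> task, Executor executor) {
    CompletableFuture<T> result = new CompletableFuture<>();
    executor.execute(() -> runInto(task, result));
    return result;
  }

  /** Runs the task and copies its outcome into the CompletableFuture. */
  private static <T> void runInto(Callable<T> task, CompletableFuture<T> result) {
    try {
      result.complete(task.call());
    } catch (Throwable t) {
      result.completeExceptionally(t);
    }
  }

  /** Starts a slow task, cancels its future, and reports whether the worker was interrupted. */
  static boolean interruptedOnCancel(boolean useExecutorService) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch interrupted = new CountDownLatch(1);
    Callable<String> slowRequest = () -> {
      started.countDown();
      try {
        Thread.sleep(60_000); // stands in for a long-running request
        return "response";
      } catch (InterruptedException e) {
        interrupted.countDown();
        throw e;
      }
    };
    try {
      CompletableFuture<String> future = useExecutorService
          ? supplyCancellable(slowRequest, pool)
          : supplyPlain(slowRequest, pool);
      started.await();      // make sure the task is actually running
      future.cancel(true);  // the flag alone has no effect on a CompletableFuture
      return interrupted.await(500, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();   // clean up the worker in either case
    }
  }

  public static void main(String[] args) {
    System.out.println("ExecutorService interrupted worker: " + interruptedOnCancel(true));
    System.out.println("Executor interrupted worker: " + interruptedOnCancel(false));
  }
}
```

In both variants the caller sees the returned future as cancelled. The difference is whether the underlying work keeps running: with a plain `Executor` the task continues to occupy a thread until it finishes by itself.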



