You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2019/08/28 10:04:44 UTC

[lucene-solr] 01/02: SOLR-13718: Async Collection API calls should throw exception when an underlying async call fails

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch jira/solr-13718-8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit c7b887a01036c2933235ccc1ceaf19f2705307bf
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Mon Aug 26 16:24:20 2019 +0530

    SOLR-13718: Async Collection API calls should throw exception when an underlying async call fails
---
 .../OverseerCollectionMessageHandler.java          | 68 +++++++++++++++++++++-
 1 file changed, 67 insertions(+), 1 deletion(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 6fbab13..ca583a6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -707,6 +707,38 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
   }
 
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                        String asyncId, Map<String, String> requestMap) {
+    processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet()); // overload without okayExceptions: delegates with an empty set
+  }
+
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                                String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    // Take shard responses one at a time until takeCompletedOrError() returns null, recording each into results
+    ShardResponse srsp;
+    do {
+      srsp = shardHandler.takeCompletedOrError();
+      if (srsp != null) {
+        processResponse(results, srsp, okayExceptions);
+        Throwable exception = srsp.getException();
+        if (abortOnError && exception != null)  {
+          // drain the remaining responses (they are discarded, not recorded) before surfacing this error
+          while (srsp != null)  {
+            srsp = shardHandler.takeCompletedOrError();
+          }
+          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
+        }
+      }
+    } while (srsp != null);
+
+    // If the request is async (asyncId != null), wait for the tracked core admin calls to complete before returning
+    if (asyncId != null) {
+      waitForAsyncCallsToComplete(requestMap, results, true, msgOnError); // abortOnFailure=true: throws SolrException if a call reports "failed" and msgOnError is non-null
+      requestMap.clear(); // not reached when the wait above throws
+    }
+  }
+
+
   void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
     boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
     if(!isValid) {
@@ -810,7 +842,41 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     success.add(key, value);
   }
 
-  private NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+  /*
+   * For backward compatibility reasons, add the response with the async ID as top level.
+   * This can be removed in Solr 9.
+   */
+  @Deprecated
+  @SuppressWarnings("unchecked")
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    waitForAsyncCallsToComplete(requestMap, results, false, null); // abortOnFailure=false and no message: failures are recorded in results but never thrown
+  }
+  
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results,
+      boolean abortOnFailure, String msgOnError) {
+    SolrException failureException = null; // set on the first failure only; thrown after every request has been waited on
+    for (String k:requestMap.keySet()) { // key is passed as nodeName, value as requestId, to waitForCoreAdminAsyncCallToComplete
+      log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
+      NamedList reqResult = waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k));
+      if (INCLUDE_TOP_LEVEL_RESPONSE) {
+        results.add(requestMap.get(k), reqResult); // top-level entry keyed by the request id
+      }
+      if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) { // STATUS is compared case-insensitively
+        log.error("Error from shard {}: {}", k,  reqResult);
+        addFailure(results, k, reqResult);
+        if (abortOnFailure && msgOnError != null && failureException == null) { // an exception is prepared only when both abortOnFailure and a message are supplied
+          failureException = new SolrException(ErrorCode.SERVER_ERROR, msgOnError);
+        }
+      } else {
+        addSuccess(results, k, reqResult);
+      }
+    }
+    if (failureException != null) {
+      throw failureException;
+    }
+  }
+
+  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());