You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2019/08/28 10:04:44 UTC
[lucene-solr] 01/02: SOLR-13718: Async Collection API calls should
throw exception when an underlying async call fails
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch jira/solr-13718-8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit c7b887a01036c2933235ccc1ceaf19f2705307bf
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Mon Aug 26 16:24:20 2019 +0530
SOLR-13718: Async Collection API calls should throw exception when an underlying async call fails
---
.../OverseerCollectionMessageHandler.java | 68 +++++++++++++++++++++-
1 file changed, 67 insertions(+), 1 deletion(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 6fbab13..ca583a6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -707,6 +707,38 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
}
+ /**
+ * Convenience overload of {@code processResponses} that supplies no "okay" exceptions.
+ * <p>
+ * Forwards every argument unchanged to the seven-argument overload and passes
+ * {@code Collections.emptySet()} as {@code okayExceptions}.
+ *
+ * @param results      forwarded as-is to the seven-argument overload
+ * @param shardHandler forwarded as-is to the seven-argument overload
+ * @param abortOnError forwarded as-is to the seven-argument overload
+ * @param msgOnError   forwarded as-is to the seven-argument overload
+ * @param asyncId      forwarded as-is to the seven-argument overload
+ * @param requestMap   forwarded as-is to the seven-argument overload
+ */
+ void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+ String asyncId, Map<String, String> requestMap) {
+ processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
+ }
+
+  /**
+   * Consumes shard responses from {@code shardHandler} until {@code takeCompletedOrError()}
+   * returns {@code null}, handing each one to {@code processResponse}; afterwards, when
+   * {@code asyncId} is non-null, waits for the async calls listed in {@code requestMap}.
+   *
+   * @param results        passed to {@code processResponse} and to {@code waitForAsyncCallsToComplete}
+   * @param shardHandler   source of the responses to consume
+   * @param abortOnError   when true, the first response carrying an exception stops processing
+   * @param msgOnError     message of the {@link SolrException} thrown on abort; also forwarded
+   *                       to {@code waitForAsyncCallsToComplete}
+   * @param asyncId        non-null marks the request as async and enables the wait step
+   * @param requestMap     async calls to wait for; cleared once the wait step returns normally
+   * @param okayExceptions forwarded to {@code processResponse}
+   * @throws SolrException with {@code SERVER_ERROR} when {@code abortOnError} is set and a
+   *                       response carries an exception (that exception becomes the cause)
+   */
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                        String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    for (ShardResponse response = shardHandler.takeCompletedOrError();
+         response != null;
+         response = shardHandler.takeCompletedOrError()) {
+      processResponse(results, response, okayExceptions);
+      Throwable failure = response.getException();
+      if (failure == null || !abortOnError) {
+        continue;
+      }
+      // Aborting: pull the remaining responses off the handler before surfacing the error.
+      while (shardHandler.takeCompletedOrError() != null) {
+        // intentionally discarded
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, failure);
+    }
+
+    if (asyncId == null) {
+      return;
+    }
+    // Async request: wait for the core admin calls to complete before returning.
+    // NOTE(review): abortOnFailure is hard-coded to true here rather than forwarding
+    // abortOnError -- confirm this is intended.
+    waitForAsyncCallsToComplete(requestMap, results, true, msgOnError);
+    requestMap.clear();
+  }
+
+
void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
if(!isValid) {
@@ -810,7 +842,41 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
success.add(key, value);
}
- private NamedList<Object> waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+ /*
+ * Kept for backward compatibility reasons: add the response with the async ID as top level.
+ * This can be removed in Solr 9.
+ *
+ * Delegates to the four-argument overload with abortOnFailure=false and a null msgOnError,
+ * so a call whose STATUS is "failed" is still handed to addFailure but never causes that
+ * overload to build and throw its SolrException.
+ *
+ * NOTE(review): the top-level response itself is added by the four-argument overload, and
+ * only when INCLUDE_TOP_LEVEL_RESPONSE is set -- confirm this comment belongs on this overload.
+ */
+ @Deprecated
+ @SuppressWarnings("unchecked")
+ private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+ waitForAsyncCallsToComplete(requestMap, results, false, null);
+ }
+
+ private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results,
+ boolean abortOnFailure, String msgOnError) {
+ SolrException failureException = null;
+ for (String k:requestMap.keySet()) {
+ log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
+ NamedList reqResult = waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k));
+ if (INCLUDE_TOP_LEVEL_RESPONSE) {
+ results.add(requestMap.get(k), reqResult);
+ }
+ if ("failed".equalsIgnoreCase(((String)reqResult.get("STATUS")))) {
+ log.error("Error from shard {}: {}", k, reqResult);
+ addFailure(results, k, reqResult);
+ if (abortOnFailure && msgOnError != null && failureException == null) {
+ failureException = new SolrException(ErrorCode.SERVER_ERROR, msgOnError);
+ }
+ } else {
+ addSuccess(results, k, reqResult);
+ }
+ }
+ if (failureException != null) {
+ throw failureException;
+ }
+ }
+
+ private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());