You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/10/19 00:15:35 UTC
svn commit: r1533652 - in /lucene/dev/branches/branch_4x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/core/
solr/core/src/java/org/apache/solr/update/
solr/core/src/java/org/apache/solr/update/processor/
solr/core/src/test/org/apache/solr/cl...
Author: markrmiller
Date: Fri Oct 18 22:15:35 2013
New Revision: 1533652
URL: http://svn.apache.org/r1533652
Log:
SOLR-5216: Document updates to SolrCloud can cause a distributed deadlock.
SOLR-5232: SolrCloud should distribute updates via streaming rather than buffering.
Added:
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
- copied unchanged from r1533649, lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
Modified:
lucene/dev/branches/branch_4x/ (props changed)
lucene/dev/branches/branch_4x/solr/ (props changed)
lucene/dev/branches/branch_4x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_4x/solr/core/ (props changed)
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
lucene/dev/branches/branch_4x/solr/solrj/ (props changed)
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Oct 18 22:15:35 2013
@@ -67,7 +67,16 @@ New Features
Bug Fixes
----------------------
+
+* SOLR-5216: Document updates to SolrCloud can cause a distributed deadlock.
+ (Mark Miller)
+
+Optimizations
+----------------------
+* SOLR-5232: SolrCloud should distribute updates via streaming rather than buffering.
+ (Mark Miller)
+
Security
----------------------
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Oct 18 22:15:35 2013
@@ -17,6 +17,27 @@
package org.apache.solr.core;
+import com.google.common.collect.Maps;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.handler.admin.CoreAdminHandler;
+import org.apache.solr.handler.admin.InfoHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.logging.LogWatcher;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.IndexSchemaFactory;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.FileUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
@@ -86,6 +107,9 @@ public class CoreContainer {
protected ZkContainer zkSys = new ZkContainer();
private ShardHandlerFactory shardHandlerFactory;
+
+ private ExecutorService updateExecutor = Executors.newCachedThreadPool(
+ new SolrjNamedThreadFactory("updateExecutor"));
protected LogWatcher logging = null;
@@ -379,6 +403,8 @@ public class CoreContainer {
shardHandlerFactory.close();
}
+ ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
+
// we want to close zk stuff last
zkSys.close();
@@ -924,6 +950,10 @@ public class CoreContainer {
return shardHandlerFactory;
}
+ public ExecutorService getUpdateExecutor() {
+ return updateExecutor;
+ }
+
// Just to tidy up the code where it did this in-line.
private SolrException recordAndThrow(String name, String msg, Exception ex) {
synchronized (coreInitFailures) {
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Fri Oct 18 22:15:35 2013
@@ -19,18 +19,10 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -38,11 +30,10 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.util.AdjustableSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,220 +41,139 @@ import org.slf4j.LoggerFactory;
public class SolrCmdDistributor {
private static final int MAX_RETRIES_ON_FORWARD = 15;
public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-
- static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
-
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
-
- int maxBufferedAddsPerServer = 10;
- int maxBufferedDeletesPerServer = 10;
-
- private Response response = new Response();
-
- private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
- private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
- private UpdateShardHandler updateShardHandler;
- class AddRequest {
- AddUpdateCommand cmd;
- ModifiableSolrParams params;
- }
+ private StreamingSolrServers servers;
- class DeleteRequest {
- DeleteUpdateCommand cmd;
- ModifiableSolrParams params;
- }
+ private List<Error> allErrors = new ArrayList<Error>();
+ private List<Error> errors = new ArrayList<Error>();
public static interface AbortCheck {
public boolean abortCheck();
}
- public SolrCmdDistributor(int numHosts, UpdateShardHandler updateShardHandler) {
- int maxPermits = Math.max(16, numHosts * 16);
- // limits how many tasks can actually execute at once
- if (maxPermits != semaphore.getMaxPermits()) {
- semaphore.setMaxPermits(maxPermits);
- }
-
- this.updateShardHandler = updateShardHandler;
- completionService = new ExecutorCompletionService<Request>(updateShardHandler.getCmdDistribExecutor());
- pending = new HashSet<Future<Request>>();
+ public SolrCmdDistributor(ExecutorService updateExecutor) {
+ servers = new StreamingSolrServers(updateExecutor);
}
public void finish() {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
+
+ servers.shutdown();
+ }
- flushAdds(1);
- flushDeletes(1);
+ private void doRetriesIfNeeded() {
+ // NOTE: retries will be forwards to a single url
+
+ List<Error> errors = new ArrayList<Error>(this.errors);
+ errors.addAll(servers.getErrors());
+ allErrors.addAll(errors);
+ boolean blockUntilFinishedAgain = false;
+ for (Error err : errors) {
+ String oldNodeUrl = err.req.node.getUrl();
+
+ // if there is a retry url, we want to retry...
+ boolean isRetry = err.req.node.checkRetry();
+ boolean doRetry = false;
+ int rspCode = err.statusCode;
+
+ if (testing_errorHook != null) Diagnostics.call(testing_errorHook, err.e);
+
+ // this can happen in certain situations such as shutdown
+ if (isRetry) {
+ if (rspCode == 404 || rspCode == 403 || rspCode == 503
+ || rspCode == 500) {
+ doRetry = true;
+ }
+
+ // if its an ioexception, lets try again
+ if (err.e instanceof IOException) {
+ doRetry = true;
+ } else if (err.e instanceof SolrServerException) {
+ if (((SolrServerException) err.e).getRootCause() instanceof IOException) {
+ doRetry = true;
+ }
+ }
+ }
+
+ if (isRetry && err.req.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
+ err.req.retries++;
- checkResponses(true);
- }
-
- public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
- checkResponses(false);
+ SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ + oldNodeUrl + " failed - retrying ... ");
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn(null, e);
+ }
+
+ submit(err.req);
+ blockUntilFinishedAgain = true;
+ }
+ }
+
+ servers.clearErrors();
+ this.errors.clear();
- if (cmd.isDeleteById()) {
- doDelete(cmd, urls, params);
- } else {
- doDelete(cmd, urls, params);
+ if (blockUntilFinishedAgain) {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
}
}
- public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
- checkResponses(false);
+ public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+ distribDelete(cmd, nodes, params, false);
+ }
+
+ public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
- // make sure any pending deletes are flushed
- flushDeletes(1);
-
- // TODO: this is brittle
- // need to make a clone since these commands may be reused
- AddUpdateCommand clone = new AddUpdateCommand(null);
-
- clone.solrDoc = cmd.solrDoc;
- clone.commitWithin = cmd.commitWithin;
- clone.overwrite = cmd.overwrite;
- clone.setVersion(cmd.getVersion());
- AddRequest addRequest = new AddRequest();
- addRequest.cmd = clone;
- addRequest.params = params;
-
for (Node node : nodes) {
- List<AddRequest> alist = adds.get(node);
- if (alist == null) {
- alist = new ArrayList<AddRequest>(2);
- adds.put(node, alist);
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
+ if (cmd.isDeleteById()) {
+ uReq.deleteById(cmd.getId(), cmd.getVersion());
+ } else {
+ uReq.deleteByQuery(cmd.query);
}
- alist.add(addRequest);
+
+ submit(new Req(node, uReq, sync));
}
-
- flushAdds(maxBufferedAddsPerServer);
}
-
- /**
- * Synchronous (blocking) add to specified node. Any error returned from node is propagated.
- */
- public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException {
- log.info("SYNCADD on {} : {}", node, cmd.getPrintableId());
- checkResponses(false);
- // flush all pending deletes
- flushDeletes(1);
- // flush all pending adds
- flushAdds(1);
- // finish with the pending requests
- checkResponses(false);
-
- UpdateRequest ureq = new UpdateRequest();
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-
- ureq.setParams(params);
- syncRequest(node, ureq);
- }
-
- public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
- log.info("SYNCDELETE on {} : ", nodes, cmd);
- checkResponses(false);
- // flush all pending adds
- flushAdds(1);
- // flush all pending deletes
- flushDeletes(1);
- // finish pending requests
- checkResponses(false);
-
- DeleteUpdateCommand clonedCmd = clone(cmd);
- DeleteRequest deleteRequest = new DeleteRequest();
- deleteRequest.cmd = clonedCmd;
- deleteRequest.params = params;
-
- UpdateRequest ureq = new UpdateRequest();
- if (cmd.isDeleteById()) {
- ureq.deleteById(cmd.getId(), cmd.getVersion());
- } else {
- ureq.deleteByQuery(cmd.query);
- }
- ureq.setParams(params);
- for (Node node : nodes) {
- syncRequest(node, ureq);
- }
+
+ public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+ distribAdd(cmd, nodes, params, false);
}
+
+ public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
- private void syncRequest(Node node, UpdateRequest ureq) {
- Request sreq = new Request();
- sreq.node = node;
- sreq.ureq = ureq;
-
- String url = node.getUrl();
- String fullUrl;
- if (!url.startsWith("http://") && !url.startsWith("https://")) {
- fullUrl = "http://" + url;
- } else {
- fullUrl = url;
- }
-
- HttpSolrServer server = new HttpSolrServer(fullUrl,
- updateShardHandler.getHttpClient());
-
- try {
- sreq.ursp = server.request(ureq);
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node + " update: " + ureq , e);
+ for (Node node : nodes) {
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
+ uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ submit(new Req(node, uReq, synchronous));
}
+
}
public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
ModifiableSolrParams params) throws IOException {
- // make sure we are ordered
- flushAdds(1);
- flushDeletes(1);
-
-
- // Wait for all outstanding responses to make sure that a commit
- // can't sneak in ahead of adds or deletes we already sent.
- // We could do this on a per-server basis, but it's more complex
- // and this solution will lead to commits happening closer together.
- checkResponses(true);
-
- // currently, we dont try to piggy back on outstanding adds or deletes
-
- UpdateRequest ureq = new UpdateRequest();
- ureq.setParams(params);
-
- addCommit(ureq, cmd);
-
- log.info("Distrib commit to:" + nodes + " params:" + params);
+ // we need to do any retries before commit...
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
- for (Node node : nodes) {
- submit(ureq, node);
- }
-
- // if the command wanted to block until everything was committed,
- // then do that here.
+ UpdateRequest uReq = new UpdateRequest();
+ uReq.setParams(params);
- if (cmd.waitSearcher) {
- checkResponses(true);
- }
- }
-
- private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
- ModifiableSolrParams params) {
+ addCommit(uReq, cmd);
- flushAdds(1);
+ log.debug("Distrib commit to:" + nodes + " params:" + params);
- DeleteUpdateCommand clonedCmd = clone(cmd);
- DeleteRequest deleteRequest = new DeleteRequest();
- deleteRequest.cmd = clonedCmd;
- deleteRequest.params = params;
for (Node node : nodes) {
- List<DeleteRequest> dlist = deletes.get(node);
-
- if (dlist == null) {
- dlist = new ArrayList<DeleteRequest>(2);
- deletes.put(node, dlist);
- }
- dlist.add(deleteRequest);
+ submit(new Req(node, uReq, false));
}
- flushDeletes(maxBufferedDeletesPerServer);
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -271,248 +181,63 @@ public class SolrCmdDistributor {
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
}
-
- boolean flushAdds(int limit) {
- // check for pending deletes
-
- Set<Node> removeNodes = new HashSet<Node>();
- Set<Node> nodes = adds.keySet();
-
- for (Node node : nodes) {
- List<AddRequest> alist = adds.get(node);
- if (alist == null || alist.size() < limit) continue;
-
- UpdateRequest ureq = new UpdateRequest();
-
- ModifiableSolrParams combinedParams = new ModifiableSolrParams();
- for (AddRequest aReq : alist) {
- AddUpdateCommand cmd = aReq.cmd;
- combinedParams.add(aReq.params);
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- }
+ private void submit(Req req) {
+ if (req.synchronous) {
+ servers.blockUntilFinished();
+ doRetriesIfNeeded();
- if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
- ureq.getParams().add(combinedParams);
+ HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
+ servers.getHttpClient());
- removeNodes.add(node);
-
- submit(ureq, node);
- }
-
- for (Node node : removeNodes) {
- adds.remove(node);
- }
-
- return true;
- }
-
- boolean flushDeletes(int limit) {
- // check for pending deletes
-
- Set<Node> removeNodes = new HashSet<Node>();
- Set<Node> nodes = deletes.keySet();
- for (Node node : nodes) {
- List<DeleteRequest> dlist = deletes.get(node);
- if (dlist == null || dlist.size() < limit) continue;
- UpdateRequest ureq = new UpdateRequest();
-
- ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-
- for (DeleteRequest dReq : dlist) {
- DeleteUpdateCommand cmd = dReq.cmd;
- combinedParams.add(dReq.params);
- if (cmd.isDeleteById()) {
- ureq.deleteById(cmd.getId(), cmd.getVersion());
- } else {
- ureq.deleteByQuery(cmd.query);
- }
-
- if (ureq.getParams() == null) ureq
- .setParams(new ModifiableSolrParams());
- ureq.getParams().add(combinedParams);
+ try {
+ server.request(req.uReq);
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq , e);
}
-
- removeNodes.add(node);
- submit(ureq, node);
+ return;
}
- for (Node node : removeNodes) {
- deletes.remove(node);
+ SolrServer solrServer = servers.getSolrServer(req);
+ try {
+ NamedList<Object> rsp = solrServer.request(req.uReq);
+ } catch (Exception e) {
+ SolrException.log(log, e);
+ Error error = new Error();
+ error.e = e;
+ error.req = req;
+ if (e instanceof SolrException) {
+ error.statusCode = ((SolrException) e).code();
+ }
+ errors.add(error);
}
-
- return true;
- }
-
- private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
- DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
- // TODO: shouldnt the clone do this?
- c.setFlags(cmd.getFlags());
- c.setVersion(cmd.getVersion());
- return c;
}
- public static class Request {
+ public static class Req {
public Node node;
- UpdateRequest ureq;
- NamedList<Object> ursp;
- int rspCode;
- public Exception exception;
- int retries;
- }
-
- void submit(UpdateRequest ureq, Node node) {
- Request sreq = new Request();
- sreq.node = node;
- sreq.ureq = ureq;
- submit(sreq);
- }
-
- public void submit(final Request sreq) {
-
- final String url = sreq.node.getUrl();
-
- Callable<Request> task = new Callable<Request>() {
- @Override
- public Request call() throws Exception {
- Request clonedRequest = null;
- try {
- clonedRequest = new Request();
- clonedRequest.node = sreq.node;
- clonedRequest.ureq = sreq.ureq;
- clonedRequest.retries = sreq.retries;
-
- String fullUrl;
- if (!url.startsWith("http://") && !url.startsWith("https://")) {
- fullUrl = "http://" + url;
- } else {
- fullUrl = url;
- }
-
- HttpSolrServer server = new HttpSolrServer(fullUrl,
- updateShardHandler.getHttpClient());
-
- if (Thread.currentThread().isInterrupted()) {
- clonedRequest.rspCode = 503;
- clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
- return clonedRequest;
- }
-
- clonedRequest.ursp = server.request(clonedRequest.ureq);
-
- // currently no way to get the request body.
- } catch (Exception e) {
- clonedRequest.exception = e;
- if (e instanceof SolrException) {
- clonedRequest.rspCode = ((SolrException) e).code();
- } else {
- clonedRequest.rspCode = -1;
- }
- } finally {
- semaphore.release();
- }
- return clonedRequest;
- }
- };
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
+ public UpdateRequest uReq;
+ public int retries;
+ public boolean synchronous;
+
+ public Req(Node node, UpdateRequest uReq, boolean synchronous) {
+ this.node = node;
+ this.uReq = uReq;
+ this.synchronous = synchronous;
}
- try {
- pending.add(completionService.submit(task));
- } catch (RejectedExecutionException e) {
- semaphore.release();
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
- }
-
}
+
public static Diagnostics.Callable testing_errorHook; // called on error when forwarding request. Currently data=[this, Request]
- void checkResponses(boolean block) {
-
- while (pending != null && pending.size() > 0) {
- try {
- Future<Request> future = block ? completionService.take()
- : completionService.poll();
- if (future == null) return;
- pending.remove(future);
-
- try {
- Request sreq = future.get();
- if (sreq.rspCode != 0) {
- // error during request
-
- if (testing_errorHook != null) Diagnostics.call(testing_errorHook, this, sreq);
-
- // if there is a retry url, we want to retry...
- boolean isRetry = sreq.node.checkRetry();
- boolean doRetry = false;
- int rspCode = sreq.rspCode;
-
- // this can happen in certain situations such as shutdown
- if (isRetry) {
- if (rspCode == 404 || rspCode == 403 || rspCode == 503
- || rspCode == 500) {
- doRetry = true;
- }
-
- // if its an ioexception, lets try again
- if (sreq.exception instanceof IOException) {
- doRetry = true;
- } else if (sreq.exception instanceof SolrServerException) {
- if (((SolrServerException) sreq.exception).getRootCause() instanceof IOException) {
- doRetry = true;
- }
- }
- }
-
- if (isRetry && sreq.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
- sreq.retries++;
- sreq.rspCode = 0;
- sreq.exception = null;
- SolrException.log(SolrCmdDistributor.log, "forwarding update to " + sreq.node.getUrl() + " failed - retrying ... ");
- Thread.sleep(500);
- submit(sreq);
- } else {
- Exception e = sreq.exception;
- Error error = new Error();
- error.e = e;
- error.node = sreq.node;
- response.errors.add(error);
- response.sreq = sreq;
- SolrException.log(SolrCmdDistributor.log, "shard update error "
- + sreq.node, sreq.exception);
- }
- }
-
- } catch (ExecutionException e) {
- // shouldn't happen since we catch exceptions ourselves
- SolrException.log(SolrCore.log,
- "error sending update request to shard", e);
- }
-
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "interrupted waiting for shard update response", e);
- }
- }
- }
public static class Response {
- public Request sreq;
public List<Error> errors = new ArrayList<Error>();
}
public static class Error {
- public Node node;
public Exception e;
- }
-
- public Response getResponse() {
- return response;
+ public int statusCode;
+ public Req req;
}
public static abstract class Node {
@@ -595,6 +320,64 @@ public class SolrCmdDistributor {
}
}
+ // RetryNodes are used in the case of 'forward to leader' where we want
+ // to try the latest leader on a fail in the case the leader just went down.
+ public static class RetryNode extends StdNode {
+
+ private ZkStateReader zkStateReader;
+ private String collection;
+ private String shardId;
+
+ public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
+ super(nodeProps);
+ this.zkStateReader = zkStateReader;
+ this.collection = collection;
+ this.shardId = shardId;
+ }
+
+ @Override
+ public boolean checkRetry() {
+ ZkCoreNodeProps leaderProps;
+ try {
+ leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+ collection, shardId));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ this.nodeProps = leaderProps;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result
+ + ((collection == null) ? 0 : collection.hashCode());
+ result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!super.equals(obj)) return false;
+ if (getClass() != obj.getClass()) return false;
+ RetryNode other = (RetryNode) obj;
+ if (nodeProps.getCoreUrl() == null) {
+ if (other.nodeProps.getCoreUrl() != null) return false;
+ } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
+
+ return true;
+ }
+ }
+
+ public List<Error> getErrors() {
+ return allErrors;
+ }
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri Oct 18 22:15:35 2013
@@ -53,8 +53,9 @@ import org.apache.solr.update.AddUpdateC
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
@@ -68,6 +69,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -142,8 +144,6 @@ public class DistributedUpdateProcessor
private boolean isSubShardLeader = false;
private List<Node> nodes;
- private int numNodes;
-
private UpdateCommand updateCommand; // the current command this processor is working on.
@@ -171,8 +171,7 @@ public class DistributedUpdateProcessor
this.zkEnabled = coreDesc.getCoreContainer().isZooKeeperAware();
zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
if (zkEnabled) {
- numNodes = zkController.getZkStateReader().getClusterState().getLiveNodes().size();
- cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getZkController().getUpdateShardHandler());
+ cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
}
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
@@ -199,7 +198,6 @@ public class DistributedUpdateProcessor
String coreName = req.getCore().getName();
ClusterState cstate = zkController.getClusterState();
- numNodes = cstate.getLiveNodes().size();
DocCollection coll = cstate.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
@@ -451,7 +449,7 @@ public class DistributedUpdateProcessor
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
for (Node subShardLeader : subShardLeaders) {
- cmdDistrib.syncAdd(cmd, subShardLeader, params);
+ cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
}
}
}
@@ -497,16 +495,16 @@ public class DistributedUpdateProcessor
// send in a background thread
cmdDistrib.finish();
- Response response = cmdDistrib.getResponse();
+ List<Error> errors = cmdDistrib.getErrors();
// TODO - we may need to tell about more than one error...
// if its a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
- if (response.errors.size() > 0) {
+ if (errors.size() > 0) {
// if one node is a RetryNode, this was a forward request
- if (response.errors.get(0).node instanceof RetryNode) {
- rsp.setException(response.errors.get(0).e);
+ if (errors.get(0).req.node instanceof RetryNode) {
+ rsp.setException(errors.get(0).e);
}
// else
// for now we don't error - we assume if it was added locally, we
@@ -519,8 +517,8 @@ public class DistributedUpdateProcessor
// legit
// TODO: we should do this in the background it would seem
- for (SolrCmdDistributor.Error error : response.errors) {
- if (error.node instanceof RetryNode) {
+ for (SolrCmdDistributor.Error error : errors) {
+ if (error.req.node instanceof RetryNode) {
// we don't try to force a leader to recover
// when we cannot forward to it
continue;
@@ -530,7 +528,7 @@ public class DistributedUpdateProcessor
// TODO: do retries??
// TODO: what if its is already recovering? Right now recoveries queue up -
// should they?
- String recoveryUrl = error.node.getBaseUrl();
+ String recoveryUrl = error.req.node.getBaseUrl();
HttpSolrServer server;
log.info("try and ask " + recoveryUrl + " to recover");
try {
@@ -540,7 +538,7 @@ public class DistributedUpdateProcessor
RequestRecovery recoverRequestCmd = new RequestRecovery();
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
- recoverRequestCmd.setCoreName(error.node.getCoreName());
+ recoverRequestCmd.setCoreName(error.req.node.getCoreName());
server.request(recoverRequestCmd);
} catch (Exception e) {
@@ -838,7 +836,7 @@ public class DistributedUpdateProcessor
params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
params.set("distrib.from.parent", cloudDesc.getShardId());
- cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
}
}
@@ -1061,7 +1059,7 @@ public class DistributedUpdateProcessor
if (leaderLogic) {
List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
if (subShardLeaders != null) {
- cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+ cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
}
if (replicas != null) {
cmdDistrib.distribDelete(cmd, replicas, params);
@@ -1286,61 +1284,6 @@ public class DistributedUpdateProcessor
}
return urls;
}
-
- // RetryNodes are used in the case of 'forward to leader' where we want
- // to try the latest leader on a fail in the case the leader just went down.
- public static class RetryNode extends StdNode {
-
- private ZkStateReader zkStateReader;
- private String collection;
- private String shardId;
-
- public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
- super(nodeProps);
- this.zkStateReader = zkStateReader;
- this.collection = collection;
- this.shardId = shardId;
- }
-
- @Override
- public boolean checkRetry() {
- ZkCoreNodeProps leaderProps;
- try {
- leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
- collection, shardId));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
-
- this.nodeProps = leaderProps;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = super.hashCode();
- result = prime * result
- + ((collection == null) ? 0 : collection.hashCode());
- result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!super.equals(obj)) return false;
- if (getClass() != obj.getClass()) return false;
- RetryNode other = (RetryNode) obj;
- if (nodeProps.getCoreUrl() == null) {
- if (other.nodeProps.getCoreUrl() != null) return false;
- } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
-
- return true;
- }
- }
/**
* Returns a boolean indicating whether or not the caller should behave as
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Fri Oct 18 22:15:35 2013
@@ -65,7 +65,6 @@ import org.apache.solr.common.params.Mod
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -105,8 +104,8 @@ public class BasicDistributedZkTest exte
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
+ CompletionService<Object> completionService;
+ Set<Future<Object>> pending;
@BeforeClass
public static void beforeThisClass2() throws Exception {
@@ -126,8 +125,8 @@ public class BasicDistributedZkTest exte
sliceCount = 2;
shardCount = 4;
- completionService = new ExecutorCompletionService<Request>(executor);
- pending = new HashSet<Future<Request>>();
+ completionService = new ExecutorCompletionService<Object>(executor);
+ pending = new HashSet<Future<Object>>();
}
@@ -749,7 +748,7 @@ public class BasicDistributedZkTest exte
while (pending != null && pending.size() > 0) {
- Future<Request> future = completionService.take();
+ Future<Object> future = completionService.take();
pending.remove(future);
}
@@ -875,7 +874,7 @@ public class BasicDistributedZkTest exte
while (pending != null && pending.size() > 0) {
- Future<Request> future = completionService.take();
+ Future<Object> future = completionService.take();
if (future == null) return;
pending.remove(future);
}
@@ -964,7 +963,7 @@ public class BasicDistributedZkTest exte
while (pending != null && pending.size() > 0) {
- Future<Request> future = completionService.take();
+ Future<Object> future = completionService.take();
if (future == null) return;
pending.remove(future);
}
@@ -1077,7 +1076,7 @@ public class BasicDistributedZkTest exte
pending.add(completionService.submit(call));
while (pending != null && pending.size() > 0) {
- Future<Request> future = completionService.take();
+ Future<Object> future = completionService.take();
if (future == null) return;
pending.remove(future);
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Fri Oct 18 22:15:35 2013
@@ -21,7 +21,7 @@ import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.lucene.util.LuceneTestCase.BadApple;
+import org.apache.http.client.HttpClient;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrQuery;
@@ -52,10 +52,10 @@ public class ChaosMonkeyNothingIsSafeTes
SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
@Override
public void call(Object... data) {
- SolrCmdDistributor.Request sreq = (SolrCmdDistributor.Request)data[1];
- if (sreq.exception == null) return;
- if (sreq.exception.getMessage().contains("Timeout")) {
- Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + sreq.exception.getMessage());
+ // Test-only error hook: dump all threads when the reported exception looks like a timeout.
+ Exception e = (Exception) data[0];
+ if (e == null) return;
+ // getMessage() may legitimately be null; the hook must not throw in that case
+ String msg = e.getMessage();
+ if (msg != null && msg.contains("Timeout")) {
+ Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + msg);
}
}
};
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Fri Oct 18 22:15:35 2013
@@ -44,10 +44,10 @@ public class ChaosMonkeySafeLeaderTest
SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
@Override
public void call(Object... data) {
- SolrCmdDistributor.Request sreq = (SolrCmdDistributor.Request)data[1];
- if (sreq.exception == null) return;
- if (sreq.exception.getMessage().contains("Timeout")) {
- Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + sreq.exception.getMessage());
+ // Test-only error hook: dump all threads when the reported exception looks like a timeout.
+ Exception e = (Exception) data[0];
+ if (e == null) return;
+ // getMessage() may legitimately be null; the hook must not throw in that case
+ String msg = e.getMessage();
+ if (msg != null && msg.contains("Timeout")) {
+ Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + msg);
}
}
};
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java Fri Oct 18 22:15:35 2013
@@ -80,7 +80,6 @@ import org.apache.solr.core.SolrInfoMBea
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.servlet.SolrDispatchFilter;
import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -98,8 +97,8 @@ public class CollectionsAPIDistributedZk
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
+ CompletionService<Object> completionService;
+ Set<Future<Object>> pending;
// we randomly use a second config set rather than just one
private boolean secondConfigSet = random().nextBoolean();
@@ -165,8 +164,8 @@ public class CollectionsAPIDistributedZk
sliceCount = 2;
shardCount = 4;
- completionService = new ExecutorCompletionService<Request>(executor);
- pending = new HashSet<Future<Request>>();
+ completionService = new ExecutorCompletionService<Object>(executor);
+ pending = new HashSet<Future<Object>>();
checkCreatedVsState = false;
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java Fri Oct 18 22:15:35 2013
@@ -17,12 +17,32 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util._TestUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -33,7 +53,6 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -43,32 +62,10 @@ import org.apache.solr.common.cloud.ZkSt
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
-import static org.apache.solr.common.params.ShardParams._ROUTE_;
-
/**
* Tests the Custom Sharding API.
*/
@@ -82,8 +79,8 @@ public class CustomCollectionTest extend
Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new DefaultSolrThreadFactory("testExecutor"));
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
+ CompletionService<Object> completionService;
+ Set<Future<Object>> pending;
@BeforeClass
public static void beforeThisClass2() throws Exception {
@@ -108,8 +105,8 @@ public class CustomCollectionTest extend
sliceCount = 2;
shardCount = 4;
- completionService = new ExecutorCompletionService<Request>(executor);
- pending = new HashSet<Future<Request>>();
+ completionService = new ExecutorCompletionService<Object>(executor);
+ pending = new HashSet<Future<Object>>();
checkCreatedVsState = false;
}
Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Fri Oct 18 22:15:35 2013
@@ -21,10 +21,11 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.index.LogDocMergePolicy;
-
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
@@ -36,17 +37,19 @@ import org.apache.solr.common.cloud.ZkCo
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
-
import org.junit.BeforeClass;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@@ -57,7 +60,8 @@ public class SolrCmdDistributorTest exte
// being able to call optimize to have all deletes expunged.
System.setProperty("solr.tests.mergePolicy", LogDocMergePolicy.class.getName());
}
- private UpdateShardHandler updateShardHandler;
+ private ExecutorService updateExecutor = Executors.newCachedThreadPool(
+ new SolrjNamedThreadFactory("updateExecutor"));
public SolrCmdDistributorTest() {
fixShardCount = true;
@@ -103,7 +107,7 @@ public class SolrCmdDistributorTest exte
public void doTest() throws Exception {
del("*:*");
- SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+ SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateExecutor);
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -129,9 +133,9 @@ public class SolrCmdDistributorTest exte
cmdDistrib.finish();
- Response response = cmdDistrib.getResponse();
+ List<Error> errors = cmdDistrib.getErrors();
- assertEquals(response.errors.toString(), 0, response.errors.size());
+ assertEquals(errors.toString(), 0, errors.size());
long numFound = controlClient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
@@ -143,7 +147,7 @@ public class SolrCmdDistributorTest exte
nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
// add another 2 docs to control and 3 to client
- cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+ cmdDistrib = new SolrCmdDistributor(updateExecutor);
cmd.solrDoc = sdoc("id", 2);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -167,9 +171,9 @@ public class SolrCmdDistributorTest exte
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
- response = cmdDistrib.getResponse();
+ errors = cmdDistrib.getErrors();
- assertEquals(response.errors.toString(), 0, response.errors.size());
+ assertEquals(errors.toString(), 0, errors.size());
SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
numFound = results.getNumFound();
@@ -186,7 +190,7 @@ public class SolrCmdDistributorTest exte
- cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+ cmdDistrib = new SolrCmdDistributor(updateExecutor);
params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -199,9 +203,9 @@ public class SolrCmdDistributorTest exte
cmdDistrib.distribCommit(ccmd, nodes, params);
cmdDistrib.finish();
- response = cmdDistrib.getResponse();
+ errors = cmdDistrib.getErrors();
- assertEquals(response.errors.toString(), 0, response.errors.size());
+ assertEquals(errors.toString(), 0, errors.size());
results = controlClient.query(new SolrQuery("*:*")).getResults();
@@ -219,7 +223,7 @@ public class SolrCmdDistributorTest exte
int id = 5;
- cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+ cmdDistrib = new SolrCmdDistributor(updateExecutor);
int cnt = atLeast(303);
for (int i = 0; i < cnt; i++) {
@@ -289,18 +293,57 @@ public class SolrCmdDistributorTest exte
((NamedList<Object>) resp.get("index")).get("numDocs"),
((NamedList<Object>) resp.get("index")).get("maxDoc"));
}
+
+ // Test RetryNode
+ cmdDistrib = new SolrCmdDistributor(updateExecutor);
+ final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
+ long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
+ nodes = new ArrayList<Node>();
+
+ nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
+ RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+ @Override
+ public boolean checkRetry() {
+ ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
+ this.nodeProps = new ZkCoreNodeProps(leaderProps);
+
+ return true;
+ }
+ };
+
+
+ nodes.add(retryNode);
+
+
+ cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", 1111111);
+ params = new ModifiableSolrParams();
+ cmdDistrib.distribAdd(cmd, nodes, params);
+
+ ccmd = new CommitUpdateCommand(null, false);
+ params = new ModifiableSolrParams();
+ params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+ cmdDistrib.distribCommit(ccmd, nodes, params);
+ cmdDistrib.finish();
+
+ long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
+ assertEquals(numFoundBefore + 1, numFoundAfter);
}
@Override
public void setUp() throws Exception {
super.setUp();
- updateShardHandler = new UpdateShardHandler(10000, 10000);
}
@Override
public void tearDown() throws Exception {
- updateShardHandler = null;
+ ExecutorUtil.shutdownNowAndAwaitTermination(updateExecutor);
super.tearDown();
}
}
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java Fri Oct 18 22:15:35 2013
@@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -72,14 +74,15 @@ public class ConcurrentUpdateSolrServer
.getLogger(ConcurrentUpdateSolrServer.class);
private HttpSolrServer server;
final BlockingQueue<UpdateRequest> queue;
- final ExecutorService scheduler = Executors.newCachedThreadPool(
- new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
+ final ExecutorService scheduler;
final Queue<Runner> runners;
volatile CountDownLatch lock = null; // used to block everything
final int threadCount;
+ boolean shutdownExecutor = false;
+ int pollQueueTime = 250;
/**
- * Uses an internaly managed HttpClient instance.
+ * Uses an internally managed HttpClient instance.
*
* @param solrServerUrl
* The Solr server URL
@@ -91,18 +94,27 @@ public class ConcurrentUpdateSolrServer
public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize,
int threadCount) {
this(solrServerUrl, null, queueSize, threadCount);
+ shutdownExecutor = true;
+ }
+
+ /**
+ * Uses the supplied HttpClient to send documents to the Solr server, together
+ * with an internally created cached thread pool. Because the pool is created
+ * here, shutdownExecutor is set so that shutdown() / shutdownNow() stop it.
+ */
+ public ConcurrentUpdateSolrServer(String solrServerUrl,
+ HttpClient client, int queueSize, int threadCount) {
+ // forward the caller's client; passing null here silently discarded it
+ this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool(
+ new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
+ shutdownExecutor = true;
+ }
/**
* Uses the supplied HttpClient to send documents to the Solr server.
*/
public ConcurrentUpdateSolrServer(String solrServerUrl,
- HttpClient client, int queueSize, int threadCount) {
+ HttpClient client, int queueSize, int threadCount, ExecutorService es) {
this.server = new HttpSolrServer(solrServerUrl, client);
this.server.setFollowRedirects(false);
queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
this.threadCount = threadCount;
runners = new LinkedList<Runner>();
+ scheduler = es;
}
/**
@@ -115,8 +127,7 @@ public class ConcurrentUpdateSolrServer
public void run() {
runnerLock.lock();
- // info is ok since this should only happen once for each thread
- log.info("starting runner: {}", this);
+ log.debug("starting runner: {}", this);
HttpPost method = null;
HttpResponse response = null;
try {
@@ -169,14 +180,15 @@ public class ConcurrentUpdateSolrServer
}
}
out.flush();
- req = queue.poll(250, TimeUnit.MILLISECONDS);
+ req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
if (isXml) {
out.write("</stream>".getBytes("UTF-8"));
}
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
+ log.warn("", e);
}
}
});
@@ -196,16 +208,13 @@ public class ConcurrentUpdateSolrServer
response = server.getHttpClient().execute(method);
int statusCode = response.getStatusLine().getStatusCode();
- log.info("Status for: "
- + updateRequest.getDocuments().get(0).getFieldValue("id")
- + " is " + statusCode);
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
msg.append(response.getStatusLine().getReasonPhrase());
msg.append("\n\n");
msg.append("\n\n");
msg.append("request: ").append(method.getURI());
- handleError(new Exception(msg.toString()));
+ handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
}
} finally {
try {
@@ -213,6 +222,7 @@ public class ConcurrentUpdateSolrServer
response.getEntity().getContent().close();
}
} catch (Exception ex) {
+ log.warn("", ex);
}
}
}
@@ -236,7 +246,7 @@ public class ConcurrentUpdateSolrServer
}
}
- log.info("finished: {}", this);
+ log.debug("finished: {}", this);
runnerLock.unlock();
}
}
@@ -357,16 +367,18 @@ public class ConcurrentUpdateSolrServer
@Override
public void shutdown() {
server.shutdown();
- scheduler.shutdown();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
scheduler.shutdownNow();
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
}
}
@@ -384,19 +396,30 @@ public class ConcurrentUpdateSolrServer
public void shutdownNow() {
server.shutdown();
- scheduler.shutdownNow(); // Cancel currently executing tasks
- try {
- if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
+ if (shutdownExecutor) {
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
}
}
public void setParser(ResponseParser responseParser) {
server.setParser(responseParser);
}
+
+
+ /**
+ * Sets the value that the runner passes to queue.poll(...) with
+ * TimeUnit.MILLISECONDS after flushing a request. The field defaults to 250.
+ *
+ * @param pollQueueTime time, in milliseconds, for an open connection to wait for updates when
+ * the queue is empty.
+ */
+ public void setPollQueueTime(int pollQueueTime) {
+ this.pollQueueTime = pollQueueTime;
+ }
public void setRequestWriter(RequestWriter requestWriter) {
server.setRequestWriter(requestWriter);
Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java Fri Oct 18 22:15:35 2013
@@ -100,7 +100,9 @@ public class HttpClientUtil {
*/
public static HttpClient createClient(final SolrParams params) {
final ModifiableSolrParams config = new ModifiableSolrParams(params);
- logger.info("Creating new http client, config:" + config);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating new http client, config:" + config);
+ }
final DefaultHttpClient httpClient = new SystemDefaultHttpClient();
configureClient(httpClient, config);
return httpClient;