You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/31 01:36:54 UTC
svn commit: r1225998 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/update/SolrCmdDistributor.java
java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
test/org/apache/solr/update/SolrCmdDistributorTest.java
Author: markrmiller
Date: Sat Dec 31 00:36:54 2011
New Revision: 1225998
URL: http://svn.apache.org/viewvc?rev=1225998&view=rev
Log:
add buffering to the SolrCmdDistributor - also move retries back into SolrCmdDistributor
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sat Dec 31 00:36:54 2011
@@ -19,8 +19,10 @@ package org.apache.solr.update;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
@@ -41,8 +43,8 @@ import org.apache.solr.common.params.Mod
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
-// TODO: we are not really using the buffering anymore due to DistribUpdateProc...
-// we might want to bring back a form of slots...
+
+
public class SolrCmdDistributor {
// TODO: shut this thing down
static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
@@ -60,43 +62,50 @@ public class SolrCmdDistributor {
CompletionService<Request> completionService;
Set<Future<Request>> pending;
- //private final SolrQueryRequest req;
- //private final SolrQueryResponse rsp;
-
- //private final SchemaField idField;
-
int maxBufferedAddsPerServer = 10;
int maxBufferedDeletesPerServer = 100;
+
+ private Response response = new Response();
- private List<AddUpdateCommand> alist;
- private ArrayList<DeleteUpdateCommand> dlist;
+ private final Map<String,List<AddRequest>> adds = new HashMap<String,List<AddRequest>>();
+ private final Map<String,List<DeleteRequest>> deletes = new HashMap<String,List<DeleteRequest>>();
- public Response finish(List<String> urls, ModifiableSolrParams params) {
+ class AddRequest {
+ AddUpdateCommand cmd;
+ ModifiableSolrParams params;
+ }
+
+ class DeleteRequest {
+ DeleteUpdateCommand cmd;
+ ModifiableSolrParams params;
+ }
+
+ public void finish(List<Url> urls) {
// piggyback on any outstanding adds or deletes if possible.
- flushAdds(1, null, urls, params);
- flushDeletes(1, null, urls, params);
+ flushAdds(1, null, null, urls);
+ flushDeletes(1, null, null, urls);
- return checkResponses(true, urls);
+ checkResponses(true);
}
- public void distribDelete(DeleteUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
- checkResponses(false, urls);
+ public void distribDelete(DeleteUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params) throws IOException {
+ checkResponses(false);
if (cmd.isDeleteById()) {
doDelete(cmd, urls, params);
} else {
- // TODO: query must be broadcast to all ??
+ // nocommit TODO: query must be broadcast to all ??
doDelete(cmd, urls, params);
}
}
- public void distribAdd(AddUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
+ public void distribAdd(AddUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params) throws IOException {
- checkResponses(false, urls);
+ checkResponses(false);
// make sure any pending deletes are flushed
- flushDeletes(1, null, urls, params);
+ flushDeletes(1, null, null, urls);
// TODO: this is brittle
// need to make a clone since these commands may be reused
@@ -106,105 +115,157 @@ public class SolrCmdDistributor {
clone.commitWithin = cmd.commitWithin;
clone.overwrite = cmd.overwrite;
clone.setVersion(cmd.getVersion());
-
+ AddRequest addRequest = new AddRequest();
+ addRequest.cmd = clone;
+ addRequest.params = params;
// nocommit: review as far as SOLR-2685
// clone.indexedId = cmd.indexedId;
// clone.doc = cmd.doc;
-
-
- if (alist == null) {
- alist = new ArrayList<AddUpdateCommand>(2);
+ for (Url url : urls) {
+ List<AddRequest> alist = adds.get(url.getUrl());
+ if (alist == null) {
+ alist = new ArrayList<AddRequest>(2);
+ adds.put(url.getUrl(), alist);
+ }
+ alist.add(addRequest);
+ // System.out.println("buffer add to " + url);
}
- alist.add(clone);
- flushAdds(maxBufferedAddsPerServer, null, urls, params);
+ flushAdds(maxBufferedAddsPerServer, null, null, urls);
}
- public void distribCommit(CommitUpdateCommand cmd, List<String> urls, ModifiableSolrParams params)
+ public void distribCommit(CommitUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params)
throws IOException {
// Wait for all outstanding repsonses to make sure that a commit
// can't sneak in ahead of adds or deletes we already sent.
// We could do this on a per-server basis, but it's more complex
// and this solution will lead to commits happening closer together.
- checkResponses(true, urls);
-
+ checkResponses(true);
+
// piggyback on any outstanding adds or deletes if possible.
- // TODO: review this
- flushAdds(1, cmd, urls, params);
-
- flushDeletes(1, cmd, urls, params);
+ boolean piggied = false;
+ if (!flushAdds(1, cmd, params, urls)) {
+ if (flushDeletes(1, cmd, params, urls)) piggied = true;
+ } else {
+ piggied = true;
+ }
- UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
-
- addCommit(ureq, cmd);
- submit(ureq, urls);
+ if (!piggied) {
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ ureq.setParams(params);
+
+ addCommit(ureq, cmd);
+
+ for (Url url : urls) {
+ submit(ureq, url);
+ }
+ }
// if the command wanted to block until everything was committed,
// then do that here.
// nocommit
if (/* cmd.waitFlush || */cmd.waitSearcher) {
- checkResponses(true, urls);
+ checkResponses(true);
}
}
- private void doDelete(DeleteUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
+ private void doDelete(DeleteUpdateCommand cmd, List<Url> urls,
+ ModifiableSolrParams params) throws IOException {
- flushAdds(1, null, urls, params);
+ flushAdds(1, null, null, urls);
- if (dlist == null) {
- dlist = new ArrayList<DeleteUpdateCommand>(2);
+ DeleteUpdateCommand clonedCmd = clone(cmd);
+ DeleteRequest deleteRequest = new DeleteRequest();
+ deleteRequest.cmd = clonedCmd;
+ deleteRequest.params = params;
+ for (Url url : urls) {
+ List<DeleteRequest> dlist = deletes.get(url.getUrl());
+
+ if (dlist == null) {
+ dlist = new ArrayList<DeleteRequest>(2);
+ deletes.put(url.getUrl(), dlist);
+ }
+ dlist.add(deleteRequest);
}
- dlist.add(clone(cmd));
- flushDeletes(maxBufferedDeletesPerServer, null, urls, params);
+ flushDeletes(maxBufferedDeletesPerServer, null, null, urls);
}
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
if (cmd == null) return;
+ //System.out.println("piggy back commit on");
// nocommit
ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
}
- boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params) {
+ boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams params, List<Url> urls) {
// check for pending deletes
- if (alist == null || alist.size() < limit) return false;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
-
- addCommit(ureq, ccmd);
-
- for (AddUpdateCommand cmd : alist) {
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ UpdateRequestExt ureq = null;
+ for (Url url : urls) {
+ List<AddRequest> alist = adds.get(url.getUrl());
+ if (alist == null || alist.size() < limit) return false;
+ if (ureq == null) {
+ ureq = new UpdateRequestExt();
+
+ addCommit(ureq, ccmd);
+
+ ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+
+ for (AddRequest aReq : alist) {
+ AddUpdateCommand cmd = aReq.cmd;
+ combinedParams.add(aReq.params);
+ ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ }
+
+ if (params != null) combinedParams.add(params);
+ if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+ ureq.getParams().add(combinedParams);
+ //System.out.println("ureq is:" + ureq);
+ }
+
+ adds.remove(url.getUrl());
+ //System.out.println("flush adds to " + url + " " + alist.size());
+ //System.out.println("commit " + ureq.getAction());
+
+ submit(ureq, url);
}
-
- alist = null;
- submit(ureq, urls);
return true;
}
- boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params) {
+ boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams params, List<Url> urls) {
// check for pending deletes
- if (dlist == null || dlist.size() < limit) return false;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
+ //System.out.println("flush deletes to " + urls);
+ UpdateRequestExt ureq = null;
+ for (Url url : urls) {
+ List<DeleteRequest> dlist = deletes.get(url.getUrl());
+ if (dlist == null || dlist.size() < limit) return false;
+ if (ureq == null) {
+ ureq = new UpdateRequestExt();
+
+ addCommit(ureq, ccmd);
+
+ ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+
+ for (DeleteRequest dReq : dlist) {
+ DeleteUpdateCommand cmd = dReq.cmd;
+ combinedParams.add(dReq.params);
+ if (cmd.isDeleteById()) {
+ ureq.deleteById(cmd.getId(), cmd.getVersion());
+ } else {
+ ureq.deleteByQuery(cmd.query);
+ }
+ }
- addCommit(ureq, ccmd);
-
- for (DeleteUpdateCommand cmd : dlist) {
- if (cmd.isDeleteById()) {
- ureq.deleteById(cmd.getId(), cmd.getVersion());
- } else {
- ureq.deleteByQuery(cmd.query);
+ if (params != null) combinedParams.add(params);
+ if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+ ureq.getParams().add(combinedParams);
}
+
+ deletes.remove(url.getUrl());
+ submit(ureq, url);
}
-
- dlist = null;
- submit(ureq, urls);
return true;
}
@@ -218,99 +279,104 @@ public class SolrCmdDistributor {
public static class Request {
// TODO: we may need to look at deep cloning this?
- public List<String> urls;
+ public Url url;
UpdateRequestExt ureq;
NamedList<Object> ursp;
int rspCode;
public Exception exception;
- String url;
+ String errorUrl;
+ int retries;
}
- void submit(UpdateRequestExt ureq, List<String> urls) {
+ void submit(UpdateRequestExt ureq, Url url) {
Request sreq = new Request();
- sreq.urls = urls;
+ sreq.errorUrl = url.getUrl();
+ sreq.url = url;
sreq.ureq = ureq;
submit(sreq);
}
- public void submit(final Request sreq) {
+ public void submit(final Request sreq) {
if (completionService == null) {
completionService = new ExecutorCompletionService<Request>(commExecutor);
pending = new HashSet<Future<Request>>();
}
-
- for (final String url : sreq.urls) {
- // TODO: when we break up shards, we might forward
- // to self again - makes things simple here, but we could
- // also have realized this before, done the req locally, and
- // removed self from this list.
-
- Callable<Request> task = new Callable<Request>() {
- @Override
- public Request call() throws Exception {
- Request clonedRequest = new Request();
- clonedRequest.urls = sreq.urls;
- clonedRequest.ureq = sreq.ureq;
+ final String url = sreq.url.getUrl();
+ //System.out.println("submit url:" + url);
+ Callable<Request> task = new Callable<Request>() {
+ @Override
+ public Request call() throws Exception {
+ Request clonedRequest = new Request();
+ clonedRequest.url = sreq.url;
+ clonedRequest.ureq = sreq.ureq;
+
+ // TODO: yeah, this is a little odd...
+ clonedRequest.errorUrl = url;
+
+ try {
+ String fullUrl;
+ if (!url.startsWith("http://") && !url.startsWith("https://")) {
+ fullUrl = "http://" + url;
+ } else {
+ fullUrl = url;
+ }
- // TODO: yeah, this is a little odd...
- clonedRequest.url = url;
-
+ CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl,
+ client);
- try {
- // TODO: what about https?
- String fullUrl;
- if (!url.startsWith("http://")) {
- fullUrl = "http://" + url;
- } else {
- fullUrl = url;
- }
-
- CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl, client);
-
- //server.setSoTimeout(1000);
- //server.setConnectionTimeout(1000);
- clonedRequest.ursp = server.request(clonedRequest.ureq);
-
- // currently no way to get the request body.
- } catch (Exception e) {
- clonedRequest.exception = e;
- if (e instanceof SolrException) {
- clonedRequest.rspCode = ((SolrException) e).code();
- } else {
- clonedRequest.rspCode = -1;
- }
+ clonedRequest.ursp = server.request(clonedRequest.ureq);
+
+ // currently no way to get the request body.
+ } catch (Exception e) {
+ clonedRequest.exception = e;
+ if (e instanceof SolrException) {
+ clonedRequest.rspCode = ((SolrException) e).code();
+ } else {
+ clonedRequest.rspCode = -1;
}
- return clonedRequest;
}
- };
-
- pending.add(completionService.submit(task));
- }
+ return clonedRequest;
+ }
+ };
+
+ pending.add(completionService.submit(task));
+
}
// TODO: need to consider those that call this with block=false now...
- Response checkResponses(boolean block, List<String> urls) {
- Response response = new Response();
+ void checkResponses(boolean block) {
while (pending != null && pending.size() > 0) {
try {
Future<Request> future = block ? completionService.take()
: completionService.poll();
- if (future == null) return null;
+ if (future == null) return;
pending.remove(future);
try {
Request sreq = future.get();
if (sreq.rspCode != 0) {
// error during request
- Exception e = sreq.exception;
- Error error = new Error();
- error.e = e;
- error.url = sreq.url;
- response.errors.add(error);
- response.sreq = sreq;
- SolrException.logOnce(SolrCore.log, "shard update error " + sreq.url + " ("
- + sreq.urls + ")", sreq.exception);
+
+ // if there is a retry url, we want to retry...
+ // TODO: but we really should only retry on connection errors...
+ if (sreq.retries < 5 && sreq.url.checkRetry()) {
+ sreq.retries++;
+ sreq.rspCode = 0;
+ sreq.exception = null;
+ Thread.sleep(1000);
+ submit(sreq);
+ checkResponses(block);
+ } else {
+ Exception e = sreq.exception;
+ Error error = new Error();
+ error.e = e;
+ error.url = sreq.errorUrl;
+ response.errors.add(error);
+ response.sreq = sreq;
+ SolrException.logOnce(SolrCore.log, "shard update error "
+ + sreq.url + " (" + sreq.url + ")", sreq.exception);
+ }
}
} catch (ExecutionException e) {
@@ -324,9 +390,6 @@ public class SolrCmdDistributor {
"interrupted waiting for shard update response", e);
}
}
-
- return response;
-
}
public static class Response {
@@ -338,4 +401,36 @@ public class SolrCmdDistributor {
public String url;
public Exception e;
}
+
+ public Response getResponse() {
+ return response;
+ }
+
+ public static abstract class Url {
+ public abstract String getUrl();
+ public abstract boolean checkRetry();
+ }
+
+ public static class StdUrl extends Url {
+ String url;
+
+ public StdUrl(String url) {
+ this.url = url;
+ }
+
+ @Override
+ public String getUrl() {
+ return url;
+ }
+
+ @Override
+ public String toString() {
+ return url;
+ }
+
+ @Override
+ public boolean checkRetry() {
+ return false;
+ }
+ }
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sat Dec 31 00:36:54 2011
@@ -20,7 +20,6 @@ package org.apache.solr.update.processor
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,9 +38,10 @@ import org.apache.solr.common.cloud.Clou
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
@@ -54,12 +54,13 @@ import org.apache.solr.update.CommitUpda
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.Url;
+import org.apache.solr.update.SolrCmdDistributor.StdUrl;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionBucket;
import org.apache.solr.update.VersionInfo;
-import org.apache.zookeeper.KeeperException;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
@@ -96,7 +97,7 @@ public class DistributedUpdateProcessor
// method in this update processor
private boolean isLeader = true;
private boolean forwardToLeader = false;
- private List<String> urls;
+ private List<Url> urls;
private String shardId;
@@ -136,9 +137,9 @@ public class DistributedUpdateProcessor
cmdDistrib = new SolrCmdDistributor();
}
- private List<String> setupRequest(int hash) {
+ private List<Url> setupRequest(int hash) {
- List<String> urls = null;
+ List<Url> urls = null;
// if we are in zk mode...
if (zkEnabled) {
@@ -174,8 +175,8 @@ public class DistributedUpdateProcessor
urls = getReplicaUrls(req, collection, shardId, nodeName);
} else {
// I need to forward onto the leader...
- urls = new ArrayList<String>(1);
- urls.add(leaderUrl);
+ urls = new ArrayList<Url>(1);
+ urls.add(new RetryUrl(leaderUrl, zkController.getZkStateReader(), collection, shardId));
forwardToLeader = true;
}
@@ -243,10 +244,6 @@ public class DistributedUpdateProcessor
idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
addsResponse.add(scratch.toString(), cmd.getVersion());
}
-
- if (urls != null) {
- finish(params);
- }
// TODO: keep track of errors? needs to be done at a higher level though since
// an id may fail before it gets to this processor.
@@ -256,75 +253,20 @@ public class DistributedUpdateProcessor
// TODO: optionally fail if n replicas are not reached...
// nocommit: what the hell - doesnt seem to fail when cannot forward - need to check that...
- private void finish(ModifiableSolrParams params) {
- boolean retry = false;
- int retries = 0;
-
+ private void doFinish() {
// TODO: if not a forward and replication req is not specified, we could
// send in a background thread
-
- Response response;
- do {
- retry = false;
- // TODO: realtime timeout strat
- int timeout = 200;
- response = cmdDistrib.finish(urls, params);
- // nocommit - we may need to tell about more than one error...
- if (response.errors.size() > 0) {
- if (urls.size() == 1 && forwardToLeader) {
- // we should retry a failed forward...
- retry = true;
-
- if (retries++ > 15) {
- timeout = timeout * 2;
- // nocommit
- log.error("we totally failed: set exception to:" + response.errors.get(0).e);
- retry = false;
-
- Exception e = response.errors.get(0).e;
- String newMsg = "shard update error (" + response.sreq.urls + "):"
- + e.getMessage();
- if (e instanceof SolrException) {
- SolrException se = (SolrException) e;
- e = new SolrException(ErrorCode.getErrorCode(se.code()),
- newMsg, se.getCause());
- } else {
- e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- newMsg, e);
- }
-
- rsp.setException(e);
- } else {
- try {
- response.sreq.urls = Collections.singletonList(zkController
- .getZkStateReader().getLeaderUrl(collection, shardId));
- response.sreq.exception = null;
-
- // nocommit
- System.out.println("FORWARD FAILED:" + urls + " retry with "
- + response.sreq.urls);
- response.errors.get(0).e.printStackTrace();
- urls = response.sreq.urls;
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, e2);
- } catch (KeeperException e2) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e2);
- }
- cmdDistrib.submit(response.sreq);
-
- try {
- Thread.sleep(timeout);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- throw new SolrException(ErrorCode.SERVER_ERROR, "");
- }
- }
- }
- } else {
- rsp.setException(null);
- }
- } while(retry);
+
+ cmdDistrib.finish(urls);
+ Response response = cmdDistrib.getResponse();
+ // nocommit - we may need to tell about more than one error...
+ if (response.errors.size() > 0) {
+ // nocommit: set first error...?
+ rsp.setException(response.errors.get(0).e);
+ } else {
+ rsp.setException(null);
+ }
+
// if it is not a forward request, for each fail, try to tell them to
// recover
@@ -524,10 +466,6 @@ public class DistributedUpdateProcessor
idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
deleteResponse.add(scratch.toString(), cmd.getVersion()); // we're returning the version of the delete.. not the version of the doc we deleted.
}
-
- if (urls != null) {
- finish(params);
- }
}
private boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
@@ -683,6 +621,7 @@ public class DistributedUpdateProcessor
if (zkEnabled) {
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+ System.out.println("distrib commit?");
if (!params.getBool(COMMIT_END_POINT, false)) {
params.set(COMMIT_END_POINT, true);
@@ -691,11 +630,10 @@ public class DistributedUpdateProcessor
String shardZkNodeName = nodeName + "_" + req.getCore().getName();
urls = getReplicaUrls(req, req.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName(), shardZkNodeName);
-
+ System.out.println("send commit to:" + urls);
if (urls != null) {
-
cmdDistrib.distribCommit(cmd, urls, params);
- finish(params);
+ finish();
}
}
}
@@ -703,10 +641,13 @@ public class DistributedUpdateProcessor
@Override
public void finish() throws IOException {
+ if (urls != null) {
+ doFinish();
+ }
if (next != null && urls == null) next.finish();
}
// nocommit: TODO: make map of url to props as well - order to recover code needs core name
- private List<String> getReplicaUrls(SolrQueryRequest req, String collection,
+ private List<Url> getReplicaUrls(SolrQueryRequest req, String collection,
String shardId, String thisNodeName) {
CloudState cloudState = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getCloudState();
@@ -722,14 +663,14 @@ public class DistributedUpdateProcessor
}
Map<String,ZkNodeProps> shardMap = replicas.getShards();
- List<String> urls = new ArrayList<String>(shardMap.size());
+ List<Url> urls = new ArrayList<Url>(shardMap.size());
for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
String nodeName = nodeProps.getNodeName();
if (cloudState.liveNodesContain(nodeName) && !nodeName.equals(thisNodeName)) {
String replicaUrl = nodeProps.getCoreUrl();
- urls.add(replicaUrl);
+ urls.add(new StdUrl(replicaUrl));
}
}
if (urls.size() == 0) {
@@ -738,10 +679,10 @@ public class DistributedUpdateProcessor
return urls;
}
- private List<String> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
+ private List<Url> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
CloudState cloudState = req.getCore().getCoreDescriptor()
.getCoreContainer().getZkController().getCloudState();
- List<String> urls = new ArrayList<String>();
+ List<Url> urls = new ArrayList<Url>();
Map<String,Slice> slices = cloudState.getSlices(collection);
if (slices == null) {
throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
@@ -756,7 +697,7 @@ public class DistributedUpdateProcessor
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(shardZkNodeName)) {
String replicaUrl = nodeProps.getCoreUrl();
- urls.add(replicaUrl);
+ urls.add(new StdUrl(replicaUrl));
}
}
}
@@ -778,4 +719,45 @@ public class DistributedUpdateProcessor
BytesRef br = cmd.getIndexedId();
return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
}
+
+ public static class RetryUrl extends Url {
+ String url;
+ private ZkStateReader zkStateReader;
+ private String collection;
+ private String shardId;
+
+ public RetryUrl(String url, ZkStateReader zkStateReader, String collection, String shardId) {
+ this.url = url;
+ this.zkStateReader = zkStateReader;
+ this.collection = collection;
+ this.shardId = shardId;
+ }
+
+ @Override
+ public String getUrl() {
+ return url;
+ }
+
+ @Override
+ public String toString() {
+ return url;
+ }
+
+ @Override
+ public boolean checkRetry() {
+ ZkCoreNodeProps leaderProps;
+ try {
+ leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderProps(
+ collection, shardId));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+
+ this.url = leaderProps.getCoreUrl();
+
+ return true;
+ }
+ }
+
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Sat Dec 31 00:36:54 2011
@@ -18,6 +18,7 @@ package org.apache.solr.update;
*/
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.solr.BaseDistributedSearchTestCase;
@@ -28,6 +29,8 @@ import org.apache.solr.common.SolrDocume
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.Url;
+import org.apache.solr.update.SolrCmdDistributor.StdUrl;
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@@ -74,14 +77,16 @@ public class SolrCmdDistributorTest exte
@Override
public void doTest() throws Exception {
- del("*:*");
+ //del("*:*");
SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
ModifiableSolrParams params = new ModifiableSolrParams();
- List<String> urls = new ArrayList<String>();
+ List<Url> urls = new ArrayList<Url>();
- urls.add(((CommonsHttpSolrServer) controlClient).getBaseURL());
+ urls.add(new StdUrl(((CommonsHttpSolrServer) controlClient).getBaseURL()));
+
+ // add one doc to controlClient
AddUpdateCommand cmd = new AddUpdateCommand(null);
cmd.solrDoc = getSolrDoc("id", 1);
@@ -89,7 +94,8 @@ public class SolrCmdDistributorTest exte
CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
cmdDistrib.distribCommit(ccmd, urls, params);
- Response response = cmdDistrib.finish(urls, params);
+ cmdDistrib.finish(urls);
+ Response response = cmdDistrib.getResponse();
assertEquals(response.errors.toString(), 0, response.errors.size());
@@ -98,7 +104,9 @@ public class SolrCmdDistributorTest exte
assertEquals(1, numFound);
CommonsHttpSolrServer client2 = (CommonsHttpSolrServer) clients.get(0);
- urls.add(client2.getBaseURL());
+ urls.add(new StdUrl(client2.getBaseURL()));
+
+ // add another 3 docs to both control and client1
cmd.solrDoc = getSolrDoc("id", 2);
cmdDistrib.distribAdd(cmd, urls, params);
@@ -108,18 +116,26 @@ public class SolrCmdDistributorTest exte
cmdDistrib.distribAdd(cmd2, urls, params);
+ AddUpdateCommand cmd3 = new AddUpdateCommand(null);
+ cmd3.solrDoc = getSolrDoc("id", 4);
+
+ cmdDistrib.distribAdd(cmd3, Collections.singletonList(urls.get(0)), params);
+
cmdDistrib.distribCommit(ccmd, urls, params);
- response = cmdDistrib.finish(urls, params);
+ cmdDistrib.finish(urls);
+ response = cmdDistrib.getResponse();
assertEquals(response.errors.toString(), 0, response.errors.size());
SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
numFound = results.getNumFound();
- assertEquals(results.toString(), 3, numFound);
+ assertEquals(results.toString(), 4, numFound);
numFound = client2.query(new SolrQuery("*:*")).getResults()
.getNumFound();
- assertEquals(2, numFound);
+ assertEquals(3, numFound);
+
+ // now delete doc 2 which is on both control and client1
DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
dcmd.id = "2";
@@ -127,17 +143,18 @@ public class SolrCmdDistributorTest exte
cmdDistrib.distribDelete(dcmd, urls, params);
cmdDistrib.distribCommit(ccmd, urls, params);
- response = cmdDistrib.finish(urls, params);
+ cmdDistrib.finish(urls);
+ response = cmdDistrib.getResponse();
assertEquals(response.errors.toString(), 0, response.errors.size());
results = controlClient.query(new SolrQuery("*:*")).getResults();
numFound = results.getNumFound();
- assertEquals(results.toString(), 2, numFound);
+ assertEquals(results.toString(), 3, numFound);
numFound = client2.query(new SolrQuery("*:*")).getResults()
.getNumFound();
- assertEquals(results.toString(), 1, numFound);
+ assertEquals(results.toString(), 2, numFound);
}
protected void addFields(SolrInputDocument doc, Object... fields) {