You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/11/18 05:07:19 UTC
svn commit: r1203481 - in /lucene/dev/branches/solrcloud/solr/core/src:
java/org/apache/solr/update/SolrCmdDistributor.java
java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
test/org/apache/solr/cloud/FullDistributedZkTest.java
Author: markrmiller
Date: Fri Nov 18 04:07:19 2011
New Revision: 1203481
URL: http://svn.apache.org/viewvc?rev=1203481&view=rev
Log:
pull code that distributes cmds into it's own class
Added:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (with props)
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
Added: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1203481&view=auto
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (added)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Fri Nov 18 04:07:19 2011
@@ -0,0 +1,382 @@
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequestExt;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.SchemaField;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+
+public class SolrCmdDistributor {
+ // TODO: shut this thing down
+ static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
+ Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
+
+ static HttpClient client;
+
+ static {
+ MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+ mgr.getParams().setDefaultMaxConnectionsPerHost(8);
+ mgr.getParams().setMaxTotalConnections(200);
+ client = new HttpClient(mgr);
+ }
+
+ CompletionService<Request> completionService;
+ Set<Future<Request>> pending;
+
+ private final SolrQueryRequest req;
+ private final SolrQueryResponse rsp;
+
+ private final SchemaField idField;
+
+ int maxBufferedAddsPerServer = 10;
+ int maxBufferedDeletesPerServer = 100;
+
+ private List<AddUpdateCommand> alist;
+ private ArrayList<DeleteUpdateCommand> dlist;
+
+ public SolrCmdDistributor(SolrQueryRequest req,
+ SolrQueryResponse rsp) {
+ this.req = req;
+ this.rsp = rsp;
+ this.idField = req.getSchema().getUniqueKeyField();
+ }
+
+ public void finish(String shardStr) {
+
+ // piggyback on any outstanding adds or deletes if possible.
+ flushAdds(1, null, shardStr);
+ flushDeletes(1, null, shardStr);
+
+ checkResponses(true);
+ }
+
+ public void distribDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+ checkResponses(false);
+
+ if (cmd.id != null) {
+ doDelete(cmd, shardStr);
+ } else if (cmd.query != null) {
+ // TODO: query must be broadcast to all ??
+ doDelete(cmd, shardStr);
+ }
+ }
+
+ public void distribAdd(AddUpdateCommand cmd, String shardStr) throws IOException {
+
+ checkResponses(false);
+
+ SolrInputDocument doc = cmd.getSolrInputDocument();
+ SolrInputField field = doc.getField(idField.getName());
+ if (field == null) {
+ throw new RuntimeException("no id field found");
+ }
+
+ // make sure any pending deletes are flushed
+ flushDeletes(1, null, shardStr);
+
+ // TODO: this is brittle
+ // need to make a clone since these commands may be reused
+ AddUpdateCommand clone = new AddUpdateCommand(req);
+
+ clone.solrDoc = cmd.solrDoc;
+ clone.commitWithin = cmd.commitWithin;
+ clone.overwrite = cmd.overwrite;
+
+ // nocommit: review as far as SOLR-2685
+ // clone.indexedId = cmd.indexedId;
+ // clone.doc = cmd.doc;
+
+
+ if (alist == null) {
+ alist = new ArrayList<AddUpdateCommand>(2);
+ }
+ alist.add(clone);
+
+ flushAdds(maxBufferedAddsPerServer, null, shardStr);
+ }
+
+ public void distribCommit(CommitUpdateCommand cmd, String shardStr)
+ throws IOException {
+
+ // Wait for all outstanding repsonses to make sure that a commit
+ // can't sneak in ahead of adds or deletes we already sent.
+ // We could do this on a per-server basis, but it's more complex
+ // and this solution will lead to commits happening closer together.
+ checkResponses(true);
+
+ // piggyback on any outstanding adds or deletes if possible.
+ // TODO: review this
+ flushAdds(1, cmd, shardStr);
+
+ flushDeletes(1, cmd, shardStr);
+
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on SEEN_LEADER
+ // TODO: perhaps we should just pass all the incoming params...
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+ String seenLeader = req.getParams().get(
+ DistributedUpdateProcessor.SEEN_LEADER);
+ if (seenLeader != null) {
+ ureq.getParams().add(DistributedUpdateProcessor.SEEN_LEADER, seenLeader);
+ }
+
+ // nocommit: we add the right update chain - we should add the current one?
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, cmd);
+ submit(ureq, shardStr);
+
+ // if (next != null && shardStr == null) next.processCommit(cmd);
+
+ // if the command wanted to block until everything was committed,
+ // then do that here.
+ // nocommit
+ if (/* cmd.waitFlush || */cmd.waitSearcher) {
+ checkResponses(true);
+ }
+ }
+
+ private void doDelete(DeleteUpdateCommand cmd, String shardStr) throws IOException {
+
+ flushAdds(1, null, shardStr);
+
+ if (dlist == null) {
+ dlist = new ArrayList<DeleteUpdateCommand>(2);
+ }
+ dlist.add(clone(cmd));
+
+ flushDeletes(maxBufferedDeletesPerServer, null, shardStr);
+ }
+
+ void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
+ if (cmd == null) return;
+ // nocommit
+ ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
+ : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
+ }
+
+ boolean flushAdds(int limit, CommitUpdateCommand ccmd, String shardStr) {
+ // check for pending deletes
+ if (alist == null || alist.size() < limit) return false;
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on seen leader
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+ String seenLeader = req.getParams().get(DistributedUpdateProcessor.SEEN_LEADER);
+ if (seenLeader != null) {
+ ureq.getParams().add(DistributedUpdateProcessor.SEEN_LEADER, seenLeader);
+ }
+ // nocommit: we add the right update chain - we should add the current one?
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, ccmd);
+
+ for (AddUpdateCommand cmd : alist) {
+ ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+ }
+
+ alist = null;
+ submit(ureq, shardStr);
+ return true;
+ }
+
+ boolean flushDeletes(int limit, CommitUpdateCommand ccmd, String shardStr) {
+ // check for pending deletes
+ if (dlist == null || dlist.size() < limit) return false;
+
+ UpdateRequestExt ureq = new UpdateRequestExt();
+ // pass on version
+ if (ureq.getParams() == null) {
+ ureq.setParams(new ModifiableSolrParams());
+ }
+
+ String seenLeader = req.getParams().get(DistributedUpdateProcessor.SEEN_LEADER);
+ if (seenLeader != null) {
+ ureq.getParams().add(DistributedUpdateProcessor.SEEN_LEADER, seenLeader);
+ }
+
+ // nocommit: we add the right update chain - we should add the current one?
+ ureq.getParams().add("update.chain", "distrib-update-chain");
+ addCommit(ureq, ccmd);
+ for (DeleteUpdateCommand cmd : dlist) {
+ if (cmd.id != null) {
+ ureq.deleteById(cmd.id);
+ }
+ if (cmd.query != null) {
+ ureq.deleteByQuery(cmd.query);
+ }
+ }
+
+ dlist = null;
+ submit(ureq, shardStr);
+ return true;
+ }
+
+ // TODO: this is brittle
+ private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
+ DeleteUpdateCommand c = new DeleteUpdateCommand(req);
+ c.id = cmd.id;
+ c.query = cmd.query;
+ return c;
+ }
+
+ static class Request {
+ // TODO: we may need to look at deep cloning this?
+ String shard;
+ UpdateRequestExt ureq;
+ NamedList<Object> ursp;
+ int rspCode;
+ Exception exception;
+ }
+
+ void submit(UpdateRequestExt ureq, String shardStr) {
+ Request sreq = new Request();
+ sreq.shard = shardStr;
+ sreq.ureq = ureq;
+ submit(sreq);
+ }
+
+ void submit(final Request sreq) {
+ if (completionService == null) {
+ completionService = new ExecutorCompletionService<Request>(commExecutor);
+ pending = new HashSet<Future<Request>>();
+ }
+ String[] shards;
+ // look to see if we should send to multiple servers
+ if (sreq.shard.contains("|")) {
+ shards = sreq.shard.split("\\|");
+ } else {
+ shards = new String[1];
+ shards[0] = sreq.shard;
+ }
+ for (final String shard : shards) {
+ // TODO: when we break up shards, we might forward
+ // to self again - makes things simple here, but we could
+ // also have realized this before, done the req locally, and
+ // removed self from this list.
+
+ Callable<Request> task = new Callable<Request>() {
+ @Override
+ public Request call() throws Exception {
+ Request clonedRequest = new Request();
+ clonedRequest.shard = sreq.shard;
+ clonedRequest.ureq = sreq.ureq;
+
+ try {
+ // TODO: what about https?
+ String url;
+ if (!shard.startsWith("http://")) {
+ url = "http://" + shard;
+ } else {
+ url = shard;
+ }
+ System.out.println("URL:" + url);
+ SolrServer server = new CommonsHttpSolrServer(url, client);
+ clonedRequest.ursp = server.request(clonedRequest.ureq);
+
+ // currently no way to get the request body.
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ clonedRequest.exception = e;
+ if (e instanceof SolrException) {
+ clonedRequest.rspCode = ((SolrException) e).code();
+ } else {
+ clonedRequest.rspCode = -1;
+ }
+ }
+ System.out.println("RSPFirst:" + clonedRequest.rspCode);
+ return clonedRequest;
+ }
+ };
+
+ pending.add(completionService.submit(task));
+ }
+ }
+
+ void checkResponses(boolean block) {
+
+ int expectedResponses = pending == null ? 0 : pending.size();
+ int failed = 0;
+ while (pending != null && pending.size() > 0) {
+ try {
+ Future<Request> future = block ? completionService.take()
+ : completionService.poll();
+ if (future == null) return;
+ pending.remove(future);
+
+ try {
+ Request sreq = future.get();
+ System.out.println("RSP:" + sreq.rspCode);
+ if (sreq.rspCode != 0) {
+ // error during request
+ failed++;
+ // use the first exception encountered
+ if (rsp.getException() == null) {
+ Exception e = sreq.exception;
+ String newMsg = "shard update error (" + sreq.shard + "):"
+ + e.getMessage();
+ if (e instanceof SolrException) {
+ SolrException se = (SolrException) e;
+ e = new SolrException(ErrorCode.getErrorCode(se.code()),
+ newMsg, se.getCause());
+ } else {
+ e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "newMsg", e);
+ }
+ rsp.setException(e);
+ }
+
+ SolrException.logOnce(SolrCore.log, "shard update error ("
+ + sreq.shard + ")", sreq.exception);
+ }
+
+ } catch (ExecutionException e) {
+ // shouldn't happen since we catch exceptions ourselves
+ SolrException.log(SolrCore.log,
+ "error sending update request to shard", e);
+ }
+
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "interrupted waiting for shard update response", e);
+ }
+ }
+
+ System.out.println("check failed rate:" + failed + " " + expectedResponses
+ / 2);
+ if (failed <= (expectedResponses / 2)) {
+ // don't fail if half or more where fine
+ rsp.setException(null);
+ }
+ }
+}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1203481&r1=1203480&r2=1203481&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri Nov 18 04:07:19 2011
@@ -18,33 +18,14 @@ package org.apache.solr.update.processor
*/
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.lang.NullArgumentException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRef;
-import org.apache.solr.client.solrj.SolrServer;
-import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequestExt;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.CloudState;
import org.apache.solr.common.cloud.Slice;
@@ -56,7 +37,6 @@ import org.apache.solr.common.params.Mod
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
-import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
@@ -64,6 +44,7 @@ import org.apache.solr.schema.SchemaFiel
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionBucket;
@@ -74,31 +55,9 @@ import org.apache.zookeeper.KeeperExcept
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String SEEN_LEADER = "leader";
- // TODO: shut this thing down
- static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
-
- static HttpClient client;
-
- static {
- MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
- mgr.getParams().setDefaultMaxConnectionsPerHost(8);
- mgr.getParams().setMaxTotalConnections(200);
- client = new HttpClient(mgr);
- }
-
- CompletionService<Request> completionService;
- Set<Future<Request>> pending;
-
private final SolrQueryRequest req;
private final SolrQueryResponse rsp;
- private final UpdateRequestProcessor next;;
- private final SchemaField idField;
-
- //private List<String> shards;
-
- int maxBufferedAddsPerServer = 10;
- int maxBufferedDeletesPerServer = 100;
+ private final UpdateRequestProcessor next;
private static final String VERSION_FIELD = "_version_";
private final UpdateHandler updateHandler;
@@ -114,9 +73,9 @@ public class DistributedUpdateProcessor
private boolean forwardToLeader;
private volatile String shardStr;
- private List<AddUpdateCommand> alist;
-
- private ArrayList<DeleteUpdateCommand> dlist;
+ private final SchemaField idField;
+
+ private final SolrCmdDistributor cmdDistrib;
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
@@ -124,7 +83,6 @@ public class DistributedUpdateProcessor
this.rsp = rsp;
this.next = next;
this.idField = req.getSchema().getUniqueKeyField();
-
// version init
this.updateHandler = req.getCore().getUpdateHandler();
@@ -137,7 +95,8 @@ public class DistributedUpdateProcessor
this.req = req;
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
-
+
+ cmdDistrib = new SolrCmdDistributor(req, rsp);
}
private void setupRequest(int hash) {
@@ -243,8 +202,11 @@ public class DistributedUpdateProcessor
setupRequest(hash);
versionAdd(cmd, hash);
-
- distribAdd(cmd);
+ if (shardStr != null) {
+ cmdDistrib.distribAdd(cmd, shardStr);
+ } else {
+ super.processAdd(cmd);
+ }
if (returnVersions && rsp != null) {
if (addsResponse == null) {
@@ -340,45 +302,6 @@ public class DistributedUpdateProcessor
}
-
- private void distribAdd(AddUpdateCommand cmd) throws IOException {
- if (shardStr == null) {
- super.processAdd(cmd);
- return;
- }
-
- checkResponses(false);
-
- SolrInputDocument doc = cmd.getSolrInputDocument();
- SolrInputField field = doc.getField(idField.getName());
- if (field == null) {
- if (next != null) next.processAdd(cmd);
- return;
- }
-
- // make sure any pending deletes are flushed
- flushDeletes(1, null);
-
- // TODO: this is brittle
- // need to make a clone since these commands may be reused
- AddUpdateCommand clone = new AddUpdateCommand(req);
-
- clone.solrDoc = cmd.solrDoc;
- clone.commitWithin = cmd.commitWithin;
- clone.overwrite = cmd.overwrite;
-
- // nocommit: review as far as SOLR-2685
- // clone.indexedId = cmd.indexedId;
- // clone.doc = cmd.doc;
-
-
- if (alist == null) {
- alist = new ArrayList<AddUpdateCommand>(2);
- }
- alist.add(clone);
-
- flushAdds(maxBufferedAddsPerServer, null);
- }
// TODO: this is brittle
private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
@@ -388,18 +311,6 @@ public class DistributedUpdateProcessor
return c;
}
- private void doDelete(DeleteUpdateCommand cmd) throws IOException {
-
- flushAdds(1, null);
-
- if (dlist == null) {
- dlist = new ArrayList<DeleteUpdateCommand>(2);
- }
- dlist.add(clone(cmd));
-
- flushDeletes(maxBufferedDeletesPerServer, null);
- }
-
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
int hash = 0;
@@ -413,7 +324,11 @@ public class DistributedUpdateProcessor
versionDelete(cmd, hash);
- distribDelete(cmd);
+ if (shardStr != null) {
+ cmdDistrib.distribDelete(cmd, shardStr);
+ } else {
+ super.processDelete(cmd);
+ }
// cmd.getIndexId == null when delete by query
if (returnVersions && rsp != null && cmd.getIndexedId() != null) {
@@ -494,276 +409,23 @@ public class DistributedUpdateProcessor
}
}
-
- private void distribDelete(DeleteUpdateCommand cmd) throws IOException {
- if (shardStr == null) {
- super.processDelete(cmd);
- return;
- }
- checkResponses(false);
-
- if (cmd.id != null) {
- doDelete(cmd);
- } else if (cmd.query != null) {
- // TODO: query must be broadcast to all ??
- doDelete(cmd);
- }
- }
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
- String shardStr = null;
// nocommit: make everyone commit?
- distribCommit(cmd, shardStr);
- }
-
- private void distribCommit(CommitUpdateCommand cmd, String shardStr) throws IOException {
- if (shardStr == null) {
+ if (shardStr != null) {
+ cmdDistrib.distribCommit(cmd, shardStr);
+ } else {
super.processCommit(cmd);
- return;
- }
-
- // Wait for all outstanding repsonses to make sure that a commit
- // can't sneak in ahead of adds or deletes we already sent.
- // We could do this on a per-server basis, but it's more complex
- // and this solution will lead to commits happening closer together.
- checkResponses(true);
-
- for (int slot = 0; slot < 1; slot++) {
- // piggyback on any outstanding adds or deletes if possible.
- if (flushAdds(1, cmd)) continue;
- if (flushDeletes( 1, cmd)) continue;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- // pass on version
- if (ureq.getParams() == null) {
- ureq.setParams(new ModifiableSolrParams());
- }
- String seenLeader = req.getParams().get(SEEN_LEADER);
- if (seenLeader != null) {
- ureq.getParams().add(SEEN_LEADER, seenLeader);
- }
-
- // nocommit: we add the right update chain - we should add the current one?
- ureq.getParams().add("update.chain", "distrib-update-chain");
- addCommit(ureq, cmd);
- submit(ureq);
- }
- //if (next != null && shardStr == null) next.processCommit(cmd);
-
- // if the command wanted to block until everything was committed,
- // then do that here.
- // nocommit
- if (/* cmd.waitFlush || */cmd.waitSearcher) {
- checkResponses(true);
}
}
@Override
public void finish() throws IOException {
-
- // piggyback on any outstanding adds or deletes if possible.
- flushAdds(1, null);
- flushDeletes(1, null);
-
- checkResponses(true);
+ cmdDistrib.finish(shardStr);
if (next != null && shardStr == null) next.finish();
}
- void checkResponses(boolean block) {
-
- int expectedResponses = pending == null ? 0 : pending.size();
- int failed = 0;
- while (pending != null && pending.size() > 0) {
- try {
- Future<Request> future = block ? completionService.take()
- : completionService.poll();
- if (future == null) return;
- pending.remove(future);
-
- try {
- Request sreq = future.get();
- System.out.println("RSP:" + sreq.rspCode);
- if (sreq.rspCode != 0) {
- // error during request
- failed++;
- // use the first exception encountered
- if (rsp.getException() == null) {
- Exception e = sreq.exception;
- String newMsg = "shard update error (" + sreq.shard + "):"
- + e.getMessage();
- if (e instanceof SolrException) {
- SolrException se = (SolrException) e;
- e = new SolrException(ErrorCode.getErrorCode(se.code()),
- newMsg, se.getCause());
- } else {
- e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "newMsg", e);
- }
- rsp.setException(e);
- }
-
- SolrException.logOnce(SolrCore.log, "shard update error ("
- + sreq.shard + ")", sreq.exception);
- }
-
- } catch (ExecutionException e) {
- // shouldn't happen since we catch exceptions ourselves
- SolrException.log(SolrCore.log,
- "error sending update request to shard", e);
- }
-
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
- "interrupted waiting for shard update response", e);
- }
- }
-
- System.out.println("check failed rate:" + failed + " " + expectedResponses / 2);
- if (failed <= (expectedResponses / 2)) {
- // don't fail if half or more where fine
- rsp.setException(null);
- }
- }
-
- void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
- if (cmd == null) return;
- // nocommit
- ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
- : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
- }
-
- boolean flushAdds(int limit, CommitUpdateCommand ccmd) {
- // check for pending deletes
- if (alist == null || alist.size() < limit) return false;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- // pass on seen leader
- if (ureq.getParams() == null) {
- ureq.setParams(new ModifiableSolrParams());
- }
- String seenLeader = req.getParams().get(SEEN_LEADER);
- if (seenLeader != null) {
- ureq.getParams().add(SEEN_LEADER, seenLeader);
- }
- // nocommit: we add the right update chain - we should add the current one?
- ureq.getParams().add("update.chain", "distrib-update-chain");
- addCommit(ureq, ccmd);
-
- for (AddUpdateCommand cmd : alist) {
- ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
- }
-
- alist = null;
- submit(ureq);
- return true;
- }
-
- boolean flushDeletes(int limit, CommitUpdateCommand ccmd) {
- // check for pending deletes
- if (dlist == null || dlist.size() < limit) return false;
-
- UpdateRequestExt ureq = new UpdateRequestExt();
- // pass on version
- if (ureq.getParams() == null) {
- ureq.setParams(new ModifiableSolrParams());
- }
-
- String seenLeader = req.getParams().get(SEEN_LEADER);
- if (seenLeader != null) {
- ureq.getParams().add(SEEN_LEADER, seenLeader);
- }
-
- // nocommit: we add the right update chain - we should add the current one?
- ureq.getParams().add("update.chain", "distrib-update-chain");
- addCommit(ureq, ccmd);
- for (DeleteUpdateCommand cmd : dlist) {
- if (cmd.id != null) {
- ureq.deleteById(cmd.id);
- }
- if (cmd.query != null) {
- ureq.deleteByQuery(cmd.query);
- }
- }
-
- dlist = null;
- submit(ureq);
- return true;
- }
-
- static class Request {
- // TODO: we may need to look at deep cloning this?
- String shard;
- UpdateRequestExt ureq;
- NamedList<Object> ursp;
- int rspCode;
- Exception exception;
- }
-
- void submit(UpdateRequestExt ureq) {
- Request sreq = new Request();
- sreq.shard = shardStr;
- sreq.ureq = ureq;
- submit(sreq);
- }
-
- void submit(final Request sreq) {
- if (completionService == null) {
- completionService = new ExecutorCompletionService<Request>(commExecutor);
- pending = new HashSet<Future<Request>>();
- }
- String[] shards;
- // look to see if we should send to multiple servers
- if (sreq.shard.contains("|")) {
- shards = sreq.shard.split("\\|");
- } else {
- shards = new String[1];
- shards[0] = sreq.shard;
- }
- for (final String shard : shards) {
- // TODO: when we break up shards, we might forward
- // to self again - makes things simple here, but we could
- // also have realized this before, done the req locally, and
- // removed self from this list.
-
- Callable<Request> task = new Callable<Request>() {
- @Override
- public Request call() throws Exception {
- Request clonedRequest = new Request();
- clonedRequest.shard = sreq.shard;
- clonedRequest.ureq = sreq.ureq;
-
- try {
- // TODO: what about https?
- String url;
- if (!shard.startsWith("http://")) {
- url = "http://" + shard;
- } else {
- url = shard;
- }
- System.out.println("URL:" + url);
- SolrServer server = new CommonsHttpSolrServer(url, client);
- clonedRequest.ursp = server.request(clonedRequest.ureq);
-
- // currently no way to get the request body.
- } catch (Exception e) {
- e.printStackTrace(System.out);
- clonedRequest.exception = e;
- if (e instanceof SolrException) {
- clonedRequest.rspCode = ((SolrException) e).code();
- } else {
- clonedRequest.rspCode = -1;
- }
- }
- System.out.println("RSPFirst:" + clonedRequest.rspCode);
- return clonedRequest;
- }
- };
-
- pending.add(completionService.submit(task));
- }
- }
-
private String addReplicas(SolrQueryRequest req, String collection,
String shardId, String shardZkNodeName) {
CloudState cloudState = req.getCore().getCoreDescriptor()
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1203481&r1=1203480&r2=1203481&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Fri Nov 18 04:07:19 2011
@@ -82,7 +82,6 @@ public class FullDistributedZkTest exten
public FullDistributedZkTest() {
fixShardCount = true;
shardCount = 6;
-
// TODO: for now, turn off stress because it uses regular clients, and we
// need the cloud client because we kill servers
stress = 0;
@@ -265,7 +264,10 @@ public class FullDistributedZkTest exten
*/
@Override
public void doTest() throws Exception {
-
+ handle.clear();
+ handle.put("QTime", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
del("*:*");
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
@@ -308,10 +310,7 @@ public class FullDistributedZkTest exten
commit();
assertDocCounts();
-
- handle.clear();
- handle.put("QTime", SKIPVAL);
- handle.put("timestamp", SKIPVAL);
+ query("q", "*:*", "sort", "n_tl1 desc");
// random value sort
for (String f : fieldNames) {
@@ -394,6 +393,7 @@ public class FullDistributedZkTest exten
query("q","*:*", "rows",0, "facet","true", "facet.field","{!key='a b/c \\' \\} foo'}"+t1,"facet.limit",5, "facet.shard.limit",5);
handle.remove("facet_fields");
+ //query("q", "*:*");
// index the same document to two shards and make sure things
// don't blow up.
@@ -414,6 +414,8 @@ public class FullDistributedZkTest exten
// our hash is not stable yet in distrib update proc
assertDocCounts();
+ //query("q", "*:*");
+
// kill a shard
JettySolrRunner deadShard = killShard("shard2", 0);
JettySolrRunner deadShard2 = killShard("shard3", 1);
@@ -433,6 +435,8 @@ public class FullDistributedZkTest exten
commit();
+ //query("q", "*:*");
+
// TMP: try adding a doc with CloudSolrServer
CloudSolrServer server = new CloudSolrServer(zkServer.getZkAddress());
server.setDefaultCollection(DEFAULT_COLLECTION);
@@ -452,6 +456,8 @@ public class FullDistributedZkTest exten
commit();
+ //query("q", "*:*");
+
long numFound2 = server.query(query).getResults().getNumFound();
// lets just check that the one doc since last commit made it in...
@@ -499,11 +505,17 @@ public class FullDistributedZkTest exten
.getResults().getNumFound());
// kill the other shard3 replica
- JettySolrRunner deadShard3 = killShard("shard3", 0);
+ // JettySolrRunner deadShard3 = killShard("shard3", 0);
// should fail
//query("q", "id:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
+ // we can't do this here - we have killed a shard
+ // assertDocCounts();
+
+ // TODO: why is this failing with no servers hosting shard?
+ //query("q", "*:*");
+
// Thread.sleep(10000000000L);
if (DEBUG) {
super.printLayout();