You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/17 21:16:35 UTC
svn commit: r1215566 - in /lucene/dev/branches/solrcloud/solr:
core/src/java/org/apache/solr/update/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Sat Dec 17 20:16:34 2011
New Revision: 1215566
URL: http://svn.apache.org/viewvc?rev=1215566&view=rev
Log:
slightly de-uglify some stuff - some of it needs a fresh design still
Modified:
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sat Dec 17 20:16:34 2011
@@ -37,14 +37,12 @@ import org.apache.solr.client.solrj.Solr
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequestExt;
-import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.zookeeper.KeeperException;
// TODO: we are not really using the buffering anymore due to DistribUpdateProc...
// we might want to bring back a form of slots...
@@ -76,45 +74,70 @@ public class SolrCmdDistributor {
private List<AddUpdateCommand> alist;
private ArrayList<DeleteUpdateCommand> dlist;
- private ZkController zkController;
-
- private String collection;
-
+ public static class CmdRequest {
+ public List<ShardInfo> shardInfos;
+ public ModifiableSolrParams params;
+ // we have to retry updates that are just being forwarded to the leader
+ public boolean forwarding;
+
+ public CmdRequest(List<ShardInfo> shardInfos, ModifiableSolrParams params) {
+ this.shardInfos = shardInfos;
+ this.params = params;
+ }
+
+ public CmdRequest(List<ShardInfo> shardInfos, ModifiableSolrParams params, boolean forwarding) {
+ this(shardInfos, params);
+ this.forwarding = forwarding;
+ }
+
+ public void updateUrlsForRetry() {
+ for (ShardInfo shardInfo : shardInfos) {
+ if (shardInfo.retryUrl != null) {
+ shardInfo.url = shardInfo.retryUrl.getRetryUrl();
+ }
+ }
+ }
+ }
- public SolrCmdDistributor(ZkController zkController, String collection, SolrQueryResponse rsp) {
- //this.req = req;
+ public static class ShardInfo {
+ public String url;
+ public RetryUrl retryUrl;
+ }
+
+ public static interface RetryUrl {
+ String getRetryUrl();
+ }
+
+ public SolrCmdDistributor(SolrQueryResponse rsp) {
this.rsp = rsp;
- // TODO: kind of a hack to get the latest leader
- this.zkController = zkController;
- this.collection = collection;
}
- public void finish(List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) {
+ public void finish(CmdRequest cmdRequest) {
// piggyback on any outstanding adds or deletes if possible.
- flushAdds(1, null, shards, params, forwarding, shardId);
- flushDeletes(1, null, shards, params, forwarding, shardId);
+ flushAdds(1, null, cmdRequest);
+ flushDeletes(1, null, cmdRequest);
- checkResponses(true, forwarding, shardId);
+ checkResponses(true, cmdRequest);
}
- public void distribDelete(DeleteUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
- checkResponses(false, forwarding, shardId);
+ public void distribDelete(DeleteUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
+ checkResponses(false, cmdRequest);
if (cmd.isDeleteById()) {
- doDelete(cmd, shards, params, forwarding, shardId);
+ doDelete(cmd, cmdRequest);
} else {
// TODO: query must be broadcast to all ??
- doDelete(cmd, shards, params, forwarding, shardId);
+ doDelete(cmd, cmdRequest);
}
}
- public void distribAdd(AddUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
+ public void distribAdd(AddUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
- checkResponses(false, forwarding, shardId);
+ checkResponses(false, cmdRequest);
// make sure any pending deletes are flushed
- flushDeletes(1, null, shards, params, forwarding, shardId);
+ flushDeletes(1, null, cmdRequest);
// TODO: this is brittle
// need to make a clone since these commands may be reused
@@ -135,48 +158,48 @@ public class SolrCmdDistributor {
}
alist.add(clone);
- flushAdds(maxBufferedAddsPerServer, null, shards, params, forwarding, shardId);
+ flushAdds(maxBufferedAddsPerServer, null, cmdRequest);
}
- public void distribCommit(CommitUpdateCommand cmd, List<String> shards, ModifiableSolrParams params)
+ public void distribCommit(CommitUpdateCommand cmd, CmdRequest cmdRequest)
throws IOException {
// Wait for all outstanding responses to make sure that a commit
// can't sneak in ahead of adds or deletes we already sent.
// We could do this on a per-server basis, but it's more complex
// and this solution will lead to commits happening closer together.
- checkResponses(true, false, null);
+ checkResponses(true, cmdRequest);
// piggyback on any outstanding adds or deletes if possible.
// TODO: review this
- flushAdds(1, cmd, shards, params, false, null);
+ flushAdds(1, cmd, cmdRequest);
- flushDeletes(1, cmd, shards, params, false, null);
+ flushDeletes(1, cmd, cmdRequest);
UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
+ ureq.setParams(cmdRequest.params);
addCommit(ureq, cmd);
- submit(ureq, shards);
+ submit(ureq, cmdRequest);
// if the command wanted to block until everything was committed,
// then do that here.
// nocommit
if (/* cmd.waitFlush || */cmd.waitSearcher) {
- checkResponses(true, false, null);
+ checkResponses(true, cmdRequest);
}
}
- private void doDelete(DeleteUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
+ private void doDelete(DeleteUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
- flushAdds(1, null, shards, params, forwarding, shardId);
+ flushAdds(1, null, cmdRequest);
if (dlist == null) {
dlist = new ArrayList<DeleteUpdateCommand>(2);
}
dlist.add(clone(cmd));
- flushDeletes(maxBufferedDeletesPerServer, null, shards, params, forwarding, shardId);
+ flushDeletes(maxBufferedDeletesPerServer, null, cmdRequest);
}
void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
@@ -186,12 +209,12 @@ public class SolrCmdDistributor {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
}
- boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params, boolean forwarding, String shardId) {
+ boolean flushAdds(int limit, CommitUpdateCommand ccmd, CmdRequest cmdRequest) {
// check for pending deletes
if (alist == null || alist.size() < limit) return false;
UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
+ ureq.setParams(cmdRequest.params);
addCommit(ureq, ccmd);
@@ -200,16 +223,16 @@ public class SolrCmdDistributor {
}
alist = null;
- submit(ureq, urls);
+ submit(ureq, cmdRequest);
return true;
}
- boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) {
+ boolean flushDeletes(int limit, CommitUpdateCommand ccmd, CmdRequest cmdRequest) {
// check for pending deletes
if (dlist == null || dlist.size() < limit) return false;
UpdateRequestExt ureq = new UpdateRequestExt();
- ureq.setParams(params);
+ ureq.setParams(cmdRequest.params);
addCommit(ureq, ccmd);
@@ -222,7 +245,7 @@ public class SolrCmdDistributor {
}
dlist = null;
- submit(ureq, shards);
+ submit(ureq, cmdRequest);
return true;
}
@@ -236,7 +259,7 @@ public class SolrCmdDistributor {
static class Request {
// TODO: we may need to look at deep cloning this?
- List<String> shards;
+ CmdRequest cmdRequest;
UpdateRequestExt ureq;
NamedList<Object> ursp;
int rspCode;
@@ -245,9 +268,9 @@ public class SolrCmdDistributor {
int retries = 0;
}
- void submit(UpdateRequestExt ureq, List<String> shards) {
+ void submit(UpdateRequestExt ureq, CmdRequest cmdRequest) {
Request sreq = new Request();
- sreq.shards = shards;
+ sreq.cmdRequest = cmdRequest;
sreq.ureq = ureq;
submit(sreq);
}
@@ -258,7 +281,7 @@ public class SolrCmdDistributor {
pending = new HashSet<Future<Request>>();
}
- for (final String shard : sreq.shards) {
+ for (final ShardInfo shardInfo : sreq.cmdRequest.shardInfos) {
// TODO: when we break up shards, we might forward
// to self again - makes things simple here, but we could
// also have realized this before, done the req locally, and
@@ -268,16 +291,16 @@ public class SolrCmdDistributor {
@Override
public Request call() throws Exception {
Request clonedRequest = new Request();
- clonedRequest.shards = sreq.shards;
+ clonedRequest.cmdRequest = sreq.cmdRequest;
clonedRequest.ureq = sreq.ureq;
- clonedRequest.url = shard;
+ clonedRequest.url = shardInfo.url;
try {
// TODO: what about https?
String url;
- if (!shard.startsWith("http://")) {
- url = "http://" + shard;
+ if (!shardInfo.url.startsWith("http://")) {
+ url = "http://" + shardInfo.url;
} else {
- url = shard;
+ url = shardInfo.url;
}
SolrServer server = new CommonsHttpSolrServer(url, client);
@@ -300,7 +323,7 @@ public class SolrCmdDistributor {
}
}
- void checkResponses(boolean block, boolean forwarding, String shardId) {
+ void checkResponses(boolean block, CmdRequest cmdRequest) {
int expectedResponses = pending == null ? 0 : pending.size();
int nonConnectionErrors = 0;
@@ -336,7 +359,7 @@ public class SolrCmdDistributor {
// TODO: perhaps we should do more?
if (rsp.getException() == null) {
- String newMsg = "shard update error (" + sreq.shards + "):"
+ String newMsg = "shard update error (" + sreq.cmdRequest.shardInfos + "):"
+ e.getMessage();
if (e instanceof SolrException) {
SolrException se = (SolrException) e;
@@ -349,13 +372,13 @@ public class SolrCmdDistributor {
rsp.setException(e);
}
- if (forwarding) {
+ if (cmdRequest.forwarding) {
// this should be fine because forwarding requests are only to one shard
failedFowardingRequest = sreq;
}
SolrException.logOnce(SolrCore.log, "shard update error " + sreq.url + " ("
- + sreq.shards + ")", sreq.exception);
+ + sreq.cmdRequest.shardInfos + ")", sreq.exception);
}
} catch (ExecutionException e) {
@@ -379,10 +402,10 @@ public class SolrCmdDistributor {
// TODO: now we should tell those that failed to try and recover?
if (failed > 0 && nonConnectionErrors == 0) {
- if (failed == expectedResponses && forwarding) {
+ if (failed == expectedResponses && cmdRequest.forwarding) {
// this is a pure forwarding request (single url for the leader) and it fully failed -
// don't reset the exception - TODO: most likely there is now a new
- // leader - we really should retry the request...
+ // leader - first we should retry the request...
// TODO: we really need to clean this up and apis that allow it...
if (failedFowardingRequest != null) {
@@ -392,27 +415,20 @@ public class SolrCmdDistributor {
// TODO Auto-generated catch block
e1.printStackTrace();
}
- // we try again with the latest leader - nocommit: dont retry forever
- try {
- zkController.getZkStateReader().getLeaderUrl(collection, shardId);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (KeeperException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- // eventually this will not throw an exception to get...
+
+ failedFowardingRequest.cmdRequest.updateUrlsForRetry();
+ failedFowardingRequest.retries++;
+
+ if (failedFowardingRequest.retries < 10) {
+ rsp.setException(null);
+ submit(failedFowardingRequest);
+ checkResponses(block, cmdRequest);
}
- submit(failedFowardingRequest);
- checkResponses(block, forwarding, shardId);
}
} else {
// System.out.println("clear exception");
rsp.setException(null);
}
- } else {
- // make sure exception is cleared in the retry case from above
- rsp.setException(null);
}
}
}
Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sat Dec 17 20:16:34 2011
@@ -31,7 +31,6 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.HashPartitioner;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -48,6 +47,9 @@ import org.apache.solr.update.AddUpdateC
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.CmdRequest;
+import org.apache.solr.update.SolrCmdDistributor.RetryUrl;
+import org.apache.solr.update.SolrCmdDistributor.ShardInfo;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
@@ -79,7 +81,7 @@ public class DistributedUpdateProcessor
private final SchemaField idField;
private final SolrCmdDistributor cmdDistrib;
- private HashPartitioner hp;
+
private boolean zkEnabled = false;
private String collection;
@@ -89,7 +91,7 @@ public class DistributedUpdateProcessor
// method in this update processor
private boolean isLeader = true;
private boolean forwardToLeader = false;
- private List<String> shards;
+ private List<String> urls;
private String shardId;
@@ -126,13 +128,13 @@ public class DistributedUpdateProcessor
collection = cloudDesc.getCollectionName();
}
- cmdDistrib = new SolrCmdDistributor(zkController, collection, rsp); // TODO: we put the last result (which could be complicated due to
+ cmdDistrib = new SolrCmdDistributor(rsp); // TODO: we put the last result (which could be complicated due to
// multiple docs per req) in the rsp - this is whack
}
private List<String> setupRequest(int hash) {
- List<String> shards = null;
+ List<String> urls = null;
// if we are in zk mode...
if (zkEnabled) {
@@ -155,15 +157,15 @@ public class DistributedUpdateProcessor
isLeader = nodeName.equals(leaderNodeName);
if (req.getParams().getBool(SEEN_LEADER, false)) {
- // we are coming from the leader, just go local - set no shardStr
+ // we are coming from the leader, just go local - set no urlstr
} else if (isLeader) {
// that means I want to forward onto my replicas...
// so get the replicas...
- shards = getReplicaUrls(req, collection, shardId, nodeName);
+ urls = getReplicaUrls(req, collection, shardId, nodeName);
} else {
// I need to forward onto the leader...
- shards = new ArrayList<String>(1);
- shards.add(leaderUrl);
+ urls = new ArrayList<String>(1);
+ urls.add(leaderUrl);
forwardToLeader = true;
}
@@ -177,7 +179,7 @@ public class DistributedUpdateProcessor
}
}
- return shards;
+ return urls;
}
private String getShard(int hash, String collection, CloudState cloudState) {
@@ -194,7 +196,7 @@ public class DistributedUpdateProcessor
int hash = 0;
if (zkEnabled) {
hash = hash(cmd);
- shards = setupRequest(hash);
+ urls = setupRequest(hash);
} else {
// even in non zk mode, tests simulate updates from a leader
isLeader = !req.getParams().getBool(SEEN_LEADER, false);
@@ -211,12 +213,12 @@ public class DistributedUpdateProcessor
}
ModifiableSolrParams params = null;
- if (shards != null) {
+ if (urls != null) {
params = new ModifiableSolrParams(req.getParams());
if (isLeader) {
params.set(SEEN_LEADER, true);
}
- cmdDistrib.distribAdd(cmd, shards, params, forwardToLeader, shardId);
+ cmdDistrib.distribAdd(cmd, new CmdRequest(getShardInfos(urls), params, forwardToLeader));
} else {
// nocommit: At a minimum, local updates must be protected by synchronization
// right now we count on versionAdd to do the local add
@@ -234,8 +236,10 @@ public class DistributedUpdateProcessor
addsResponse.add(scratch.toString(), cmd.getVersion());
}
- if (shards != null) {
- cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+ if (urls != null) {
+ CmdRequest cmdRequest = new CmdRequest(getShardInfos(urls), params);
+ cmdRequest.forwarding = forwardToLeader;
+ cmdDistrib.finish(cmdRequest);
}
// TODO: keep track of errors? needs to be done at a higher level though since
@@ -244,6 +248,37 @@ public class DistributedUpdateProcessor
// processor too.
}
+ private List<ShardInfo> getShardInfos(List<String> urls) {
+ List<ShardInfo> shardInfos = new ArrayList<ShardInfo>(urls.size());
+ for (String url : urls) {
+ ShardInfo shardInfo = new ShardInfo();
+ shardInfo.url = url;
+ shardInfo.retryUrl = new RetryUrl() {
+
+ @Override
+ public String getRetryUrl() {
+ // TODO: if we are now the leader, we forward through http...
+ ZkNodeProps leaderProps = null;
+ try {
+ leaderProps = zkController.getZkStateReader().getLeaderProps(
+ collection, shardId);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ String leaderUrl = leaderProps.get(ZkStateReader.URL_PROP);
+ return leaderUrl;
+ }
+ };
+ shardInfos.add(shardInfo);
+ }
+ return shardInfos;
+ }
+
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
super.processAdd(cmd);
@@ -369,7 +404,7 @@ public class DistributedUpdateProcessor
int hash = 0;
if (zkEnabled) {
hash = hash(cmd);
- shards = setupRequest(hash);
+ urls = setupRequest(hash);
} else {
// even in non zk mode, tests simulate updates from a leader
isLeader = !req.getParams().getBool(SEEN_LEADER, false);
@@ -384,14 +419,15 @@ public class DistributedUpdateProcessor
// TODO: do we need to add anything to the response?
return;
}
-
+ CmdRequest cmdRequest = null;
ModifiableSolrParams params = null;
- if (shards != null) {
+ if (urls != null) {
params = new ModifiableSolrParams(req.getParams());
if (isLeader) {
params.set(SEEN_LEADER, true);
}
- cmdDistrib.distribDelete(cmd, shards, params, forwardToLeader, shardId);
+ cmdRequest = new CmdRequest(getShardInfos(urls), params, forwardToLeader);
+ cmdDistrib.distribDelete(cmd, cmdRequest);
} else {
// super.processDelete(cmd);
}
@@ -407,8 +443,8 @@ public class DistributedUpdateProcessor
deleteResponse.add(scratch.toString(), cmd.getVersion()); // we're returning the version of the delete.. not the version of the doc we deleted.
}
- if (shards != null) {
- cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+ if (urls != null) {
+ cmdDistrib.finish(cmdRequest);
}
}
@@ -571,12 +607,13 @@ public class DistributedUpdateProcessor
String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName();
String shardZkNodeName = nodeName + "_" + req.getCore().getName();
- shards = getReplicaUrls(req, req.getCore().getCoreDescriptor()
+ urls = getReplicaUrls(req, req.getCore().getCoreDescriptor()
.getCloudDescriptor().getCollectionName(), shardZkNodeName);
- if (shards != null) {
- cmdDistrib.distribCommit(cmd, shards, params);
- cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+ if (urls != null) {
+ CmdRequest cmdRequest = new CmdRequest(getShardInfos(urls), params);
+ cmdDistrib.distribCommit(cmd, cmdRequest);
+ cmdDistrib.finish(cmdRequest);
}
}
}
@@ -584,7 +621,7 @@ public class DistributedUpdateProcessor
@Override
public void finish() throws IOException {
- if (next != null && shards == null) next.finish();
+ if (next != null && urls == null) next.finish();
}
private List<String> getReplicaUrls(SolrQueryRequest req, String collection,
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java Sat Dec 17 20:16:34 2011
@@ -48,6 +48,10 @@ public class ChaosMonkeyDistributedZkTes
StopableIndexingThread indexThread = new StopableIndexingThread(0);
indexThread.start();
+ StopableIndexingThread indexThread2 = new StopableIndexingThread(0);
+ indexThread2.start();
+ StopableIndexingThread indexThread3 = new StopableIndexingThread(0);
+ indexThread3.start();
chaosMonkey.startTheMonkey();
@@ -56,6 +60,8 @@ public class ChaosMonkeyDistributedZkTes
chaosMonkey.stopTheMonkey();
indexThread.safeStop();
+ indexThread2.safeStop();
+ indexThread3.safeStop();
// try and wait for any replications and what not to finish...
// TODO: I suppose we should poll zk here about state
@@ -63,7 +69,7 @@ public class ChaosMonkeyDistributedZkTes
commit();
- // does not pass yet
+ // does not always pass yet
checkShardConsistency();
System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sat Dec 17 20:16:34 2011
@@ -912,6 +912,7 @@ public class FullDistributedZkTest exten
return rsp;
}
+ // TODO: also do some deletes
class StopableIndexingThread extends Thread {
private volatile boolean stop = false;
private int startI;
Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java Sat Dec 17 20:16:34 2011
@@ -44,6 +44,16 @@ public class ZkNodeProps implements JSON
propMap = new HashMap<String,String>();
}
+ public ZkNodeProps(String... keyVals) {
+ if (keyVals.length % 2 != 0) {
+ throw new IllegalArgumentException("arguments should be key,value");
+ }
+ propMap = new HashMap<String,String>();
+ for (int i = 0; i < keyVals.length; i+=2) {
+ propMap.put(keyVals[i], keyVals[i+1]);
+ }
+ }
+
public Set<String> keySet() {
return Collections.unmodifiableSet(propMap.keySet());
}