You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/17 21:16:35 UTC

svn commit: r1215566 - in /lucene/dev/branches/solrcloud/solr: core/src/java/org/apache/solr/update/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Sat Dec 17 20:16:34 2011
New Revision: 1215566

URL: http://svn.apache.org/viewvc?rev=1215566&view=rev
Log:
slightly de-uglify some stuff - some of it needs a fresh design still

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
    lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sat Dec 17 20:16:34 2011
@@ -37,14 +37,12 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequestExt;
-import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.zookeeper.KeeperException;
 
 // TODO: we are not really using the buffering anymore due to DistribUpdateProc...
 // we might want to bring back a form of slots...
@@ -76,45 +74,70 @@ public class SolrCmdDistributor {
   private List<AddUpdateCommand> alist;
   private ArrayList<DeleteUpdateCommand> dlist;
 
-  private ZkController zkController;
-
-  private String collection;
-
+  public static class CmdRequest {
+    public List<ShardInfo> shardInfos;
+    public ModifiableSolrParams params;
+    // we have to retry updates that are just being forwarded to the leader
+    public boolean forwarding;
+    
+    public CmdRequest(List<ShardInfo> shardInfos, ModifiableSolrParams params) {
+      this.shardInfos = shardInfos;
+      this.params = params;
+    }
+    
+    public CmdRequest(List<ShardInfo> shardInfos, ModifiableSolrParams params, boolean forwarding) {
+      this(shardInfos, params);
+      this.forwarding = forwarding;
+    }
+    
+    public void updateUrlsForRetry() {
+      for (ShardInfo shardInfo : shardInfos) {
+        if (shardInfo.retryUrl != null) {
+          shardInfo.url = shardInfo.retryUrl.getRetryUrl();
+        }
+      }
+    }
+  }
   
-  public SolrCmdDistributor(ZkController zkController, String collection, SolrQueryResponse rsp) {
-    //this.req = req;
+  public static class ShardInfo {
+    public String url;
+    public RetryUrl retryUrl;
+  }
+  
+  public static interface RetryUrl {
+    String getRetryUrl();
+  }
+  
+  public SolrCmdDistributor(SolrQueryResponse rsp) {
     this.rsp = rsp;
-    // TODO: kind of a hack to get the latest leader
-    this.zkController = zkController;
-    this.collection = collection;
   }
   
-  public void finish(List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) {
+  public void finish(CmdRequest cmdRequest) {
 
     // piggyback on any outstanding adds or deletes if possible.
-    flushAdds(1, null, shards, params, forwarding, shardId);
-    flushDeletes(1, null, shards, params, forwarding, shardId);
+    flushAdds(1, null, cmdRequest);
+    flushDeletes(1, null, cmdRequest);
 
-    checkResponses(true, forwarding, shardId);
+    checkResponses(true, cmdRequest);
   }
   
-  public void distribDelete(DeleteUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
-    checkResponses(false, forwarding, shardId);
+  public void distribDelete(DeleteUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
+    checkResponses(false, cmdRequest);
     
     if (cmd.isDeleteById()) {
-      doDelete(cmd, shards, params, forwarding, shardId);
+      doDelete(cmd, cmdRequest);
     } else {
       // TODO: query must be broadcast to all ??
-      doDelete(cmd, shards, params, forwarding, shardId);
+      doDelete(cmd, cmdRequest);
     }
   }
   
-  public void distribAdd(AddUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
+  public void distribAdd(AddUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
     
-    checkResponses(false, forwarding, shardId);
+    checkResponses(false, cmdRequest);
     
     // make sure any pending deletes are flushed
-    flushDeletes(1, null, shards, params, forwarding, shardId);
+    flushDeletes(1, null, cmdRequest);
     
     // TODO: this is brittle
     // need to make a clone since these commands may be reused
@@ -135,48 +158,48 @@ public class SolrCmdDistributor {
     }
     alist.add(clone);
     
-    flushAdds(maxBufferedAddsPerServer, null, shards, params, forwarding, shardId);
+    flushAdds(maxBufferedAddsPerServer, null, cmdRequest);
   }
   
-  public void distribCommit(CommitUpdateCommand cmd, List<String> shards, ModifiableSolrParams params)
+  public void distribCommit(CommitUpdateCommand cmd, CmdRequest cmdRequest)
       throws IOException {
     
     // Wait for all outstanding repsonses to make sure that a commit
     // can't sneak in ahead of adds or deletes we already sent.
     // We could do this on a per-server basis, but it's more complex
     // and this solution will lead to commits happening closer together.
-    checkResponses(true, false, null);
+    checkResponses(true, cmdRequest);
     
     // piggyback on any outstanding adds or deletes if possible.
     // TODO: review this
-    flushAdds(1, cmd, shards, params, false, null);
+    flushAdds(1, cmd, cmdRequest);
     
-    flushDeletes(1, cmd, shards, params, false, null);
+    flushDeletes(1, cmd, cmdRequest);
     
     UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
+    ureq.setParams(cmdRequest.params);
 
     addCommit(ureq, cmd);
-    submit(ureq, shards);
+    submit(ureq, cmdRequest);
     
     // if the command wanted to block until everything was committed,
     // then do that here.
     // nocommit
     if (/* cmd.waitFlush || */cmd.waitSearcher) {
-      checkResponses(true, false, null);
+      checkResponses(true, cmdRequest);
     }
   }
   
-  private void doDelete(DeleteUpdateCommand cmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) throws IOException {
+  private void doDelete(DeleteUpdateCommand cmd, CmdRequest cmdRequest) throws IOException {
     
-    flushAdds(1, null, shards, params, forwarding, shardId);
+    flushAdds(1, null, cmdRequest);
     
     if (dlist == null) {
       dlist = new ArrayList<DeleteUpdateCommand>(2);
     }
     dlist.add(clone(cmd));
     
-    flushDeletes(maxBufferedDeletesPerServer, null, shards, params, forwarding, shardId);
+    flushDeletes(maxBufferedDeletesPerServer, null, cmdRequest);
   }
   
   void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
@@ -186,12 +209,12 @@ public class SolrCmdDistributor {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
   }
   
-  boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params, boolean forwarding, String shardId) {
+  boolean flushAdds(int limit, CommitUpdateCommand ccmd, CmdRequest cmdRequest) {
     // check for pending deletes
     if (alist == null || alist.size() < limit) return false;
     
     UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
+    ureq.setParams(cmdRequest.params);
     
     addCommit(ureq, ccmd);
     
@@ -200,16 +223,16 @@ public class SolrCmdDistributor {
     }
     
     alist = null;
-    submit(ureq, urls);
+    submit(ureq, cmdRequest);
     return true;
   }
   
-  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> shards, ModifiableSolrParams params, boolean forwarding, String shardId) {
+  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, CmdRequest cmdRequest) {
     // check for pending deletes
     if (dlist == null || dlist.size() < limit) return false;
     
     UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
+    ureq.setParams(cmdRequest.params);
 
     addCommit(ureq, ccmd);
     
@@ -222,7 +245,7 @@ public class SolrCmdDistributor {
     }
     
     dlist = null;
-    submit(ureq, shards);
+    submit(ureq, cmdRequest);
     return true;
   }
   
@@ -236,7 +259,7 @@ public class SolrCmdDistributor {
   
   static class Request {
     // TODO: we may need to look at deep cloning this?
-    List<String> shards;
+    CmdRequest cmdRequest;
     UpdateRequestExt ureq;
     NamedList<Object> ursp;
     int rspCode;
@@ -245,9 +268,9 @@ public class SolrCmdDistributor {
     int retries = 0;
   }
   
-  void submit(UpdateRequestExt ureq, List<String> shards) {
+  void submit(UpdateRequestExt ureq, CmdRequest cmdRequest) {
     Request sreq = new Request();
-    sreq.shards = shards;
+    sreq.cmdRequest = cmdRequest;
     sreq.ureq = ureq;
     submit(sreq);
   }
@@ -258,7 +281,7 @@ public class SolrCmdDistributor {
       pending = new HashSet<Future<Request>>();
     }
 
-    for (final String shard : sreq.shards) {
+    for (final ShardInfo shardInfo : sreq.cmdRequest.shardInfos) {
       // TODO: when we break up shards, we might forward
       // to self again - makes things simple here, but we could
       // also have realized this before, done the req locally, and
@@ -268,16 +291,16 @@ public class SolrCmdDistributor {
         @Override
         public Request call() throws Exception {
           Request clonedRequest = new Request();
-          clonedRequest.shards = sreq.shards;
+          clonedRequest.cmdRequest = sreq.cmdRequest;
           clonedRequest.ureq = sreq.ureq;
-          clonedRequest.url = shard;
+          clonedRequest.url = shardInfo.url;
           try {
             // TODO: what about https?
             String url;
-            if (!shard.startsWith("http://")) {
-              url = "http://" + shard;
+            if (!shardInfo.url.startsWith("http://")) {
+              url = "http://" + shardInfo.url;
             } else {
-              url = shard;
+              url = shardInfo.url;
             }
 
             SolrServer server = new CommonsHttpSolrServer(url, client);
@@ -300,7 +323,7 @@ public class SolrCmdDistributor {
     }
   }
   
-  void checkResponses(boolean block, boolean forwarding, String shardId) {
+  void checkResponses(boolean block, CmdRequest cmdRequest) {
     
     int expectedResponses = pending == null ? 0 : pending.size();
     int nonConnectionErrors = 0;
@@ -336,7 +359,7 @@ public class SolrCmdDistributor {
             // TODO: perhaps we should do more?
             if (rsp.getException() == null) {
               
-              String newMsg = "shard update error (" + sreq.shards + "):"
+              String newMsg = "shard update error (" + sreq.cmdRequest.shardInfos + "):"
                   + e.getMessage();
               if (e instanceof SolrException) {
                 SolrException se = (SolrException) e;
@@ -349,13 +372,13 @@ public class SolrCmdDistributor {
               rsp.setException(e);
             }
             
-            if (forwarding) {
+            if (cmdRequest.forwarding) {
               // this shold be fine because forwarding requests are only to one shard
               failedFowardingRequest = sreq;
             }
             
             SolrException.logOnce(SolrCore.log, "shard update error " + sreq.url + " ("
-                + sreq.shards + ")", sreq.exception);
+                + sreq.cmdRequest.shardInfos + ")", sreq.exception);
           }
           
         } catch (ExecutionException e) {
@@ -379,10 +402,10 @@ public class SolrCmdDistributor {
     // TODO: now we should tell those that failed to try and recover?
 
     if (failed > 0 && nonConnectionErrors == 0) {
-      if (failed == expectedResponses && forwarding) {
+      if (failed == expectedResponses && cmdRequest.forwarding) {
         // this is a pure forwarding request (single url for the leader) and it fully failed -
         // don't reset the exception - TODO: most likely there is now a new
-        // leader - we really should retry the request...
+        // leader - first we should retry the request...
         
         // TODO: we really need to clean this up and apis that allow it...
         if (failedFowardingRequest != null) {
@@ -392,27 +415,20 @@ public class SolrCmdDistributor {
             // TODO Auto-generated catch block
             e1.printStackTrace();
           }
-          // we try again with the latest leader - nocommit: dont retry forever
-          try {
-            zkController.getZkStateReader().getLeaderUrl(collection, shardId);
-          } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-          } catch (KeeperException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            // eventually this will not throw an exception to get...
+
+          failedFowardingRequest.cmdRequest.updateUrlsForRetry();
+          failedFowardingRequest.retries++;
+
+          if (failedFowardingRequest.retries < 10) {
+            rsp.setException(null);
+            submit(failedFowardingRequest);
+            checkResponses(block, cmdRequest);
           }
-          submit(failedFowardingRequest);
-          checkResponses(block, forwarding, shardId);
         }
       } else {
         // System.out.println("clear exception");
         rsp.setException(null);
       }
-    } else {
-      // make sure exception is cleared in the retry case from above
-      rsp.setException(null);
     }
   }
 }

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sat Dec 17 20:16:34 2011
@@ -31,7 +31,6 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.CloudState;
-import org.apache.solr.common.cloud.HashPartitioner;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -48,6 +47,9 @@ import org.apache.solr.update.AddUpdateC
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.CmdRequest;
+import org.apache.solr.update.SolrCmdDistributor.RetryUrl;
+import org.apache.solr.update.SolrCmdDistributor.ShardInfo;
 import org.apache.solr.update.UpdateCommand;
 import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
@@ -79,7 +81,7 @@ public class DistributedUpdateProcessor 
   private final SchemaField idField;
   
   private final SolrCmdDistributor cmdDistrib;
-  private HashPartitioner hp;
+
   private boolean zkEnabled = false;
 
   private String collection;
@@ -89,7 +91,7 @@ public class DistributedUpdateProcessor 
   // method in this update processor
   private boolean isLeader = true;
   private boolean forwardToLeader = false;
-  private List<String> shards;
+  private List<String> urls;
   private String shardId;
 
   
@@ -126,13 +128,13 @@ public class DistributedUpdateProcessor 
       collection = cloudDesc.getCollectionName();
     }
     
-    cmdDistrib = new SolrCmdDistributor(zkController, collection, rsp); // TODO: we put the last result (which could be complicated due to 
+    cmdDistrib = new SolrCmdDistributor(rsp); // TODO: we put the last result (which could be complicated due to 
                                               // multiple docs per req) in the rsp - this is whack
   }
 
   private List<String> setupRequest(int hash) {
     
-    List<String> shards = null;
+    List<String> urls = null;
 
     // if we are in zk mode...
     if (zkEnabled) {
@@ -155,15 +157,15 @@ public class DistributedUpdateProcessor 
         isLeader = nodeName.equals(leaderNodeName);
         
         if (req.getParams().getBool(SEEN_LEADER, false)) {
-          // we are coming from the leader, just go local - set no shardStr
+          // we are coming from the leader, just go local - set no urlstr
         } else if (isLeader) {
           // that means I want to forward onto my replicas...
           // so get the replicas...
-          shards = getReplicaUrls(req, collection, shardId, nodeName);
+          urls = getReplicaUrls(req, collection, shardId, nodeName);
         } else {
           // I need to forward onto the leader...
-          shards = new ArrayList<String>(1);
-          shards.add(leaderUrl);
+          urls = new ArrayList<String>(1);
+          urls.add(leaderUrl);
           forwardToLeader = true;
         }
         
@@ -177,7 +179,7 @@ public class DistributedUpdateProcessor 
       }
     }
 
-    return shards;
+    return urls;
   }
   
   private String getShard(int hash, String collection, CloudState cloudState) {
@@ -194,7 +196,7 @@ public class DistributedUpdateProcessor 
     int hash = 0;
     if (zkEnabled) {
       hash = hash(cmd);
-      shards = setupRequest(hash);
+      urls = setupRequest(hash);
     } else {
       // even in non zk mode, tests simulate updates from a leader
       isLeader = !req.getParams().getBool(SEEN_LEADER, false);
@@ -211,12 +213,12 @@ public class DistributedUpdateProcessor 
     }
     
     ModifiableSolrParams params = null;
-    if (shards != null) {
+    if (urls != null) {
       params = new ModifiableSolrParams(req.getParams());
       if (isLeader) {
         params.set(SEEN_LEADER, true);
       }
-      cmdDistrib.distribAdd(cmd, shards, params, forwardToLeader, shardId);
+      cmdDistrib.distribAdd(cmd,  new CmdRequest(getShardInfos(urls), params, forwardToLeader));
     } else {
       // nocommit: At a minimum, local updates must be protected by synchronization
       // right now we count on versionAdd to do the local add
@@ -234,8 +236,10 @@ public class DistributedUpdateProcessor 
       addsResponse.add(scratch.toString(), cmd.getVersion());
     }
 
-    if (shards != null) {
-      cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+    if (urls != null) {
+      CmdRequest cmdRequest = new CmdRequest(getShardInfos(urls), params);
+      cmdRequest.forwarding = forwardToLeader;
+      cmdDistrib.finish(cmdRequest);
     }
     
     // TODO: keep track of errors?  needs to be done at a higher level though since
@@ -244,6 +248,37 @@ public class DistributedUpdateProcessor 
     // processor too.
   }
 
+  private List<ShardInfo> getShardInfos(List<String> urls) {
+    List<ShardInfo> shardInfos = new ArrayList<ShardInfo>(urls.size());
+    for (String url : urls) {
+      ShardInfo shardInfo = new ShardInfo();
+      shardInfo.url = url;
+      shardInfo.retryUrl = new RetryUrl() {
+        
+        @Override
+        public String getRetryUrl() {
+          // TODO: if we are now the leader, we forward through http...
+          ZkNodeProps leaderProps = null;
+          try {
+            leaderProps = zkController.getZkStateReader().getLeaderProps(
+                collection, shardId);
+          } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          } catch (KeeperException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+          }
+          
+          String leaderUrl = leaderProps.get(ZkStateReader.URL_PROP);
+          return leaderUrl;
+        }
+      };
+      shardInfos.add(shardInfo);
+    }
+    return shardInfos;
+  }
+
   // must be synchronized by bucket
   private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
     super.processAdd(cmd);
@@ -369,7 +404,7 @@ public class DistributedUpdateProcessor 
     int hash = 0;
     if (zkEnabled) {
       hash = hash(cmd);
-      shards = setupRequest(hash);
+      urls = setupRequest(hash);
     } else {
       // even in non zk mode, tests simulate updates from a leader
       isLeader = !req.getParams().getBool(SEEN_LEADER, false);
@@ -384,14 +419,15 @@ public class DistributedUpdateProcessor 
       // TODO: do we need to add anything to the response?
       return;
     }
-
+    CmdRequest cmdRequest = null;
     ModifiableSolrParams params = null;
-    if (shards != null) {
+    if (urls != null) {
       params = new ModifiableSolrParams(req.getParams());
       if (isLeader) {
         params.set(SEEN_LEADER, true);
       }
-      cmdDistrib.distribDelete(cmd, shards, params, forwardToLeader, shardId);
+      cmdRequest = new CmdRequest(getShardInfos(urls), params, forwardToLeader);
+      cmdDistrib.distribDelete(cmd, cmdRequest);
     } else {
       // super.processDelete(cmd);
     }
@@ -407,8 +443,8 @@ public class DistributedUpdateProcessor 
       deleteResponse.add(scratch.toString(), cmd.getVersion());  // we're returning the version of the delete.. not the version of the doc we deleted.
     }
     
-    if (shards != null) {
-      cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+    if (urls != null) {
+      cmdDistrib.finish(cmdRequest);
     }
   }
 
@@ -571,12 +607,13 @@ public class DistributedUpdateProcessor 
         String nodeName = req.getCore().getCoreDescriptor().getCoreContainer()
             .getZkController().getNodeName();
         String shardZkNodeName = nodeName + "_" + req.getCore().getName();
-        shards = getReplicaUrls(req, req.getCore().getCoreDescriptor()
+        urls = getReplicaUrls(req, req.getCore().getCoreDescriptor()
             .getCloudDescriptor().getCollectionName(), shardZkNodeName);
 
-        if (shards != null) {
-          cmdDistrib.distribCommit(cmd, shards, params);
-          cmdDistrib.finish(shards, params, forwardToLeader, shardId);
+        if (urls != null) {
+          CmdRequest cmdRequest = new CmdRequest(getShardInfos(urls), params);
+          cmdDistrib.distribCommit(cmd, cmdRequest);
+          cmdDistrib.finish(cmdRequest);
         }
       }
     }
@@ -584,7 +621,7 @@ public class DistributedUpdateProcessor 
   
   @Override
   public void finish() throws IOException {
-    if (next != null && shards == null) next.finish();
+    if (next != null && urls == null) next.finish();
   }
   
   private List<String> getReplicaUrls(SolrQueryRequest req, String collection,

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyDistributedZkTest.java Sat Dec 17 20:16:34 2011
@@ -48,6 +48,10 @@ public class ChaosMonkeyDistributedZkTes
     
     StopableIndexingThread indexThread = new StopableIndexingThread(0);
     indexThread.start();
+    StopableIndexingThread indexThread2 = new StopableIndexingThread(0);
+    indexThread2.start();
+    StopableIndexingThread indexThread3 = new StopableIndexingThread(0);
+    indexThread3.start();
     
     chaosMonkey.startTheMonkey();
     
@@ -56,6 +60,8 @@ public class ChaosMonkeyDistributedZkTes
     chaosMonkey.stopTheMonkey();
     
     indexThread.safeStop();
+    indexThread2.safeStop();
+    indexThread3.safeStop();
     
     // try and wait for any replications and what not to finish...
     // TODO: I suppose we should poll zk here about state
@@ -63,7 +69,7 @@ public class ChaosMonkeyDistributedZkTes
     
     commit();
     
-    // does not pass yet
+    // does not always pass yet
     checkShardConsistency();
     
     System.out.println("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/cloud/FullDistributedZkTest.java Sat Dec 17 20:16:34 2011
@@ -912,6 +912,7 @@ public class FullDistributedZkTest exten
     return rsp;
   }
   
+  // TODO: also do some deletes
   class StopableIndexingThread extends Thread {
     private volatile boolean stop = false;
     private int startI;

Modified: lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java?rev=1215566&r1=1215565&r2=1215566&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java (original)
+++ lucene/dev/branches/solrcloud/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java Sat Dec 17 20:16:34 2011
@@ -44,6 +44,16 @@ public class ZkNodeProps implements JSON
     propMap = new HashMap<String,String>();
   }
   
+  public ZkNodeProps(String... keyVals) {
+    if (keyVals.length % 2 != 0) {
+      throw new IllegalArgumentException("arguments should be key,value");
+    }
+    propMap = new HashMap<String,String>();
+    for (int i = 0; i < keyVals.length; i+=2) {
+      propMap.put(keyVals[i], keyVals[i+1]);
+    }
+  }
+  
   public Set<String> keySet() {
     return Collections.unmodifiableSet(propMap.keySet());
   }