Posted to commits@lucene.apache.org by ma...@apache.org on 2011/12/31 01:36:54 UTC

svn commit: r1225998 - in /lucene/dev/branches/solrcloud/solr/core/src: java/org/apache/solr/update/SolrCmdDistributor.java java/org/apache/solr/update/processor/DistributedUpdateProcessor.java test/org/apache/solr/update/SolrCmdDistributorTest.java

Author: markrmiller
Date: Sat Dec 31 00:36:54 2011
New Revision: 1225998

URL: http://svn.apache.org/viewvc?rev=1225998&view=rev
Log:
add buffering to the SolrCmdDistributor - also move retries back into SolrCmdDistributor
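
For context (not part of this commit), a minimal usage sketch of the buffered SolrCmdDistributor, following the pattern in the updated SolrCmdDistributorTest below; the server address is a placeholder:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Response;
import org.apache.solr.update.SolrCmdDistributor.StdUrl;
import org.apache.solr.update.SolrCmdDistributor.Url;

public class CmdDistribUsageSketch {
  public void addAndCommit(SolrInputDocument doc) throws IOException {
    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
    ModifiableSolrParams params = new ModifiableSolrParams();

    // placeholder address - in DistributedUpdateProcessor these come from
    // the cloud state via getReplicaUrls
    List<Url> urls = new ArrayList<Url>();
    urls.add(new StdUrl("localhost:8983/solr"));

    AddUpdateCommand cmd = new AddUpdateCommand(null);
    cmd.solrDoc = doc;
    // adds are now buffered per target url until maxBufferedAddsPerServer is hit
    cmdDistrib.distribAdd(cmd, urls, params);

    CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
    cmdDistrib.distribCommit(ccmd, urls, params);

    // flush anything still buffered and block for the responses
    cmdDistrib.finish(urls);
    Response response = cmdDistrib.getResponse();
    if (response.errors.size() > 0) {
      throw new IOException("shard update error: " + response.errors.get(0).e);
    }
  }
}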

Modified:
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sat Dec 31 00:36:54 2011
@@ -19,8 +19,10 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -41,8 +43,8 @@ import org.apache.solr.common.params.Mod
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
 
-// TODO: we are not really using the buffering anymore due to DistribUpdateProc...
-// we might want to bring back a form of slots...
+
+
 public class SolrCmdDistributor {
   // TODO: shut this thing down
   static ThreadPoolExecutor commExecutor = new ThreadPoolExecutor(0,
@@ -60,43 +62,50 @@ public class SolrCmdDistributor {
   CompletionService<Request> completionService;
   Set<Future<Request>> pending;
   
-  //private final SolrQueryRequest req;
-  //private final SolrQueryResponse rsp;
-
-  //private final SchemaField idField;
-  
   int maxBufferedAddsPerServer = 10;
   int maxBufferedDeletesPerServer = 100;
+
+  private Response response = new Response();
   
-  private List<AddUpdateCommand> alist;
-  private ArrayList<DeleteUpdateCommand> dlist;
+  private final Map<String,List<AddRequest>> adds = new HashMap<String,List<AddRequest>>();
+  private final Map<String,List<DeleteRequest>> deletes = new HashMap<String,List<DeleteRequest>>();
   
-  public Response finish(List<String> urls, ModifiableSolrParams params) {
+  class AddRequest {
+    AddUpdateCommand cmd;
+    ModifiableSolrParams params;
+  }
+  
+  class DeleteRequest {
+    DeleteUpdateCommand cmd;
+    ModifiableSolrParams params;
+  }
+  
+  public void finish(List<Url> urls) {
 
     // piggyback on any outstanding adds or deletes if possible.
-    flushAdds(1, null, urls, params);
-    flushDeletes(1, null, urls, params);
+    flushAdds(1, null, null, urls);
+    flushDeletes(1, null, null, urls);
 
-    return checkResponses(true, urls);
+    checkResponses(true);
   }
   
-  public void distribDelete(DeleteUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
-    checkResponses(false, urls);
+  public void distribDelete(DeleteUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params) throws IOException {
+    checkResponses(false);
     
     if (cmd.isDeleteById()) {
       doDelete(cmd, urls, params);
     } else {
-      // TODO: query must be broadcast to all ??
+      // nocommit TODO: query must be broadcast to all ??
       doDelete(cmd, urls, params);
     }
   }
   
-  public void distribAdd(AddUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
+  public void distribAdd(AddUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params) throws IOException {
     
-    checkResponses(false, urls);
+    checkResponses(false);
     
     // make sure any pending deletes are flushed
-    flushDeletes(1, null, urls, params);
+    flushDeletes(1, null, null, urls);
     
     // TODO: this is brittle
     // need to make a clone since these commands may be reused
@@ -106,105 +115,157 @@ public class SolrCmdDistributor {
     clone.commitWithin = cmd.commitWithin;
     clone.overwrite = cmd.overwrite;
     clone.setVersion(cmd.getVersion());
-    
+    AddRequest addRequest = new AddRequest();
+    addRequest.cmd = clone;
+    addRequest.params = params;
     // nocommit: review as far as SOLR-2685
     // clone.indexedId = cmd.indexedId;
     // clone.doc = cmd.doc;
-    
-
-    if (alist == null) {
-      alist = new ArrayList<AddUpdateCommand>(2);
+    for (Url url : urls) {
+      List<AddRequest> alist = adds.get(url.getUrl());
+      if (alist == null) {
+        alist = new ArrayList<AddRequest>(2);
+        adds.put(url.getUrl(), alist);
+      }
+      alist.add(addRequest);
+     // System.out.println("buffer add to " + url);
     }
-    alist.add(clone);
     
-    flushAdds(maxBufferedAddsPerServer, null, urls, params);
+    flushAdds(maxBufferedAddsPerServer, null, null, urls);
   }
   
-  public void distribCommit(CommitUpdateCommand cmd, List<String> urls, ModifiableSolrParams params)
+  public void distribCommit(CommitUpdateCommand cmd, List<Url> urls, ModifiableSolrParams params)
       throws IOException {
     
     // Wait for all outstanding repsonses to make sure that a commit
     // can't sneak in ahead of adds or deletes we already sent.
     // We could do this on a per-server basis, but it's more complex
     // and this solution will lead to commits happening closer together.
-    checkResponses(true, urls);
-    
+    checkResponses(true);
+
     // piggyback on any outstanding adds or deletes if possible.
-    // TODO: review this
-    flushAdds(1, cmd, urls, params);
-    
-    flushDeletes(1, cmd, urls, params);
+    boolean piggied = false;
+    if (!flushAdds(1, cmd, params, urls)) {
+      if (flushDeletes(1, cmd, params, urls)) piggied = true;
+    } else {
+      piggied = true;
+    }
     
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
-
-    addCommit(ureq, cmd);
-    submit(ureq, urls);
+    if (!piggied) {
+      UpdateRequestExt ureq = new UpdateRequestExt();
+      ureq.setParams(params);
+      
+      addCommit(ureq, cmd);
+      
+      for (Url url : urls) {
+        submit(ureq, url);
+      }
+    }
     
     // if the command wanted to block until everything was committed,
     // then do that here.
     // nocommit
     if (/* cmd.waitFlush || */cmd.waitSearcher) {
-      checkResponses(true, urls);
+      checkResponses(true);
     }
   }
   
-  private void doDelete(DeleteUpdateCommand cmd, List<String> urls, ModifiableSolrParams params) throws IOException {
+  private void doDelete(DeleteUpdateCommand cmd, List<Url> urls,
+      ModifiableSolrParams params) throws IOException {
     
-    flushAdds(1, null, urls, params);
+    flushAdds(1, null, null, urls);
     
-    if (dlist == null) {
-      dlist = new ArrayList<DeleteUpdateCommand>(2);
+    DeleteUpdateCommand clonedCmd = clone(cmd);
+    DeleteRequest deleteRequest = new DeleteRequest();
+    deleteRequest.cmd = clonedCmd;
+    deleteRequest.params = params;
+    for (Url url : urls) {
+      List<DeleteRequest> dlist = deletes.get(url.getUrl());
+      
+      if (dlist == null) {
+        dlist = new ArrayList<DeleteRequest>(2);
+        deletes.put(url.getUrl(), dlist);
+      }
+      dlist.add(deleteRequest);
     }
-    dlist.add(clone(cmd));
     
-    flushDeletes(maxBufferedDeletesPerServer, null, urls, params);
+    flushDeletes(maxBufferedDeletesPerServer, null, null, urls);
   }
   
   void addCommit(UpdateRequestExt ureq, CommitUpdateCommand cmd) {
     if (cmd == null) return;
+    //System.out.println("piggy back commit on");
     // nocommit
     ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher);
   }
   
-  boolean flushAdds(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params) {
+  boolean flushAdds(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams params, List<Url> urls) {
     // check for pending deletes
-    if (alist == null || alist.size() < limit) return false;
-    
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
-    
-    addCommit(ureq, ccmd);
-    
-    for (AddUpdateCommand cmd : alist) {
-      ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+    UpdateRequestExt ureq = null;
+    for (Url url : urls) {
+      List<AddRequest> alist = adds.get(url.getUrl());
+      if (alist == null || alist.size() < limit) return false;
+      if (ureq == null) {
+        ureq = new UpdateRequestExt();
+        
+        addCommit(ureq, ccmd);
+        
+        ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+        
+        for (AddRequest aReq : alist) {
+          AddUpdateCommand cmd = aReq.cmd;
+          combinedParams.add(aReq.params);
+          ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+        }
+        
+        if (params != null) combinedParams.add(params);
+        if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+        ureq.getParams().add(combinedParams);
+        //System.out.println("ureq is:" + ureq);
+      }
+      
+      adds.remove(url.getUrl());
+      //System.out.println("flush adds to " + url + " " + alist.size());
+      //System.out.println("commit " + ureq.getAction());
+      
+      submit(ureq, url);
     }
-    
-    alist = null;
-    submit(ureq, urls);
     return true;
   }
   
-  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, List<String> urls, ModifiableSolrParams params) {
+  boolean flushDeletes(int limit, CommitUpdateCommand ccmd, ModifiableSolrParams params, List<Url> urls) {
     // check for pending deletes
-    if (dlist == null || dlist.size() < limit) return false;
-    
-    UpdateRequestExt ureq = new UpdateRequestExt();
-    ureq.setParams(params);
+    //System.out.println("flush deletes to " + urls);
+    UpdateRequestExt ureq = null;
+    for (Url url : urls) {
+      List<DeleteRequest> dlist = deletes.get(url.getUrl());
+      if (dlist == null || dlist.size() < limit) return false;
+      if (ureq == null) {
+        ureq = new UpdateRequestExt();
+ 
+        addCommit(ureq, ccmd);
+        
+        ModifiableSolrParams combinedParams = new ModifiableSolrParams();
+        
+        for (DeleteRequest dReq : dlist) {
+          DeleteUpdateCommand cmd = dReq.cmd;
+          combinedParams.add(dReq.params);
+          if (cmd.isDeleteById()) {
+            ureq.deleteById(cmd.getId(), cmd.getVersion());
+          } else {
+            ureq.deleteByQuery(cmd.query);
+          }
+        }
 
-    addCommit(ureq, ccmd);
-    
-    for (DeleteUpdateCommand cmd : dlist) {
-      if (cmd.isDeleteById()) {
-        ureq.deleteById(cmd.getId(), cmd.getVersion());
-      } else {
-        ureq.deleteByQuery(cmd.query);
+        if (params != null) combinedParams.add(params);
+        if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
+        ureq.getParams().add(combinedParams);
       }
+      
+      deletes.remove(url.getUrl());
+      submit(ureq, url);
     }
-    
-    dlist = null;
-    submit(ureq, urls);
     return true;
   }
   
@@ -218,99 +279,104 @@ public class SolrCmdDistributor {
   
   public static class Request {
     // TODO: we may need to look at deep cloning this?
-    public List<String> urls;
+    public Url url;
     UpdateRequestExt ureq;
     NamedList<Object> ursp;
     int rspCode;
     public Exception exception;
-    String url;
+    String errorUrl;
+    int retries;
   }
   
-  void submit(UpdateRequestExt ureq, List<String> urls) {
+  void submit(UpdateRequestExt ureq, Url url) {
     Request sreq = new Request();
-    sreq.urls = urls;
+    sreq.errorUrl = url.getUrl();
+    sreq.url = url;
     sreq.ureq = ureq;
     submit(sreq);
   }
   
- public void submit(final Request sreq) {
+  public void submit(final Request sreq) {
     if (completionService == null) {
       completionService = new ExecutorCompletionService<Request>(commExecutor);
       pending = new HashSet<Future<Request>>();
     }
-
-    for (final String url : sreq.urls) {
-      // TODO: when we break up shards, we might forward
-      // to self again - makes things simple here, but we could
-      // also have realized this before, done the req locally, and
-      // removed self from this list.
-      
-      Callable<Request> task = new Callable<Request>() {
-        @Override
-        public Request call() throws Exception {
-          Request clonedRequest = new Request();
-          clonedRequest.urls = sreq.urls;
-          clonedRequest.ureq = sreq.ureq;
+    final String url = sreq.url.getUrl();
+    //System.out.println("submit url:" + url);
+    Callable<Request> task = new Callable<Request>() {
+      @Override
+      public Request call() throws Exception {
+        Request clonedRequest = new Request();
+        clonedRequest.url = sreq.url;
+        clonedRequest.ureq = sreq.ureq;
+        
+        // TODO: yeah, this is a little odd...
+        clonedRequest.errorUrl = url;
+        
+        try {
+          String fullUrl;
+          if (!url.startsWith("http://") && !url.startsWith("https://")) {
+            fullUrl = "http://" + url;
+          } else {
+            fullUrl = url;
+          }
           
-          // TODO: yeah, this is a little odd...
-          clonedRequest.url = url;
-
+          CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl,
+              client);
           
-          try {
-            // TODO: what about https?
-            String fullUrl;
-            if (!url.startsWith("http://")) {
-              fullUrl = "http://" + url;
-            } else {
-              fullUrl = url;
-            }
-
-            CommonsHttpSolrServer server = new CommonsHttpSolrServer(fullUrl, client);
-   
-            //server.setSoTimeout(1000);
-            //server.setConnectionTimeout(1000);
-            clonedRequest.ursp = server.request(clonedRequest.ureq);
-            
-            // currently no way to get the request body.
-          } catch (Exception e) {
-            clonedRequest.exception = e;
-            if (e instanceof SolrException) {
-              clonedRequest.rspCode = ((SolrException) e).code();
-            } else {
-              clonedRequest.rspCode = -1;
-            }
+          clonedRequest.ursp = server.request(clonedRequest.ureq);
+          
+          // currently no way to get the request body.
+        } catch (Exception e) {
+          clonedRequest.exception = e;
+          if (e instanceof SolrException) {
+            clonedRequest.rspCode = ((SolrException) e).code();
+          } else {
+            clonedRequest.rspCode = -1;
           }
-          return clonedRequest;
         }
-      };
-      
-      pending.add(completionService.submit(task));
-    }
+        return clonedRequest;
+      }
+    };
+    
+    pending.add(completionService.submit(task));
+    
   }
   
   // TODO: need to consider those that call this with block=false now...
-  Response checkResponses(boolean block, List<String> urls) {
-    Response response = new Response();
+  void checkResponses(boolean block) {
 
     while (pending != null && pending.size() > 0) {
       try {
         Future<Request> future = block ? completionService.take()
             : completionService.poll();
-        if (future == null) return null;
+        if (future == null) return;
         pending.remove(future);
         
         try {
           Request sreq = future.get();
           if (sreq.rspCode != 0) {
             // error during request
-            Exception e = sreq.exception;
-            Error error = new Error();
-            error.e = e;
-            error.url = sreq.url;
-            response.errors.add(error);
-            response.sreq = sreq;
-            SolrException.logOnce(SolrCore.log, "shard update error " + sreq.url + " ("
-                + sreq.urls + ")", sreq.exception);
+            
+            // if there is a retry url, we want to retry...
+            // TODO: but we really should only retry on connection errors...
+            if (sreq.retries < 5 && sreq.url.checkRetry()) {
+              sreq.retries++;
+              sreq.rspCode = 0;
+              sreq.exception = null;
+              Thread.sleep(1000);
+              submit(sreq);
+              checkResponses(block);
+            } else {
+              Exception e = sreq.exception;
+              Error error = new Error();
+              error.e = e;
+              error.url = sreq.errorUrl;
+              response.errors.add(error);
+              response.sreq = sreq;
+              SolrException.logOnce(SolrCore.log, "shard update error "
+                  + sreq.url + " (" + sreq.url + ")", sreq.exception);
+            }
           }
           
         } catch (ExecutionException e) {
@@ -324,9 +390,6 @@ public class SolrCmdDistributor {
             "interrupted waiting for shard update response", e);
       }
     }
-
-    return response;
-    
   }
   
   public static class Response {
@@ -338,4 +401,36 @@ public class SolrCmdDistributor {
     public String url;
     public Exception e;
   }
+
+  public Response getResponse() {
+    return response;
+  }
+  
+  public static abstract class Url {
+    public abstract String getUrl();
+    public abstract boolean checkRetry();
+  }
+
+  public static class StdUrl extends Url {
+    String url;
+    
+    public StdUrl(String url) {
+      this.url = url;
+    }
+    
+    @Override
+    public String getUrl() {
+      return url;
+    }
+    
+    @Override
+    public String toString() {
+      return url;
+    }
+
+    @Override
+    public boolean checkRetry() {
+      return false;
+    }
+  }
 }
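
Also for context, and not part of this commit: checkResponses() now retries a failed request up to 5 times, sleeping a second between attempts, whenever the request's Url says a retry makes sense. Any Url subclass can opt in by returning true from checkRetry(); a hypothetical sketch (the fallback-address idea below is purely illustrative):

import org.apache.solr.update.SolrCmdDistributor.Url;

// Hypothetical example, not in the commit: retry failed requests against a
// fixed fallback address (up to the distributor's 5-attempt cap) instead of
// reporting the error immediately.
public class FailoverUrl extends Url {
  private String url;
  private final String fallbackUrl;

  public FailoverUrl(String url, String fallbackUrl) {
    this.url = url;
    this.fallbackUrl = fallbackUrl;
  }

  @Override
  public String getUrl() {
    return url;
  }

  @Override
  public boolean checkRetry() {
    // called by SolrCmdDistributor.checkResponses() after a failure;
    // swapping the url and returning true makes the distributor resubmit
    // the same request to the fallback
    url = fallbackUrl;
    return true;
  }

  @Override
  public String toString() {
    return url;
  }
}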

Modified: lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sat Dec 31 00:36:54 2011
@@ -20,7 +20,6 @@ package org.apache.solr.update.processor
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,9 +38,10 @@ import org.apache.solr.common.cloud.Clou
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreDescriptor;
@@ -54,12 +54,13 @@ import org.apache.solr.update.CommitUpda
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
 import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.Url;
+import org.apache.solr.update.SolrCmdDistributor.StdUrl;
 import org.apache.solr.update.UpdateCommand;
 import org.apache.solr.update.UpdateHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.VersionBucket;
 import org.apache.solr.update.VersionInfo;
-import org.apache.zookeeper.KeeperException;
 
 // NOT mt-safe... create a new processor for each add thread
 // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
@@ -96,7 +97,7 @@ public class DistributedUpdateProcessor 
   // method in this update processor
   private boolean isLeader = true;
   private boolean forwardToLeader = false;
-  private List<String> urls;
+  private List<Url> urls;
   private String shardId;
 
   
@@ -136,9 +137,9 @@ public class DistributedUpdateProcessor 
     cmdDistrib = new SolrCmdDistributor();
   }
 
-  private List<String> setupRequest(int hash) {
+  private List<Url> setupRequest(int hash) {
     
-    List<String> urls = null;
+    List<Url> urls = null;
 
     // if we are in zk mode...
     if (zkEnabled) {
@@ -174,8 +175,8 @@ public class DistributedUpdateProcessor 
           urls = getReplicaUrls(req, collection, shardId, nodeName);
         } else {
           // I need to forward onto the leader...
-          urls = new ArrayList<String>(1);
-          urls.add(leaderUrl);
+          urls = new ArrayList<Url>(1);
+          urls.add(new RetryUrl(leaderUrl, zkController.getZkStateReader(), collection, shardId));
           forwardToLeader = true;
         }
         
@@ -243,10 +244,6 @@ public class DistributedUpdateProcessor 
       idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
       addsResponse.add(scratch.toString(), cmd.getVersion());
     }
-
-    if (urls != null) {
-      finish(params);
-    }
     
     // TODO: keep track of errors?  needs to be done at a higher level though since
     // an id may fail before it gets to this processor.
@@ -256,75 +253,20 @@ public class DistributedUpdateProcessor 
  
   // TODO: optionally fail if n replicas are not reached...
   // nocommit: what the hell - doesnt seem to fail when cannot forward - need to check that...
-  private void finish(ModifiableSolrParams params) {
-    boolean retry = false;
-    int retries = 0;
-
+  private void doFinish() {
     // TODO: if not a forward and replication req is not specified, we could
     // send in a background thread
-    
-    Response response;
-    do {
-      retry = false;
-      // TODO: realtime timeout strat
-      int timeout = 200;
-      response = cmdDistrib.finish(urls, params);
-      // nocommit - we may need to tell about more than one error...
-      if (response.errors.size() > 0) { 
-        if (urls.size() == 1 && forwardToLeader) {
-          // we should retry a failed forward...
-          retry = true;
-
-          if (retries++ > 15) {
-            timeout = timeout * 2;
-            // nocommit
-            log.error("we totally failed: set exception to:" + response.errors.get(0).e);
-            retry = false;
-            
-            Exception e = response.errors.get(0).e;
-            String newMsg = "shard update error (" + response.sreq.urls + "):"
-                + e.getMessage();
-            if (e instanceof SolrException) {
-              SolrException se = (SolrException) e;
-              e = new SolrException(ErrorCode.getErrorCode(se.code()),
-                  newMsg, se.getCause());
-            } else {
-              e = new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  newMsg, e);
-            }
-            
-            rsp.setException(e);
-          } else {
-            try {
-              response.sreq.urls = Collections.singletonList(zkController
-                  .getZkStateReader().getLeaderUrl(collection, shardId));
-              response.sreq.exception = null;
-              
-              // nocommit
-              System.out.println("FORWARD FAILED:" + urls + " retry with "
-                  + response.sreq.urls);
-              response.errors.get(0).e.printStackTrace();
-              urls = response.sreq.urls;
-            } catch (InterruptedException e2) {
-              Thread.currentThread().interrupt();
-              throw new SolrException(ErrorCode.SERVER_ERROR, e2);
-            } catch (KeeperException e2) {
-              throw new SolrException(ErrorCode.SERVER_ERROR, e2);
-            }
-            cmdDistrib.submit(response.sreq);
-            
-            try {
-              Thread.sleep(timeout);
-            } catch (InterruptedException e1) {
-              Thread.currentThread().interrupt();
-              throw new SolrException(ErrorCode.SERVER_ERROR, "");
-            }
-          }
-        }
-      } else {
-        rsp.setException(null);
-      }
-    } while(retry);
+
+    cmdDistrib.finish(urls);
+    Response response = cmdDistrib.getResponse();
+    // nocommit - we may need to tell about more than one error...
+    if (response.errors.size() > 0) {
+      // nocommit: set first error...?
+      rsp.setException(response.errors.get(0).e);
+    } else {
+      rsp.setException(null);
+    }
+   
     
     // if it is not a forward request, for each fail, try to tell them to
     // recover
@@ -524,10 +466,6 @@ public class DistributedUpdateProcessor 
       idField.getType().indexedToReadable(cmd.getIndexedId(), scratch);
       deleteResponse.add(scratch.toString(), cmd.getVersion());  // we're returning the version of the delete.. not the version of the doc we deleted.
     }
-    
-    if (urls != null) {
-      finish(params);
-    }
   }
 
   private boolean versionDelete(DeleteUpdateCommand cmd) throws IOException {
@@ -683,6 +621,7 @@ public class DistributedUpdateProcessor 
 
     if (zkEnabled) {
       ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
+      System.out.println("distrib commit?");
       if (!params.getBool(COMMIT_END_POINT, false)) {
         params.set(COMMIT_END_POINT, true);
 
@@ -691,11 +630,10 @@ public class DistributedUpdateProcessor 
         String shardZkNodeName = nodeName + "_" + req.getCore().getName();
         urls = getReplicaUrls(req, req.getCore().getCoreDescriptor()
             .getCloudDescriptor().getCollectionName(), shardZkNodeName);
-
+        System.out.println("send commit to:" + urls);
         if (urls != null) {
-
           cmdDistrib.distribCommit(cmd, urls, params);
-          finish(params);
+          finish();
         }
       }
     }
@@ -703,10 +641,13 @@ public class DistributedUpdateProcessor 
   
   @Override
   public void finish() throws IOException {
+    if (urls != null) {
+      doFinish();
+    }
     if (next != null && urls == null) next.finish();
   }
   // nocommit: TODO: make map of url to props as well - order to recover code needs core name
-  private List<String> getReplicaUrls(SolrQueryRequest req, String collection,
+  private List<Url> getReplicaUrls(SolrQueryRequest req, String collection,
       String shardId, String thisNodeName) {
     CloudState cloudState = req.getCore().getCoreDescriptor()
         .getCoreContainer().getZkController().getCloudState();
@@ -722,14 +663,14 @@ public class DistributedUpdateProcessor 
     }
     
     Map<String,ZkNodeProps> shardMap = replicas.getShards();
-    List<String> urls = new ArrayList<String>(shardMap.size());
+    List<Url> urls = new ArrayList<Url>(shardMap.size());
 
     for (Entry<String,ZkNodeProps> entry : shardMap.entrySet()) {
       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
       String nodeName = nodeProps.getNodeName();
       if (cloudState.liveNodesContain(nodeName) && !nodeName.equals(thisNodeName)) {
         String replicaUrl = nodeProps.getCoreUrl();
-        urls.add(replicaUrl);
+        urls.add(new StdUrl(replicaUrl));
       }
     }
     if (urls.size() == 0) {
@@ -738,10 +679,10 @@ public class DistributedUpdateProcessor 
     return urls;
   }
   
-  private List<String> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
+  private List<Url> getReplicaUrls(SolrQueryRequest req, String collection, String shardZkNodeName) {
     CloudState cloudState = req.getCore().getCoreDescriptor()
         .getCoreContainer().getZkController().getCloudState();
-    List<String> urls = new ArrayList<String>();
+    List<Url> urls = new ArrayList<Url>();
     Map<String,Slice> slices = cloudState.getSlices(collection);
     if (slices == null) {
       throw new ZooKeeperException(ErrorCode.BAD_REQUEST,
@@ -756,7 +697,7 @@ public class DistributedUpdateProcessor 
         ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
         if (cloudState.liveNodesContain(nodeProps.getNodeName()) && !entry.getKey().equals(shardZkNodeName)) {
           String replicaUrl = nodeProps.getCoreUrl();
-          urls.add(replicaUrl);
+          urls.add(new StdUrl(replicaUrl));
         }
       }
     }
@@ -778,4 +719,45 @@ public class DistributedUpdateProcessor 
     BytesRef br = cmd.getIndexedId();
     return Hash.murmurhash3_x86_32(br.bytes, br.offset, br.length, 0);
   }
+  
+  public static class RetryUrl extends Url {
+    String url;
+    private ZkStateReader zkStateReader;
+    private String collection;
+    private String shardId;
+    
+    public RetryUrl(String url, ZkStateReader zkStateReader, String collection, String shardId) {
+      this.url = url;
+      this.zkStateReader = zkStateReader;
+      this.collection = collection;
+      this.shardId = shardId;
+    }
+    
+    @Override
+    public String getUrl() {
+      return url;
+    }
+    
+    @Override
+    public String toString() {
+      return url;
+    }
+
+    @Override
+    public boolean checkRetry() {
+      ZkCoreNodeProps leaderProps;
+      try {
+        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderProps(
+            collection, shardId));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      
+      this.url = leaderProps.getCoreUrl();
+
+      return true;
+    }
+  }
+  
 }
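
Not part of the commit either, but to spell out how the RetryUrl above ties the pieces together: when a forward to the leader fails, SolrCmdDistributor.checkResponses() calls checkRetry(), RetryUrl re-reads the leader props from ZooKeeper and swaps in the current leader's core url, and the request is resubmitted to that address. A rough sketch of the caller's side (method and parameter names here are illustrative only):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Response;
import org.apache.solr.update.SolrCmdDistributor.Url;
import org.apache.solr.update.processor.DistributedUpdateProcessor.RetryUrl;

public class LeaderForwardSketch {
  // Illustrative only, not in the commit: the forward-to-leader path after
  // this change. Retries now happen inside SolrCmdDistributor (via
  // RetryUrl.checkRetry()), so the caller just collects what is left over.
  static void forwardToLeader(SolrCmdDistributor cmdDistrib,
      AddUpdateCommand cmd, String leaderUrl, ZkStateReader zkStateReader,
      String collection, String shardId) throws IOException {
    List<Url> urls = new ArrayList<Url>(1);
    urls.add(new RetryUrl(leaderUrl, zkStateReader, collection, shardId));

    cmdDistrib.distribAdd(cmd, urls, new ModifiableSolrParams());
    cmdDistrib.finish(urls);

    Response response = cmdDistrib.getResponse();
    if (response.errors.size() > 0) {
      // the distributor already retried against the current leader up to
      // 5 times; anything still here is a real failure
      throw new IOException("shard update error: " + response.errors.get(0).e);
    }
  }
}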

Modified: lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1225998&r1=1225997&r2=1225998&view=diff
==============================================================================
--- lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/solrcloud/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Sat Dec 31 00:36:54 2011
@@ -18,6 +18,7 @@ package org.apache.solr.update;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.solr.BaseDistributedSearchTestCase;
@@ -28,6 +29,8 @@ import org.apache.solr.common.SolrDocume
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.Url;
+import org.apache.solr.update.SolrCmdDistributor.StdUrl;
 
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   
@@ -74,14 +77,16 @@ public class SolrCmdDistributorTest exte
   
   @Override
   public void doTest() throws Exception {
-    del("*:*");
+    //del("*:*");
     
     SolrCmdDistributor cmdDistrib = new SolrCmdDistributor();
     
     ModifiableSolrParams params = new ModifiableSolrParams();
-    List<String> urls = new ArrayList<String>();
+    List<Url> urls = new ArrayList<Url>();
 
-    urls.add(((CommonsHttpSolrServer) controlClient).getBaseURL());
+    urls.add(new StdUrl(((CommonsHttpSolrServer) controlClient).getBaseURL()));
+    
+    // add one doc to controlClient
     
     AddUpdateCommand cmd = new AddUpdateCommand(null);
     cmd.solrDoc = getSolrDoc("id", 1);
@@ -89,7 +94,8 @@ public class SolrCmdDistributorTest exte
     
     CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
     cmdDistrib.distribCommit(ccmd, urls, params);
-    Response response = cmdDistrib.finish(urls, params);
+    cmdDistrib.finish(urls);
+    Response response = cmdDistrib.getResponse();
     
     assertEquals(response.errors.toString(), 0, response.errors.size());
     
@@ -98,7 +104,9 @@ public class SolrCmdDistributorTest exte
     assertEquals(1, numFound);
     
     CommonsHttpSolrServer client2 = (CommonsHttpSolrServer) clients.get(0);
-    urls.add(client2.getBaseURL());
+    urls.add(new StdUrl(client2.getBaseURL()));
+    
+    // add another 3 docs to both control and client1
     
     cmd.solrDoc = getSolrDoc("id", 2);
     cmdDistrib.distribAdd(cmd, urls, params);
@@ -108,18 +116,26 @@ public class SolrCmdDistributorTest exte
 
     cmdDistrib.distribAdd(cmd2, urls, params);
     
+    AddUpdateCommand cmd3 = new AddUpdateCommand(null);
+    cmd3.solrDoc = getSolrDoc("id", 4);
+    
+    cmdDistrib.distribAdd(cmd3, Collections.singletonList(urls.get(0)), params);
+    
     cmdDistrib.distribCommit(ccmd, urls, params);
-    response = cmdDistrib.finish(urls, params);
+    cmdDistrib.finish(urls);
+    response = cmdDistrib.getResponse();
     
     assertEquals(response.errors.toString(), 0, response.errors.size());
     
     SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
     numFound = results.getNumFound();
-    assertEquals(results.toString(), 3, numFound);
+    assertEquals(results.toString(), 4, numFound);
     
     numFound = client2.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
-    assertEquals(2, numFound);
+    assertEquals(3, numFound);
+    
+    // now delete doc 2 which is on both control and client1
     
     DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
     dcmd.id = "2";
@@ -127,17 +143,18 @@ public class SolrCmdDistributorTest exte
     cmdDistrib.distribDelete(dcmd, urls, params);
     
     cmdDistrib.distribCommit(ccmd, urls, params);
-    response = cmdDistrib.finish(urls, params);
+    cmdDistrib.finish(urls);
+    response = cmdDistrib.getResponse();
     
     assertEquals(response.errors.toString(), 0, response.errors.size());
     
     results = controlClient.query(new SolrQuery("*:*")).getResults();
     numFound = results.getNumFound();
-    assertEquals(results.toString(), 2, numFound);
+    assertEquals(results.toString(), 3, numFound);
     
     numFound = client2.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
-    assertEquals(results.toString(), 1, numFound);
+    assertEquals(results.toString(), 2, numFound);
   }
   
   protected void addFields(SolrInputDocument doc, Object... fields) {