You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/10/19 00:15:35 UTC

svn commit: r1533652 - in /lucene/dev/branches/branch_4x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/update/ solr/core/src/java/org/apache/solr/update/processor/ solr/core/src/test/org/apache/solr/cl...

Author: markrmiller
Date: Fri Oct 18 22:15:35 2013
New Revision: 1533652

URL: http://svn.apache.org/r1533652
Log:
SOLR-5216: Document updates to SolrCloud can cause a distributed deadlock.
SOLR-5232: SolrCloud should distribute updates via streaming rather than buffering.

Added:
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
      - copied unchanged from r1533649, lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
Modified:
    lucene/dev/branches/branch_4x/   (props changed)
    lucene/dev/branches/branch_4x/solr/   (props changed)
    lucene/dev/branches/branch_4x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_4x/solr/core/   (props changed)
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
    lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
    lucene/dev/branches/branch_4x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
    lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java

Modified: lucene/dev/branches/branch_4x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/CHANGES.txt?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_4x/solr/CHANGES.txt Fri Oct 18 22:15:35 2013
@@ -67,7 +67,16 @@ New Features
 
 Bug Fixes
 ----------------------
+  
+* SOLR-5216: Document updates to SolrCloud can cause a distributed deadlock.
+  (Mark Miller)
+  
+Optimizations
+----------------------  
 
+* SOLR-5232: SolrCloud should distribute updates via streaming rather than buffering.
+  (Mark Miller)
+  
 Security
 ----------------------
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/core/CoreContainer.java Fri Oct 18 22:15:35 2013
@@ -17,6 +17,27 @@
 
 package org.apache.solr.core;
 
+import com.google.common.collect.Maps;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkSolrResourceLoader;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.handler.admin.CollectionsHandler;
+import org.apache.solr.handler.admin.CoreAdminHandler;
+import org.apache.solr.handler.admin.InfoHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.logging.LogWatcher;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.IndexSchemaFactory;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.FileUtils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.File;
@@ -86,6 +107,9 @@ public class CoreContainer {
 
   protected ZkContainer zkSys = new ZkContainer();
   private ShardHandlerFactory shardHandlerFactory;
+  
+  private ExecutorService updateExecutor = Executors.newCachedThreadPool(
+      new SolrjNamedThreadFactory("updateExecutor"));
 
   protected LogWatcher logging = null;
 
@@ -379,6 +403,8 @@ public class CoreContainer {
         shardHandlerFactory.close();
       }
       
+      ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
+      
       // we want to close zk stuff last
 
       zkSys.close();
@@ -924,6 +950,10 @@ public class CoreContainer {
     return shardHandlerFactory;
   }
   
+  public ExecutorService getUpdateExecutor() {
+    return updateExecutor;
+  }
+  
   // Just to tidy up the code where it did this in-line.
   private SolrException recordAndThrow(String name, String msg, Exception ex) {
     synchronized (coreInitFailures) {

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Fri Oct 18 22:15:35 2013
@@ -19,18 +19,10 @@ package org.apache.solr.update;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -38,11 +30,10 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.util.AdjustableSemaphore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,220 +41,139 @@ import org.slf4j.LoggerFactory;
 public class SolrCmdDistributor {
   private static final int MAX_RETRIES_ON_FORWARD = 15;
   public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
-
-  static AdjustableSemaphore semaphore = new AdjustableSemaphore(8);
-  
-  CompletionService<Request> completionService;
-  Set<Future<Request>> pending;
-  
-  int maxBufferedAddsPerServer = 10;
-  int maxBufferedDeletesPerServer = 10;
-
-  private Response response = new Response();
-  
-  private final Map<Node,List<AddRequest>> adds = new HashMap<Node,List<AddRequest>>();
-  private final Map<Node,List<DeleteRequest>> deletes = new HashMap<Node,List<DeleteRequest>>();
-  private UpdateShardHandler updateShardHandler;
   
-  class AddRequest {
-    AddUpdateCommand cmd;
-    ModifiableSolrParams params;
-  }
+  private StreamingSolrServers servers;
   
-  class DeleteRequest {
-    DeleteUpdateCommand cmd;
-    ModifiableSolrParams params;
-  }
+  private List<Error> allErrors = new ArrayList<Error>();
+  private List<Error> errors = new ArrayList<Error>();
   
   public static interface AbortCheck {
     public boolean abortCheck();
   }
   
-  public SolrCmdDistributor(int numHosts, UpdateShardHandler updateShardHandler) {
-    int maxPermits = Math.max(16, numHosts * 16);
-    // limits how many tasks can actually execute at once
-    if (maxPermits != semaphore.getMaxPermits()) {
-      semaphore.setMaxPermits(maxPermits);
-    }
-    
-    this.updateShardHandler = updateShardHandler;
-    completionService = new ExecutorCompletionService<Request>(updateShardHandler.getCmdDistribExecutor());
-    pending = new HashSet<Future<Request>>();
+  public SolrCmdDistributor(ExecutorService updateExecutor) {
+    servers = new StreamingSolrServers(updateExecutor);
   }
   
   public void finish() {
+    servers.blockUntilFinished();
+    doRetriesIfNeeded();
+  
+    servers.shutdown();
+  }
 
-    flushAdds(1);
-    flushDeletes(1);
+  private void doRetriesIfNeeded() {
+    // NOTE: retries will be forwards to a single url
+    
+    List<Error> errors = new ArrayList<Error>(this.errors);
+    errors.addAll(servers.getErrors());
+    allErrors.addAll(errors);
+    boolean blockUntilFinishedAgain = false;
+    for (Error err : errors) {
+      String oldNodeUrl = err.req.node.getUrl();
+      
+      // if there is a retry url, we want to retry...
+      boolean isRetry = err.req.node.checkRetry();
+      boolean doRetry = false;
+      int rspCode = err.statusCode;
+      
+      if (testing_errorHook != null) Diagnostics.call(testing_errorHook, err.e);
+      
+      // this can happen in certain situations such as shutdown
+      if (isRetry) {
+        if (rspCode == 404 || rspCode == 403 || rspCode == 503
+            || rspCode == 500) {
+          doRetry = true;
+        }
+        
+        // if it's an IOException, let's try again
+        if (err.e instanceof IOException) {
+          doRetry = true;
+        } else if (err.e instanceof SolrServerException) {
+          if (((SolrServerException) err.e).getRootCause() instanceof IOException) {
+            doRetry = true;
+          }
+        }
+      }
+      
+      if (isRetry && err.req.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
+        err.req.retries++;
 
-    checkResponses(true);
-  }
-  
-  public void distribDelete(DeleteUpdateCommand cmd, List<Node> urls, ModifiableSolrParams params) throws IOException {
-    checkResponses(false);
+        SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+            + oldNodeUrl + " failed - retrying ... ");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          log.warn(null, e);
+        }
+        
+        submit(err.req);
+        blockUntilFinishedAgain = true;
+      }
+    }
+    
+    servers.clearErrors();
+    this.errors.clear();
     
-    if (cmd.isDeleteById()) {
-      doDelete(cmd, urls, params);
-    } else {
-      doDelete(cmd, urls, params);
+    if (blockUntilFinishedAgain) {
+      servers.blockUntilFinished();
+      doRetriesIfNeeded();
     }
   }
   
-  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    checkResponses(false);
+  public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+    distribDelete(cmd, nodes, params, false);
+  }
+  
+  public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
     
-    // make sure any pending deletes are flushed
-    flushDeletes(1);
-
-    // TODO: this is brittle
-    // need to make a clone since these commands may be reused
-    AddUpdateCommand clone = new AddUpdateCommand(null);
-
-    clone.solrDoc = cmd.solrDoc;
-    clone.commitWithin = cmd.commitWithin;
-    clone.overwrite = cmd.overwrite;
-    clone.setVersion(cmd.getVersion());
-    AddRequest addRequest = new AddRequest();
-    addRequest.cmd = clone;
-    addRequest.params = params;
-
     for (Node node : nodes) {
-      List<AddRequest> alist = adds.get(node);
-      if (alist == null) {
-        alist = new ArrayList<AddRequest>(2);
-        adds.put(node, alist);
+      UpdateRequest uReq = new UpdateRequest();
+      uReq.setParams(params);
+      if (cmd.isDeleteById()) {
+        uReq.deleteById(cmd.getId(), cmd.getVersion());
+      } else {
+        uReq.deleteByQuery(cmd.query);
       }
-      alist.add(addRequest);
+      
+      submit(new Req(node, uReq, sync));
     }
-
-    flushAdds(maxBufferedAddsPerServer);
   }
-
-  /**
-   * Synchronous (blocking) add to specified node. Any error returned from node is propagated.
-   */
-  public void syncAdd(AddUpdateCommand cmd, Node node, ModifiableSolrParams params) throws IOException {
-    log.info("SYNCADD on {} : {}", node, cmd.getPrintableId());
-    checkResponses(false);
-    // flush all pending deletes
-    flushDeletes(1);
-    // flush all pending adds
-    flushAdds(1);
-    // finish with the pending requests
-    checkResponses(false);
-
-    UpdateRequest ureq = new UpdateRequest();
-    ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-
-    ureq.setParams(params);
-    syncRequest(node, ureq);
-  }
-
-  public void syncDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
-    log.info("SYNCDELETE on {} : ", nodes, cmd);
-    checkResponses(false);
-    // flush all pending adds
-    flushAdds(1);
-    // flush all pending deletes
-    flushDeletes(1);
-    // finish pending requests
-    checkResponses(false);
-
-    DeleteUpdateCommand clonedCmd = clone(cmd);
-    DeleteRequest deleteRequest = new DeleteRequest();
-    deleteRequest.cmd = clonedCmd;
-    deleteRequest.params = params;
-
-    UpdateRequest ureq = new UpdateRequest();
-    if (cmd.isDeleteById()) {
-      ureq.deleteById(cmd.getId(), cmd.getVersion());
-    } else {
-      ureq.deleteByQuery(cmd.query);
-    }
-    ureq.setParams(params);
-    for (Node node : nodes) {
-      syncRequest(node, ureq);
-    }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
+    distribAdd(cmd, nodes, params, false);
   }
+  
+  public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
 
-  private void syncRequest(Node node, UpdateRequest ureq) {
-    Request sreq = new Request();
-    sreq.node = node;
-    sreq.ureq = ureq;
-
-    String url = node.getUrl();
-    String fullUrl;
-    if (!url.startsWith("http://") && !url.startsWith("https://")) {
-      fullUrl = "http://" + url;
-    } else {
-      fullUrl = url;
-    }
-
-    HttpSolrServer server = new HttpSolrServer(fullUrl,
-        updateShardHandler.getHttpClient());
-
-    try {
-      sreq.ursp = server.request(ureq);
-    } catch (Exception e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + sreq.node + " update: " + ureq , e);
+    for (Node node : nodes) {
+      UpdateRequest uReq = new UpdateRequest();
+      uReq.setParams(params);
+      uReq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
+      submit(new Req(node, uReq, synchronous));
     }
+    
   }
 
   public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes,
       ModifiableSolrParams params) throws IOException {
     
-    // make sure we are ordered
-    flushAdds(1);
-    flushDeletes(1);
-
-    
-    // Wait for all outstanding responses to make sure that a commit
-    // can't sneak in ahead of adds or deletes we already sent.
-    // We could do this on a per-server basis, but it's more complex
-    // and this solution will lead to commits happening closer together.
-    checkResponses(true);
-    
-    // currently, we dont try to piggy back on outstanding adds or deletes
-    
-    UpdateRequest ureq = new UpdateRequest();
-    ureq.setParams(params);
-    
-    addCommit(ureq, cmd);
-    
-    log.info("Distrib commit to:" + nodes + " params:" + params);
+    // we need to do any retries before commit...
+    servers.blockUntilFinished();
+    doRetriesIfNeeded();
     
-    for (Node node : nodes) {
-      submit(ureq, node);
-    }
-    
-    // if the command wanted to block until everything was committed,
-    // then do that here.
+    UpdateRequest uReq = new UpdateRequest();
+    uReq.setParams(params);
     
-    if (cmd.waitSearcher) {
-      checkResponses(true);
-    }
-  }
-  
-  private void doDelete(DeleteUpdateCommand cmd, List<Node> nodes,
-      ModifiableSolrParams params) {
+    addCommit(uReq, cmd);
     
-    flushAdds(1);
+    log.debug("Distrib commit to:" + nodes + " params:" + params);
     
-    DeleteUpdateCommand clonedCmd = clone(cmd);
-    DeleteRequest deleteRequest = new DeleteRequest();
-    deleteRequest.cmd = clonedCmd;
-    deleteRequest.params = params;
     for (Node node : nodes) {
-      List<DeleteRequest> dlist = deletes.get(node);
-      
-      if (dlist == null) {
-        dlist = new ArrayList<DeleteRequest>(2);
-        deletes.put(node, dlist);
-      }
-      dlist.add(deleteRequest);
+      submit(new Req(node, uReq, false));
     }
     
-    flushDeletes(maxBufferedDeletesPerServer);
   }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -271,248 +181,63 @@ public class SolrCmdDistributor {
     ureq.setAction(cmd.optimize ? AbstractUpdateRequest.ACTION.OPTIMIZE
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes);
   }
-  
-  boolean flushAdds(int limit) {
-    // check for pending deletes
-  
-    Set<Node> removeNodes = new HashSet<Node>();
-    Set<Node> nodes = adds.keySet();
- 
-    for (Node node : nodes) {
-      List<AddRequest> alist = adds.get(node);
-      if (alist == null || alist.size() < limit) continue;
-  
-      UpdateRequest ureq = new UpdateRequest();
-      
-      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
 
-      for (AddRequest aReq : alist) {
-        AddUpdateCommand cmd = aReq.cmd;
-        combinedParams.add(aReq.params);
-        ureq.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
-      }
+  private void submit(Req req) {
+    if (req.synchronous) {
+      servers.blockUntilFinished();
+      doRetriesIfNeeded();
       
-      if (ureq.getParams() == null) ureq.setParams(new ModifiableSolrParams());
-      ureq.getParams().add(combinedParams);
+      HttpSolrServer server = new HttpSolrServer(req.node.getUrl(),
+          servers.getHttpClient());
 
-      removeNodes.add(node);
-      
-      submit(ureq, node);
-    }
-    
-    for (Node node : removeNodes) {
-      adds.remove(node);
-    }
-    
-    return true;
-  }
-  
-  boolean flushDeletes(int limit) {
-    // check for pending deletes
- 
-    Set<Node> removeNodes = new HashSet<Node>();
-    Set<Node> nodes = deletes.keySet();
-    for (Node node : nodes) {
-      List<DeleteRequest> dlist = deletes.get(node);
-      if (dlist == null || dlist.size() < limit) continue;
-      UpdateRequest ureq = new UpdateRequest();
-      
-      ModifiableSolrParams combinedParams = new ModifiableSolrParams();
-      
-      for (DeleteRequest dReq : dlist) {
-        DeleteUpdateCommand cmd = dReq.cmd;
-        combinedParams.add(dReq.params);
-        if (cmd.isDeleteById()) {
-          ureq.deleteById(cmd.getId(), cmd.getVersion());
-        } else {
-          ureq.deleteByQuery(cmd.query);
-        }
-        
-        if (ureq.getParams() == null) ureq
-            .setParams(new ModifiableSolrParams());
-        ureq.getParams().add(combinedParams);
+      try {
+        server.request(req.uReq);
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq , e);
       }
-      
-      removeNodes.add(node);
-      submit(ureq, node);
+      return;
     }
     
-    for (Node node : removeNodes) {
-      deletes.remove(node);
+    SolrServer solrServer = servers.getSolrServer(req);
+    try {
+      NamedList<Object> rsp = solrServer.request(req.uReq);
+    } catch (Exception e) {
+      SolrException.log(log, e);
+      Error error = new Error();
+      error.e = e;
+      error.req = req;
+      if (e instanceof SolrException) {
+        error.statusCode = ((SolrException) e).code();
+      }
+      errors.add(error);
     }
-    
-    return true;
-  }
-  
-  private DeleteUpdateCommand clone(DeleteUpdateCommand cmd) {
-    DeleteUpdateCommand c = (DeleteUpdateCommand)cmd.clone();
-    // TODO: shouldnt the clone do this?
-    c.setFlags(cmd.getFlags());
-    c.setVersion(cmd.getVersion());
-    return c;
   }
   
-  public static class Request {
+  public static class Req {
     public Node node;
-    UpdateRequest ureq;
-    NamedList<Object> ursp;
-    int rspCode;
-    public Exception exception;
-    int retries;
-  }
-  
-  void submit(UpdateRequest ureq, Node node) {
-    Request sreq = new Request();
-    sreq.node = node;
-    sreq.ureq = ureq;
-    submit(sreq);
-  }
-  
-  public void submit(final Request sreq) {
-
-    final String url = sreq.node.getUrl();
-
-    Callable<Request> task = new Callable<Request>() {
-      @Override
-      public Request call() throws Exception {
-        Request clonedRequest = null;
-        try {
-          clonedRequest = new Request();
-          clonedRequest.node = sreq.node;
-          clonedRequest.ureq = sreq.ureq;
-          clonedRequest.retries = sreq.retries;
-          
-          String fullUrl;
-          if (!url.startsWith("http://") && !url.startsWith("https://")) {
-            fullUrl = "http://" + url;
-          } else {
-            fullUrl = url;
-          }
-  
-          HttpSolrServer server = new HttpSolrServer(fullUrl,
-              updateShardHandler.getHttpClient());
-          
-          if (Thread.currentThread().isInterrupted()) {
-            clonedRequest.rspCode = 503;
-            clonedRequest.exception = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down.");
-            return clonedRequest;
-          }
-          
-          clonedRequest.ursp = server.request(clonedRequest.ureq);
-          
-          // currently no way to get the request body.
-        } catch (Exception e) {
-          clonedRequest.exception = e;
-          if (e instanceof SolrException) {
-            clonedRequest.rspCode = ((SolrException) e).code();
-          } else {
-            clonedRequest.rspCode = -1;
-          }
-        } finally {
-          semaphore.release();
-        }
-        return clonedRequest;
-      }
-    };
-    try {
-      semaphore.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Update thread interrupted", e);
+    public UpdateRequest uReq;
+    public int retries;
+    public boolean synchronous;
+    
+    public Req(Node node, UpdateRequest uReq, boolean synchronous) {
+      this.node = node;
+      this.uReq = uReq;
+      this.synchronous = synchronous;
     }
-    try {
-      pending.add(completionService.submit(task));
-    } catch (RejectedExecutionException e) {
-      semaphore.release();
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Shutting down", e);
-    }
-    
   }
+    
 
   public static Diagnostics.Callable testing_errorHook;  // called on error when forwarding request.  Currently data=[this, Request]
 
-  void checkResponses(boolean block) {
-
-    while (pending != null && pending.size() > 0) {
-      try {
-        Future<Request> future = block ? completionService.take()
-            : completionService.poll();
-        if (future == null) return;
-        pending.remove(future);
-        
-        try {
-          Request sreq = future.get();
-          if (sreq.rspCode != 0) {
-            // error during request
-
-            if (testing_errorHook != null) Diagnostics.call(testing_errorHook, this, sreq);
-
-            // if there is a retry url, we want to retry...
-            boolean isRetry = sreq.node.checkRetry();
-            boolean doRetry = false;
-            int rspCode = sreq.rspCode;
-            
-            // this can happen in certain situations such as shutdown
-            if (isRetry) {
-              if (rspCode == 404 || rspCode == 403 || rspCode == 503
-                  || rspCode == 500) {
-                doRetry = true;
-              }
-              
-              // if its an ioexception, lets try again
-              if (sreq.exception instanceof IOException) {
-                doRetry = true;
-              } else if (sreq.exception instanceof SolrServerException) {
-                if (((SolrServerException) sreq.exception).getRootCause() instanceof IOException) {
-                  doRetry = true;
-                }
-              }
-            }
-            
-            if (isRetry && sreq.retries < MAX_RETRIES_ON_FORWARD && doRetry) {
-              sreq.retries++;
-              sreq.rspCode = 0;
-              sreq.exception = null;
-              SolrException.log(SolrCmdDistributor.log, "forwarding update to " + sreq.node.getUrl() + " failed - retrying ... ");
-              Thread.sleep(500);
-              submit(sreq);
-            } else {
-              Exception e = sreq.exception;
-              Error error = new Error();
-              error.e = e;
-              error.node = sreq.node;
-              response.errors.add(error);
-              response.sreq = sreq;
-              SolrException.log(SolrCmdDistributor.log, "shard update error "
-                  + sreq.node, sreq.exception);
-            }
-          }
-          
-        } catch (ExecutionException e) {
-          // shouldn't happen since we catch exceptions ourselves
-          SolrException.log(SolrCore.log,
-              "error sending update request to shard", e);
-        }
-        
-      } catch (InterruptedException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
-            "interrupted waiting for shard update response", e);
-      }
-    }
-  }
   
   public static class Response {
-    public Request sreq;
     public List<Error> errors = new ArrayList<Error>();
   }
   
   public static class Error {
-    public Node node;
     public Exception e;
-  }
-
-  public Response getResponse() {
-    return response;
+    public int statusCode;
+    public Req req;
   }
   
   public static abstract class Node {
@@ -595,6 +320,64 @@ public class SolrCmdDistributor {
     }
   }
   
+  // RetryNodes are used in the case of 'forward to leader' where we want
+  // to try the latest leader on a failure, in case the leader just went down.
+  public static class RetryNode extends StdNode {
+    
+    private ZkStateReader zkStateReader;
+    private String collection;
+    private String shardId;
+    
+    public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
+      super(nodeProps);
+      this.zkStateReader = zkStateReader;
+      this.collection = collection;
+      this.shardId = shardId;
+    }
+
+    @Override
+    public boolean checkRetry() {
+      ZkCoreNodeProps leaderProps;
+      try {
+        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+            collection, shardId));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      
+      this.nodeProps = leaderProps;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = super.hashCode();
+      result = prime * result
+          + ((collection == null) ? 0 : collection.hashCode());
+      result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (!super.equals(obj)) return false;
+      if (getClass() != obj.getClass()) return false;
+      RetryNode other = (RetryNode) obj;
+      if (nodeProps.getCoreUrl() == null) {
+        if (other.nodeProps.getCoreUrl() != null) return false;
+      } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
+
+      return true;
+    }
+  }
+
+  public List<Error> getErrors() {
+    return allErrors;
+  }
 }
 
 

Modified: lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Fri Oct 18 22:15:35 2013
@@ -53,8 +53,9 @@ import org.apache.solr.update.AddUpdateC
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
+import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.UpdateCommand;
 import org.apache.solr.update.UpdateHandler;
@@ -68,6 +69,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -142,8 +144,6 @@ public class DistributedUpdateProcessor 
   private boolean isSubShardLeader = false;
   private List<Node> nodes;
 
-  private int numNodes;
-
   private UpdateCommand updateCommand;  // the current command this processor is working on.
 
   
@@ -171,8 +171,7 @@ public class DistributedUpdateProcessor 
     this.zkEnabled  = coreDesc.getCoreContainer().isZooKeeperAware();
     zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
     if (zkEnabled) {
-      numNodes =  zkController.getZkStateReader().getClusterState().getLiveNodes().size();
-      cmdDistrib = new SolrCmdDistributor(numNodes, coreDesc.getCoreContainer().getZkController().getUpdateShardHandler());
+      cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
     }
     //this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
 
@@ -199,7 +198,6 @@ public class DistributedUpdateProcessor 
       String coreName = req.getCore().getName();
 
       ClusterState cstate = zkController.getClusterState();
-      numNodes = cstate.getLiveNodes().size();
       DocCollection coll = cstate.getCollection(collection);
       Slice slice = coll.getRouter().getTargetSlice(id, doc, req.getParams(), coll);
 
@@ -451,7 +449,7 @@ public class DistributedUpdateProcessor 
             zkController.getBaseUrl(), req.getCore().getName()));
         params.set("distrib.from.parent", req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId());
         for (Node subShardLeader : subShardLeaders) {
-          cmdDistrib.syncAdd(cmd, subShardLeader, params);
+          cmdDistrib.distribAdd(cmd, Collections.singletonList(subShardLeader), params, true);
         }
       }
     }
@@ -497,16 +495,16 @@ public class DistributedUpdateProcessor 
     // send in a background thread
 
     cmdDistrib.finish();
-    Response response = cmdDistrib.getResponse();
+    List<Error> errors = cmdDistrib.getErrors();
     // TODO - we may need to tell about more than one error...
     
     // if its a forward, any fail is a problem - 
     // otherwise we assume things are fine if we got it locally
     // until we start allowing min replication param
-    if (response.errors.size() > 0) {
+    if (errors.size() > 0) {
       // if one node is a RetryNode, this was a forward request
-      if (response.errors.get(0).node instanceof RetryNode) {
-        rsp.setException(response.errors.get(0).e);
+      if (errors.get(0).req.node instanceof RetryNode) {
+        rsp.setException(errors.get(0).e);
       }
       // else
       // for now we don't error - we assume if it was added locally, we
@@ -519,8 +517,8 @@ public class DistributedUpdateProcessor 
     // legit
 
     // TODO: we should do this in the background it would seem
-    for (SolrCmdDistributor.Error error : response.errors) {
-      if (error.node instanceof RetryNode) {
+    for (SolrCmdDistributor.Error error : errors) {
+      if (error.req.node instanceof RetryNode) {
         // we don't try to force a leader to recover
         // when we cannot forward to it
         continue;
@@ -530,7 +528,7 @@ public class DistributedUpdateProcessor 
       // TODO: do retries??
       // TODO: what if its is already recovering? Right now recoveries queue up -
       // should they?
-      String recoveryUrl = error.node.getBaseUrl();
+      String recoveryUrl = error.req.node.getBaseUrl();
       HttpSolrServer server;
       log.info("try and ask " + recoveryUrl + " to recover");
       try {
@@ -540,7 +538,7 @@ public class DistributedUpdateProcessor 
         
         RequestRecovery recoverRequestCmd = new RequestRecovery();
         recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
-        recoverRequestCmd.setCoreName(error.node.getCoreName());
+        recoverRequestCmd.setCoreName(error.req.node.getCoreName());
         
         server.request(recoverRequestCmd);
       } catch (Exception e) {
@@ -838,7 +836,7 @@ public class DistributedUpdateProcessor 
         params.set("distrib.from", ZkCoreNodeProps.getCoreUrl(
             zkController.getBaseUrl(), req.getCore().getName()));
         params.set("distrib.from.parent", cloudDesc.getShardId());
-        cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+        cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
       }
     }
 
@@ -1061,7 +1059,7 @@ public class DistributedUpdateProcessor 
       if (leaderLogic) {
         List<Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), null, null);
         if (subShardLeaders != null)  {
-          cmdDistrib.syncDelete(cmd, subShardLeaders, params);
+          cmdDistrib.distribDelete(cmd, subShardLeaders, params, true);
         }
         if (replicas != null) {
           cmdDistrib.distribDelete(cmd, replicas, params);
@@ -1286,61 +1284,6 @@ public class DistributedUpdateProcessor 
     }
     return urls;
   }
-  
-  // RetryNodes are used in the case of 'forward to leader' where we want
-  // to try the latest leader on a fail in the case the leader just went down.
-  public static class RetryNode extends StdNode {
-    
-    private ZkStateReader zkStateReader;
-    private String collection;
-    private String shardId;
-    
-    public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
-      super(nodeProps);
-      this.zkStateReader = zkStateReader;
-      this.collection = collection;
-      this.shardId = shardId;
-    }
-
-    @Override
-    public boolean checkRetry() {
-      ZkCoreNodeProps leaderProps;
-      try {
-        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
-            collection, shardId));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-      
-      this.nodeProps = leaderProps;
-
-      return true;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = super.hashCode();
-      result = prime * result
-          + ((collection == null) ? 0 : collection.hashCode());
-      result = prime * result + ((shardId == null) ? 0 : shardId.hashCode());
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (!super.equals(obj)) return false;
-      if (getClass() != obj.getClass()) return false;
-      RetryNode other = (RetryNode) obj;
-      if (nodeProps.getCoreUrl() == null) {
-        if (other.nodeProps.getCoreUrl() != null) return false;
-      } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
-
-      return true;
-    }
-  }
 
   /**
    * Returns a boolean indicating whether or not the caller should behave as

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java Fri Oct 18 22:15:35 2013
@@ -65,7 +65,6 @@ import org.apache.solr.common.params.Mod
 import org.apache.solr.common.params.UpdateParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -105,8 +104,8 @@ public class BasicDistributedZkTest exte
       Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
       new DefaultSolrThreadFactory("testExecutor"));
   
-  CompletionService<Request> completionService;
-  Set<Future<Request>> pending;
+  CompletionService<Object> completionService;
+  Set<Future<Object>> pending;
   
   @BeforeClass
   public static void beforeThisClass2() throws Exception {
@@ -126,8 +125,8 @@ public class BasicDistributedZkTest exte
     
     sliceCount = 2;
     shardCount = 4;
-    completionService = new ExecutorCompletionService<Request>(executor);
-    pending = new HashSet<Future<Request>>();
+    completionService = new ExecutorCompletionService<Object>(executor);
+    pending = new HashSet<Future<Object>>();
     
   }
   
@@ -749,7 +748,7 @@ public class BasicDistributedZkTest exte
     
    while (pending != null && pending.size() > 0) {
       
-      Future<Request> future = completionService.take();
+      Future<Object> future = completionService.take();
       pending.remove(future);
     }
     
@@ -875,7 +874,7 @@ public class BasicDistributedZkTest exte
     
    while (pending != null && pending.size() > 0) {
       
-      Future<Request> future = completionService.take();
+      Future<Object> future = completionService.take();
       if (future == null) return;
       pending.remove(future);
     }
@@ -964,7 +963,7 @@ public class BasicDistributedZkTest exte
     
     while (pending != null && pending.size() > 0) {
       
-      Future<Request> future = completionService.take();
+      Future<Object> future = completionService.take();
       if (future == null) return;
       pending.remove(future);
     }
@@ -1077,7 +1076,7 @@ public class BasicDistributedZkTest exte
       pending.add(completionService.submit(call));
       while (pending != null && pending.size() > 0) {
         
-        Future<Request> future = completionService.take();
+        Future<Object> future = completionService.take();
         if (future == null) return;
         pending.remove(future);
       }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java Fri Oct 18 22:15:35 2013
@@ -21,7 +21,7 @@ import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.lucene.util.LuceneTestCase.BadApple;
+import org.apache.http.client.HttpClient;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -52,10 +52,10 @@ public class ChaosMonkeyNothingIsSafeTes
     SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
       @Override
       public void call(Object... data) {
-        SolrCmdDistributor.Request sreq = (SolrCmdDistributor.Request)data[1];
-        if (sreq.exception == null) return;
-        if (sreq.exception.getMessage().contains("Timeout")) {
-          Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + sreq.exception.getMessage());
+        Exception e = (Exception) data[0];
+        if (e == null) return;
+        if (e.getMessage().contains("Timeout")) {
+          Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + e.getMessage());
         }
       }
     };

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java Fri Oct 18 22:15:35 2013
@@ -44,10 +44,10 @@ public class  ChaosMonkeySafeLeaderTest 
     SolrCmdDistributor.testing_errorHook = new Diagnostics.Callable() {
       @Override
       public void call(Object... data) {
-        SolrCmdDistributor.Request sreq = (SolrCmdDistributor.Request)data[1];
-        if (sreq.exception == null) return;
-        if (sreq.exception.getMessage().contains("Timeout")) {
-          Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + sreq.exception.getMessage());
+        Exception e = (Exception) data[0];
+        if (e == null) return;
+        if (e.getMessage().contains("Timeout")) {
+          Diagnostics.logThreadDumps("REQUESTING THREAD DUMP DUE TO TIMEOUT: " + e.getMessage());
         }
       }
     };

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java Fri Oct 18 22:15:35 2013
@@ -80,7 +80,6 @@ import org.apache.solr.core.SolrInfoMBea
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -98,8 +97,8 @@ public class CollectionsAPIDistributedZk
       Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
       new DefaultSolrThreadFactory("testExecutor"));
   
-  CompletionService<Request> completionService;
-  Set<Future<Request>> pending;
+  CompletionService<Object> completionService;
+  Set<Future<Object>> pending;
   
   // we randomly use a second config set rather than just one
   private boolean secondConfigSet = random().nextBoolean();
@@ -165,8 +164,8 @@ public class CollectionsAPIDistributedZk
     
     sliceCount = 2;
     shardCount = 4;
-    completionService = new ExecutorCompletionService<Request>(executor);
-    pending = new HashSet<Future<Request>>();
+    completionService = new ExecutorCompletionService<Object>(executor);
+    pending = new HashSet<Future<Object>>();
     checkCreatedVsState = false;
     
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/cloud/CustomCollectionTest.java Fri Oct 18 22:15:35 2013
@@ -17,12 +17,32 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.lucene.util._TestUtil;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrServer;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
@@ -33,7 +53,6 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -43,32 +62,10 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrCmdDistributor.Request;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.solr.cloud.OverseerCollectionProcessor.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.REPLICATION_FACTOR;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
-import static org.apache.solr.common.params.ShardParams._ROUTE_;
-
 /**
  * Tests the Custom Sharding API.
  */
@@ -82,8 +79,8 @@ public class CustomCollectionTest extend
       Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
       new DefaultSolrThreadFactory("testExecutor"));
 
-  CompletionService<Request> completionService;
-  Set<Future<Request>> pending;
+  CompletionService<Object> completionService;
+  Set<Future<Object>> pending;
 
   @BeforeClass
   public static void beforeThisClass2() throws Exception {
@@ -108,8 +105,8 @@ public class CustomCollectionTest extend
 
     sliceCount = 2;
     shardCount = 4;
-    completionService = new ExecutorCompletionService<Request>(executor);
-    pending = new HashSet<Future<Request>>();
+    completionService = new ExecutorCompletionService<Object>(executor);
+    pending = new HashSet<Future<Object>>();
     checkCreatedVsState = false;
 
   }

Modified: lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/branch_4x/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Fri Oct 18 22:15:35 2013
@@ -21,10 +21,11 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.index.LogDocMergePolicy;
-
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServer;
@@ -36,17 +37,19 @@ import org.apache.solr.common.cloud.ZkCo
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrEventListener;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.Response;
+import org.apache.solr.update.SolrCmdDistributor.RetryNode;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
-
 import org.junit.BeforeClass;
 
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
@@ -57,7 +60,8 @@ public class SolrCmdDistributorTest exte
     // being able to call optimize to have all deletes expunged.
     System.setProperty("solr.tests.mergePolicy", LogDocMergePolicy.class.getName());
   }
-  private UpdateShardHandler updateShardHandler;
+  private ExecutorService updateExecutor = Executors.newCachedThreadPool(
+      new SolrjNamedThreadFactory("updateExecutor"));
   
   public SolrCmdDistributorTest() {
     fixShardCount = true;
@@ -103,7 +107,7 @@ public class SolrCmdDistributorTest exte
   public void doTest() throws Exception {
     del("*:*");
     
-    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateExecutor);
     
     ModifiableSolrParams params = new ModifiableSolrParams();
 
@@ -129,9 +133,9 @@ public class SolrCmdDistributorTest exte
     cmdDistrib.finish();
 
     
-    Response response = cmdDistrib.getResponse();
+    List<Error> errors = cmdDistrib.getErrors();
     
-    assertEquals(response.errors.toString(), 0, response.errors.size());
+    assertEquals(errors.toString(), 0, errors.size());
     
     long numFound = controlClient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
@@ -143,7 +147,7 @@ public class SolrCmdDistributorTest exte
     nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
     
     // add another 2 docs to control and 3 to client
-    cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+    cmdDistrib = new SolrCmdDistributor(updateExecutor);
     cmd.solrDoc = sdoc("id", 2);
     params = new ModifiableSolrParams();
     params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -167,9 +171,9 @@ public class SolrCmdDistributorTest exte
     params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
     cmdDistrib.distribCommit(ccmd, nodes, params);
     cmdDistrib.finish();
-    response = cmdDistrib.getResponse();
+    errors = cmdDistrib.getErrors();
     
-    assertEquals(response.errors.toString(), 0, response.errors.size());
+    assertEquals(errors.toString(), 0, errors.size());
     
     SolrDocumentList results = controlClient.query(new SolrQuery("*:*")).getResults();
     numFound = results.getNumFound();
@@ -186,7 +190,7 @@ public class SolrCmdDistributorTest exte
     
     
 
-    cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+    cmdDistrib = new SolrCmdDistributor(updateExecutor);
     
     params = new ModifiableSolrParams();
     params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -199,9 +203,9 @@ public class SolrCmdDistributorTest exte
     cmdDistrib.distribCommit(ccmd, nodes, params);
     cmdDistrib.finish();
 
-    response = cmdDistrib.getResponse();
+    errors = cmdDistrib.getErrors();
     
-    assertEquals(response.errors.toString(), 0, response.errors.size());
+    assertEquals(errors.toString(), 0, errors.size());
     
     
     results = controlClient.query(new SolrQuery("*:*")).getResults();
@@ -219,7 +223,7 @@ public class SolrCmdDistributorTest exte
     
     int id = 5;
     
-    cmdDistrib = new SolrCmdDistributor(5, updateShardHandler);
+    cmdDistrib = new SolrCmdDistributor(updateExecutor);
     
     int cnt = atLeast(303);
     for (int i = 0; i < cnt; i++) {
@@ -289,18 +293,57 @@ public class SolrCmdDistributorTest exte
           ((NamedList<Object>) resp.get("index")).get("numDocs"),
           ((NamedList<Object>) resp.get("index")).get("maxDoc"));
     }
+    
+    // Test RetryNode
+    cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
+    long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+        .getNumFound();
+    
+    nodes = new ArrayList<Node>();
+
+    nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
+    RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+      @Override
+      public boolean checkRetry() {
+        ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+            ZkStateReader.CORE_NAME_PROP, "");
+        this.nodeProps = new ZkCoreNodeProps(leaderProps);
+
+        return true;
+      }
+    };
+    
+
+    nodes.add(retryNode);
+    
+    
+    cmd = new AddUpdateCommand(null);
+    cmd.solrDoc = sdoc("id", 1111111);
+    params = new ModifiableSolrParams();
 
+    cmdDistrib.distribAdd(cmd, nodes, params);
+    
+    ccmd = new CommitUpdateCommand(null, false);
+    params = new ModifiableSolrParams();
+    params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+    cmdDistrib.distribCommit(ccmd, nodes, params);
+    cmdDistrib.finish();
+    
+    long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+        .getNumFound();
+    
+    assertEquals(numFoundBefore + 1, numFoundAfter);
   }
   
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    updateShardHandler = new UpdateShardHandler(10000, 10000);
   }
   
   @Override
   public void tearDown() throws Exception {
-    updateShardHandler = null;
+    ExecutorUtil.shutdownNowAndAwaitTermination(updateExecutor);
     super.tearDown();
   }
 }

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrServer.java Fri Oct 18 22:15:35 2013
@@ -44,6 +44,8 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -72,14 +74,15 @@ public class ConcurrentUpdateSolrServer 
       .getLogger(ConcurrentUpdateSolrServer.class);
   private HttpSolrServer server;
   final BlockingQueue<UpdateRequest> queue;
-  final ExecutorService scheduler = Executors.newCachedThreadPool(
-      new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
+  final ExecutorService scheduler;
   final Queue<Runner> runners;
   volatile CountDownLatch lock = null; // used to block everything
   final int threadCount;
+  boolean shutdownExecutor = false;
+  int pollQueueTime = 250;
 
   /**
-   * Uses an internaly managed HttpClient instance.
+   * Uses an internally managed HttpClient instance.
    * 
    * @param solrServerUrl
    *          The Solr server URL
@@ -91,18 +94,27 @@ public class ConcurrentUpdateSolrServer 
   public ConcurrentUpdateSolrServer(String solrServerUrl, int queueSize,
       int threadCount) {
     this(solrServerUrl, null, queueSize, threadCount);
+    shutdownExecutor = true;
+  }
+  
+  public ConcurrentUpdateSolrServer(String solrServerUrl,
+      HttpClient client, int queueSize, int threadCount) {
+    this(solrServerUrl, client, queueSize, threadCount, Executors.newCachedThreadPool(
+        new SolrjNamedThreadFactory("concurrentUpdateScheduler")));
+    shutdownExecutor = true; // executor was created here, so shutdown()/shutdownNow() must stop it
   }
 
   /**
    * Uses the supplied HttpClient to send documents to the Solr server.
    */
   public ConcurrentUpdateSolrServer(String solrServerUrl,
-      HttpClient client, int queueSize, int threadCount) {
+      HttpClient client, int queueSize, int threadCount, ExecutorService es) {
     this.server = new HttpSolrServer(solrServerUrl, client);
     this.server.setFollowRedirects(false);
     queue = new LinkedBlockingQueue<UpdateRequest>(queueSize);
     this.threadCount = threadCount;
     runners = new LinkedList<Runner>();
+    scheduler = es;
   }
 
   /**
@@ -115,8 +127,7 @@ public class ConcurrentUpdateSolrServer 
     public void run() {
       runnerLock.lock();
 
-      // info is ok since this should only happen once for each thread
-      log.info("starting runner: {}", this);
+      log.debug("starting runner: {}", this);
       HttpPost method = null;
       HttpResponse response = null;
       try {
@@ -169,14 +180,15 @@ public class ConcurrentUpdateSolrServer 
                       }
                     }
                     out.flush();
-                    req = queue.poll(250, TimeUnit.MILLISECONDS);
+                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
                   }
                   if (isXml) {
                     out.write("</stream>".getBytes("UTF-8"));
                   }
 
                 } catch (InterruptedException e) {
-                  e.printStackTrace();
+                  Thread.currentThread().interrupt();
+                  log.warn("", e);
                 }
               }
             });
@@ -196,16 +208,13 @@ public class ConcurrentUpdateSolrServer 
             
             response = server.getHttpClient().execute(method);
             int statusCode = response.getStatusLine().getStatusCode();
-            log.info("Status for: "
-                + updateRequest.getDocuments().get(0).getFieldValue("id")
-                + " is " + statusCode);
             if (statusCode != HttpStatus.SC_OK) {
               StringBuilder msg = new StringBuilder();
               msg.append(response.getStatusLine().getReasonPhrase());
               msg.append("\n\n");
               msg.append("\n\n");
               msg.append("request: ").append(method.getURI());
-              handleError(new Exception(msg.toString()));
+              handleError(new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString()));
             }
           } finally {
             try {
@@ -213,6 +222,7 @@ public class ConcurrentUpdateSolrServer 
                 response.getEntity().getContent().close();
               }
             } catch (Exception ex) {
+              log.warn("", ex);
             }
           }
         }
@@ -236,7 +246,7 @@ public class ConcurrentUpdateSolrServer 
           }
         }
 
-        log.info("finished: {}", this);
+        log.debug("finished: {}", this);
         runnerLock.unlock();
       }
     }
@@ -357,16 +367,18 @@ public class ConcurrentUpdateSolrServer 
   @Override
   public void shutdown() {
     server.shutdown();
-    scheduler.shutdown();
-    try {
-      if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+    if (shutdownExecutor) {
+      scheduler.shutdown();
+      try {
+        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+          scheduler.shutdownNow();
+          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+              .error("ExecutorService did not terminate");
+        }
+      } catch (InterruptedException ie) {
         scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS))
-          log.error("ExecutorService did not terminate");
+        Thread.currentThread().interrupt();
       }
-    } catch (InterruptedException ie) {
-      scheduler.shutdownNow();
-      Thread.currentThread().interrupt();
     }
   }
   
@@ -384,19 +396,30 @@ public class ConcurrentUpdateSolrServer 
 
   public void shutdownNow() {
     server.shutdown();
-    scheduler.shutdownNow(); // Cancel currently executing tasks
-    try {
-      if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
-        log.error("ExecutorService did not terminate");
-    } catch (InterruptedException ie) {
-      scheduler.shutdownNow();
-      Thread.currentThread().interrupt();
+    if (shutdownExecutor) {
+      scheduler.shutdownNow(); // Cancel currently executing tasks
+      try {
+        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) log
+            .error("ExecutorService did not terminate");
+      } catch (InterruptedException ie) {
+        scheduler.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
   public void setParser(ResponseParser responseParser) {
     server.setParser(responseParser);
   }
+  
+  
+  /**
+   * @param pollQueueTime time, in milliseconds, for an open connection to wait
+   * for updates when the queue is empty.
+   */
+  public void setPollQueueTime(int pollQueueTime) {
+    this.pollQueueTime = pollQueueTime;
+  }
 
   public void setRequestWriter(RequestWriter requestWriter) {
     server.setRequestWriter(requestWriter);

Modified: lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java?rev=1533652&r1=1533651&r2=1533652&view=diff
==============================================================================
--- lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java (original)
+++ lucene/dev/branches/branch_4x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java Fri Oct 18 22:15:35 2013
@@ -100,7 +100,9 @@ public class HttpClientUtil {
    */
   public static HttpClient createClient(final SolrParams params) {
     final ModifiableSolrParams config = new ModifiableSolrParams(params);
-    logger.info("Creating new http client, config:" + config);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Creating new http client, config:" + config);
+    }
     final DefaultHttpClient httpClient = new SystemDefaultHttpClient();
     configureClient(httpClient, config);
     return httpClient;