You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2013/12/29 01:51:29 UTC

svn commit: r1553984 - in /lucene/dev/branches/lucene_solr_4_6: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/core/ solr/core/src/java/org/apache/solr/handler/ solr/core/src/java/org/apache/solr/handle...

Author: markrmiller
Date: Sun Dec 29 00:51:29 2013
New Revision: 1553984

URL: http://svn.apache.org/r1553984
Log:
SOLR-5496: We should share an http connection manager across non-search HttpClients and ensure all http connection managers get shut down.

Added:
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/MockStreamingSolrServers.java   (with props)
Modified:
    lucene/dev/branches/lucene_solr_4_6/   (props changed)
    lucene/dev/branches/lucene_solr_4_6/solr/   (props changed)
    lucene/dev/branches/lucene_solr_4_6/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/lucene_solr_4_6/solr/core/   (props changed)
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolr.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXml.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXmlOld.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ZkContainer.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/PeerSync.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/test-files/solr/solr-50-all.xml
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
    lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
    lucene/dev/branches/lucene_solr_4_6/solr/solrj/   (props changed)
    lucene/dev/branches/lucene_solr_4_6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java

Modified: lucene/dev/branches/lucene_solr_4_6/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/CHANGES.txt?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/CHANGES.txt Sun Dec 29 00:51:29 2013
@@ -71,6 +71,10 @@ Bug Fixes
 * SOLR-5568 A SolrCore cannot decide to be the leader just because the cluster
   state says no other SolrCore's are active. (Mark Miller)
 
+* SOLR-5496: We should share an http connection manager across non search 
+  HttpClients and ensure all http connection managers get shutdown.
+  (Mark Miller)
+
 Optimizations
 ----------------------  
 

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Dec 29 00:51:29 2013
@@ -114,9 +114,9 @@ class ShardLeaderElectionContextBase ext
 final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
   private static Logger log = LoggerFactory.getLogger(ShardLeaderElectionContext.class);
   
-  private ZkController zkController;
-  private CoreContainer cc;
-  private SyncStrategy syncStrategy = new SyncStrategy();
+  private final ZkController zkController;
+  private final CoreContainer cc;
+  private final SyncStrategy syncStrategy;
 
   private volatile boolean isClosed = false;
   
@@ -127,6 +127,7 @@ final class ShardLeaderElectionContext e
         zkController.getZkStateReader());
     this.zkController = zkController;
     this.cc = cc;
+    syncStrategy = new SyncStrategy(cc.getUpdateShardHandler());
   }
   
   @Override

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java Sun Dec 29 00:51:29 2013
@@ -20,13 +20,10 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.http.client.HttpClient;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
 import org.apache.solr.common.SolrException;
@@ -35,7 +32,6 @@ import org.apache.solr.common.cloud.ZkNo
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -44,7 +40,7 @@ import org.apache.solr.handler.component
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.update.PeerSync;
-import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.update.UpdateShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,26 +50,18 @@ public class SyncStrategy {
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
   
   private final ShardHandler shardHandler;
-  
-  private ThreadPoolExecutor recoveryCmdExecutor = new ThreadPoolExecutor(
-      0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
-      new SynchronousQueue<Runnable>(), new DefaultSolrThreadFactory(
-          "recoveryCmdExecutor"));
 
   private volatile boolean isClosed;
   
   private final HttpClient client;
-  {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
-    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
-    params.set(HttpClientUtil.PROP_USE_RETRY, false);
-    client = HttpClientUtil.createClient(params);
-  }
+
+  private final ExecutorService updateExecutor;
   
-  public SyncStrategy() {
+  public SyncStrategy(UpdateShardHandler updateShardHandler) {
+    client = updateShardHandler.getHttpClient();
+    
     shardHandler = new HttpShardHandlerFactory().getShardHandler(client);
+    updateExecutor = updateShardHandler.getUpdateExecutor();
   }
   
   private static class ShardCoreRequest extends ShardRequest {
@@ -256,16 +244,6 @@ public class SyncStrategy {
   
   public void close() {
     this.isClosed = true;
-    try {
-      client.getConnectionManager().shutdown();
-    } catch (Throwable e) {
-      SolrException.log(log, e);
-    }
-    try {
-      ExecutorUtil.shutdownNowAndAwaitTermination(recoveryCmdExecutor);
-    } catch (Throwable e) {
-      SolrException.log(log, e);
-    }
   }
   
   private void requestRecovery(final ZkNodeProps leaderProps, final String baseUrl, final String coreName) throws SolrServerException, IOException {
@@ -291,7 +269,7 @@ public class SyncStrategy {
         }
       }
     };
-    recoveryCmdExecutor.execute(thread);
+    updateExecutor.execute(thread);
   }
   
   public static ModifiableSolrParams params(String... params) {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Dec 29 00:51:29 2013
@@ -172,11 +172,9 @@ public final class ZkController {
   private int clientTimeout;
 
   private volatile boolean isClosed;
-  
-  private UpdateShardHandler updateShardHandler;
 
   public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-      String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, int distribUpdateConnTimeout, int distribUpdateSoTimeout, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
+      String localHostContext, int leaderVoteWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect) throws InterruptedException,
       TimeoutException, IOException {
     if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
     this.cc = cc;
@@ -187,8 +185,6 @@ public final class ZkController {
     // which means the default of "solr"
     localHostContext = trimLeadingAndTrailingSlashes(localHostContext);
     
-    updateShardHandler = new UpdateShardHandler(distribUpdateConnTimeout, distribUpdateSoTimeout);
-    
     this.zkServerAddress = zkServerAddress;
     this.localHostPort = locaHostPort;
     this.localHostContext = localHostContext;
@@ -401,13 +397,6 @@ public final class ZkController {
       log.error("Error closing zkClient", t);
     } 
     
-    if (updateShardHandler != null) {
-      try {
-        updateShardHandler.close();
-      } catch(Throwable t) {
-        log.error("Error closing updateShardHandler", t);
-      }
-    }
   }
 
   /**
@@ -1566,11 +1555,6 @@ public final class ZkController {
     return clientTimeout;
   }
 
-  // may return null if not in zk mode
-  public UpdateShardHandler getUpdateShardHandler() {
-    return updateShardHandler;
-  }
-
   /**
    * Returns the nodeName that should be used based on the specified properties.
    *

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolr.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolr.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolr.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolr.java Sun Dec 29 00:51:29 2013
@@ -160,6 +160,14 @@ public abstract class ConfigSolr {
   public int getDistributedSocketTimeout() {
     return getInt(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0);
   }
+  
+  public int getMaxUpdateConnections() {
+    return getInt(CfgProp.SOLR_MAXUPDATECONNECTIONS, 10000);
+  }
+
+  public int getMaxUpdateConnectionsPerHost() {
+    return getInt(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, 100);
+  }
 
   public int getCoreLoadThreadCount() {
     return getInt(ConfigSolr.CfgProp.SOLR_CORELOADTHREADS, DEFAULT_CORE_LOAD_THREADS);
@@ -211,6 +219,8 @@ public abstract class ConfigSolr {
     SOLR_COREROOTDIRECTORY,
     SOLR_DISTRIBUPDATECONNTIMEOUT,
     SOLR_DISTRIBUPDATESOTIMEOUT,
+    SOLR_MAXUPDATECONNECTIONS,
+    SOLR_MAXUPDATECONNECTIONSPERHOST,
     SOLR_HOST,
     SOLR_HOSTCONTEXT,
     SOLR_HOSTPORT,

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXml.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXml.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXml.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXml.java Sun Dec 29 00:51:29 2013
@@ -107,6 +107,8 @@ public class ConfigSolrXml extends Confi
     propMap.put(CfgProp.SOLR_COREROOTDIRECTORY, doSub("solr/str[@name='coreRootDirectory']"));
     propMap.put(CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, doSub("solr/solrcloud/int[@name='distribUpdateConnTimeout']"));
     propMap.put(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, doSub("solr/solrcloud/int[@name='distribUpdateSoTimeout']"));
+    propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONS, doSub("solr/solrcloud/int[@name='maxUpdateConnections']"));
+    propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, doSub("solr/solrcloud/int[@name='maxUpdateConnectionsPerHost']"));
     propMap.put(CfgProp.SOLR_HOST, doSub("solr/solrcloud/str[@name='host']"));
     propMap.put(CfgProp.SOLR_HOSTCONTEXT, doSub("solr/solrcloud/str[@name='hostContext']"));
     propMap.put(CfgProp.SOLR_HOSTPORT, doSub("solr/solrcloud/int[@name='hostPort']"));

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXmlOld.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXmlOld.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXmlOld.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ConfigSolrXmlOld.java Sun Dec 29 00:51:29 2013
@@ -143,6 +143,10 @@ public class ConfigSolrXmlOld extends Co
         config.getVal("solr/cores/@distribUpdateConnTimeout", false));
     propMap.put(CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT,
         config.getVal("solr/cores/@distribUpdateSoTimeout", false));
+    propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONS,
+        config.getVal("solr/cores/@maxUpdateConnections", false));
+    propMap.put(CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST,
+        config.getVal("solr/cores/@maxUpdateConnectionsPerHost", false));
     propMap.put(CfgProp.SOLR_HOST, config.getVal("solr/cores/@host", false));
     propMap.put(CfgProp.SOLR_HOSTCONTEXT,
         config.getVal("solr/cores/@hostContext", false));

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/CoreContainer.java Sun Dec 29 00:51:29 2013
@@ -17,27 +17,6 @@
 
 package org.apache.solr.core;
 
-import com.google.common.collect.Maps;
-
-import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.ZkSolrResourceLoader;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.handler.admin.CoreAdminHandler;
-import org.apache.solr.handler.admin.InfoHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-import org.apache.solr.logging.LogWatcher;
-import org.apache.solr.schema.IndexSchema;
-import org.apache.solr.schema.IndexSchemaFactory;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.FileUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.File;
@@ -74,6 +53,7 @@ import org.apache.solr.handler.component
 import org.apache.solr.logging.LogWatcher;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.IndexSchemaFactory;
+import org.apache.solr.update.UpdateShardHandler;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.FileUtils;
 import org.apache.zookeeper.KeeperException;
@@ -108,8 +88,7 @@ public class CoreContainer {
   protected ZkContainer zkSys = new ZkContainer();
   private ShardHandlerFactory shardHandlerFactory;
   
-  private ExecutorService updateExecutor = Executors.newCachedThreadPool(
-      new SolrjNamedThreadFactory("updateExecutor"));
+  private UpdateShardHandler updateShardHandler;
 
   protected LogWatcher logging = null;
 
@@ -120,6 +99,7 @@ public class CoreContainer {
   protected final String solrHome;
 
   protected final CoresLocator coresLocator;
+  
 
   {
     log.info("New CoreContainer " + System.identityHashCode(this));
@@ -214,6 +194,8 @@ public class CoreContainer {
     }
 
     shardHandlerFactory = ShardHandlerFactory.newInstance(cfg.getShardHandlerFactoryPluginInfo(), loader);
+    
+    updateShardHandler = new UpdateShardHandler(cfg);
 
     solrCores.allocateLazyCores(cfg.getTransientCacheSize(), loader);
 
@@ -387,7 +369,6 @@ public class CoreContainer {
       cancelCoreRecoveries();
     }
 
-
     try {
       // First wake up the closer thread, it'll terminate almost immediately since it checks isShutDown.
       synchronized (solrCores.getModifyLock()) {
@@ -413,16 +394,20 @@ public class CoreContainer {
       }
 
     } finally {
-      if (shardHandlerFactory != null) {
-        shardHandlerFactory.close();
+      try {
+        if (shardHandlerFactory != null) {
+          shardHandlerFactory.close();
+        }
+      } finally {
+        try {
+          if (updateShardHandler != null) {
+            updateShardHandler.close();
+          }
+        } finally {
+          // we want to close zk stuff last
+          zkSys.close();
+        }
       }
-      
-      ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
-      
-      // we want to close zk stuff last
-
-      zkSys.close();
-
     }
     org.apache.lucene.util.IOUtils.closeWhileHandlingException(loader); // best effort
   }
@@ -903,7 +888,7 @@ public class CoreContainer {
   public InfoHandler getInfoHandler() {
     return infoHandler;
   }
-  
+
   /**
    * the default core name, or null if there is no default core name
    */
@@ -985,8 +970,12 @@ public class CoreContainer {
     return shardHandlerFactory;
   }
   
+  public UpdateShardHandler getUpdateShardHandler() {
+    return updateShardHandler;
+  }
+  
   public ExecutorService getUpdateExecutor() {
-    return updateExecutor;
+    return updateShardHandler.getUpdateExecutor();
   }
   
   // Just to tidy up the code where it did this in-line.
@@ -1002,8 +991,6 @@ public class CoreContainer {
   String getCoreToOrigName(SolrCore core) {
     return solrCores.getCoreToOrigName(core);
   }
-  
-
 }
 
 class CloserThread extends Thread {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ZkContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ZkContainer.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ZkContainer.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/core/ZkContainer.java Sun Dec 29 00:51:29 2013
@@ -58,8 +58,6 @@ public class ZkContainer {
 
   private ExecutorService coreZkRegister = Executors.newFixedThreadPool(Integer.MAX_VALUE,
       new DefaultSolrThreadFactory("coreZkRegister") );
-  private int distribUpdateConnTimeout;
-  private int distribUpdateSoTimeout;
   
   public ZkContainer() {
     
@@ -74,8 +72,7 @@ public class ZkContainer {
 
     initZooKeeper(cc, solrHome,
         config.getZkHost(), config.getZkClientTimeout(), config.getZkHostPort(), config.getZkHostContext(),
-        config.getHost(), config.getLeaderVoteWait(), config.getGenericCoreNodeNames(),
-        config.getDistributedConnectionTimeout(), config.getDistributedSocketTimeout());
+        config.getHost(), config.getLeaderVoteWait(), config.getGenericCoreNodeNames());
   }
   // TODO: 5.0 remove this, it's only here for back-compat and only called from ConfigSolr.
   public static boolean isZkMode() {
@@ -87,8 +84,7 @@ public class ZkContainer {
   }
 
   public void initZooKeeper(final CoreContainer cc, String solrHome, String zkHost, int zkClientTimeout, String hostPort,
-                            String hostContext, String host, int leaderVoteWait, boolean genericCoreNodeNames,
-                            int distribUpdateConnTimeout, int distribUpdateSoTimeout) {
+                            String hostContext, String host, int leaderVoteWait, boolean genericCoreNodeNames) {
     ZkController zkController = null;
     
     // if zkHost sys property is not set, we are not using ZooKeeper
@@ -160,7 +156,7 @@ public class ZkContainer {
         }
         zkController = new ZkController(cc, zookeeperHost, zkClientTimeout,
             zkClientConnectTimeout, host, hostPort, hostContext,
-            leaderVoteWait, genericCoreNodeNames, distribUpdateConnTimeout, distribUpdateSoTimeout,
+            leaderVoteWait, genericCoreNodeNames,
             new CurrentCoreDescriptorProvider() {
 
               @Override

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Sun Dec 29 00:51:29 2013
@@ -162,25 +162,18 @@ public class SnapPuller {
    */
   private AtomicBoolean pollDisabled = new AtomicBoolean(false);
 
-  // HttpClient shared by all cores (used if timeout is not specified for a core)
-  private static HttpClient client;
-  // HttpClient for this instance if connectionTimeout or readTimeout has been specified
   private final HttpClient myHttpClient;
 
-  private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
-    if (connTimeout == null && readTimeout == null && client != null)  return client;
+  private static HttpClient createHttpClient(SolrCore core, String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
     final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
     httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connTimeout != null ? connTimeout : "5000");
     httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, readTimeout != null ? readTimeout : "20000");
     httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
     httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
     httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
-    // Keeping a very high number so that if you have a large number of cores
-    // no requests are kept waiting for an idle connection.
-    httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
-    httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 10000);
-    HttpClient httpClient = HttpClientUtil.createClient(httpClientParams);
-    if (client == null && connTimeout == null && readTimeout == null) client = httpClient;
+
+    HttpClient httpClient = HttpClientUtil.createClient(httpClientParams, core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getConnectionManager());
+
     return httpClient;
   }
 
@@ -207,7 +200,7 @@ public class SnapPuller {
     String readTimeout = (String) initArgs.get(HttpClientUtil.PROP_SO_TIMEOUT);
     String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
     String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
-    myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
+    myHttpClient = createHttpClient(solrCore, connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
     if (pollInterval != null && pollInterval > 0) {
       startExecutorService();
     } else {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Sun Dec 29 00:51:29 2013
@@ -793,7 +793,7 @@ public class CoreAdminHandler extends Re
     try {
       core = coreContainer.getCore(cname);
       if (core != null) {
-        syncStrategy = new SyncStrategy();
+        syncStrategy = new SyncStrategy(core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler());
         
         Map<String,Object> props = new HashMap<String,Object>();
         props.put(ZkStateReader.BASE_URL_PROP, zkController.getBaseUrl());

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/PeerSync.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/PeerSync.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/PeerSync.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/PeerSync.java Sun Dec 29 00:51:29 2013
@@ -79,15 +79,7 @@ public class PeerSync  {
   private long ourHighThreshold; // 80th percentile
   private boolean cantReachIsSuccess;
   private boolean getNoVersionsIsSuccess;
-  private static final HttpClient client;
-  static {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
-    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
-    params.set(HttpClientUtil.PROP_USE_RETRY, false);
-    client = HttpClientUtil.createClient(params);
-  }
+  private final HttpClient client;
 
   // comparator that sorts by absolute value, putting highest first
   private static Comparator<Long> absComparator = new Comparator<Long>() {
@@ -137,7 +129,7 @@ public class PeerSync  {
     this.maxUpdates = nUpdates;
     this.cantReachIsSuccess = cantReachIsSuccess;
     this.getNoVersionsIsSuccess = getNoVersionsIsSuccess;
-
+    this.client = core.getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getHttpClient();
     
     uhandler = core.getUpdateHandler();
     ulog = uhandler.getUpdateLog();

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java Sun Dec 29 00:51:29 2013
@@ -20,7 +20,6 @@ package org.apache.solr.update;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -51,8 +50,8 @@ public class SolrCmdDistributor {
     public boolean abortCheck();
   }
   
-  public SolrCmdDistributor(ExecutorService updateExecutor) {
-    servers = new StreamingSolrServers(updateExecutor);
+  public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
+    servers = new StreamingSolrServers(updateShardHandler);
   }
   
   public void finish() {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/StreamingSolrServers.java Sun Dec 29 00:51:29 2013
@@ -43,23 +43,21 @@ import org.slf4j.LoggerFactory;
 public class StreamingSolrServers {
   public static Logger log = LoggerFactory.getLogger(StreamingSolrServers.class);
   
-  private static HttpClient httpClient;
-  static {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
-    params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, false);
-    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
-    httpClient = HttpClientUtil.createClient(params);
-  }
+  private HttpClient httpClient;
+
   
   private Map<String,ConcurrentUpdateSolrServer> solrServers = new HashMap<String,ConcurrentUpdateSolrServer>();
   private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
 
   private ExecutorService updateExecutor;
 
-  public StreamingSolrServers(ExecutorService updateExecutor) {
-    this.updateExecutor = updateExecutor;
+  /**
+   * Takes both the update executor and the HttpClient from the given
+   * {@link UpdateShardHandler} instead of creating its own.
+   */
+  public StreamingSolrServers(UpdateShardHandler updateShardHandler) {
+    this.updateExecutor = updateShardHandler.getUpdateExecutor();
+    httpClient = updateShardHandler.getHttpClient();
   }
 
   public List<Error> getErrors() {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java Sun Dec 29 00:51:29 2013
@@ -17,16 +17,18 @@ package org.apache.solr.update;
  * limitations under the License.
  */
 
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.core.ConfigSolr;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,36 +36,48 @@ public class UpdateShardHandler {
   
   private static Logger log = LoggerFactory.getLogger(UpdateShardHandler.class);
   
-  private ThreadPoolExecutor cmdDistribExecutor = new ThreadPoolExecutor(0,
-      Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-      new DefaultSolrThreadFactory("cmdDistribExecutor"));
+  private ExecutorService updateExecutor = Executors.newCachedThreadPool(
+      new SolrjNamedThreadFactory("updateExecutor"));
+  
+  private PoolingClientConnectionManager clientConnectionManager;
   
   private final HttpClient client;
 
-  public UpdateShardHandler(int distribUpdateConnTimeout, int distribUpdateSoTimeout) {
+  public UpdateShardHandler(ConfigSolr cfg) {
+    
+    clientConnectionManager = new PoolingClientConnectionManager();
+    clientConnectionManager.setMaxTotal(cfg.getMaxUpdateConnections()); // pool-wide cap; was a second setDefaultMaxPerRoute call that got overwritten
+    clientConnectionManager.setDefaultMaxPerRoute(cfg.getMaxUpdateConnectionsPerHost()); // cap per route (host)
+    
+    
     ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 500);
-    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 16);
-    params.set(HttpClientUtil.PROP_SO_TIMEOUT, distribUpdateSoTimeout);
-    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, distribUpdateConnTimeout);
-    client = HttpClientUtil.createClient(params);
+    params.set(HttpClientUtil.PROP_SO_TIMEOUT, cfg.getDistributedSocketTimeout());
+    params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, cfg.getDistributedConnectionTimeout());
+    params.set(HttpClientUtil.PROP_USE_RETRY, false);
+    client = HttpClientUtil.createClient(params, clientConnectionManager);
   }
   
   
   public HttpClient getHttpClient() {
     return client;
   }
+
+  public ClientConnectionManager getConnectionManager() {
+    return clientConnectionManager;
+  }
   
-  public ThreadPoolExecutor getCmdDistribExecutor() {
-    return cmdDistribExecutor;
+  public ExecutorService getUpdateExecutor() {
+    return updateExecutor;
   }
 
   public void close() {
     try {
-      ExecutorUtil.shutdownNowAndAwaitTermination(cmdDistribExecutor);
+      ExecutorUtil.shutdownAndAwaitTermination(updateExecutor);
     } catch (Throwable e) {
       SolrException.log(log, e);
+    } finally {
+      clientConnectionManager.shutdown();
     }
-    client.getConnectionManager().shutdown();
   }
+
 }

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Dec 29 00:51:29 2013
@@ -172,7 +172,7 @@ public class DistributedUpdateProcessor 
     this.zkEnabled  = coreDesc.getCoreContainer().isZooKeeperAware();
     zkController = req.getCore().getCoreDescriptor().getCoreContainer().getZkController();
     if (zkEnabled) {
-      cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateExecutor());
+      cmdDistrib = new SolrCmdDistributor(coreDesc.getCoreContainer().getUpdateShardHandler());
     }
     //this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
 
@@ -557,7 +557,7 @@ public class DistributedUpdateProcessor 
           }
         }
       };
-      ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateExecutor();
+      ExecutorService executor = req.getCore().getCoreDescriptor().getCoreContainer().getUpdateShardHandler().getUpdateExecutor();
       executor.execute(thread);
       
     }

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/test-files/solr/solr-50-all.xml
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/test-files/solr/solr-50-all.xml?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/test-files/solr/solr-50-all.xml (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/test-files/solr/solr-50-all.xml Sun Dec 29 00:51:29 2013
@@ -27,6 +27,8 @@
   <solrcloud>
     <int name="distribUpdateConnTimeout">22</int>
     <int name="distribUpdateSoTimeout">33</int>
+    <int name="maxUpdateConnections">3</int>
+    <int name="maxUpdateConnectionsPerHost">37</int>
     <int name="leaderVoteWait">55</int>
     <str name="host">testHost</str>
     <str name="hostContext">testHostContext</str>

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Dec 29 00:51:29 2013
@@ -179,7 +179,7 @@ public class ZkControllerTest extends So
       cc = getCoreContainer();
       
       ZkController zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, 10000,
-          "127.0.0.1", "8983", "solr", 0, true, 10000, 10000, new CurrentCoreDescriptorProvider() {
+          "127.0.0.1", "8983", "solr", 0, true, new CurrentCoreDescriptorProvider() {
             
             @Override
             public List<CoreDescriptor> getCurrentDescriptors() {
@@ -219,7 +219,7 @@ public class ZkControllerTest extends So
       cc = getCoreContainer();
       
       zkController = new ZkController(cc, server.getZkAddress(),
-          TIMEOUT, 10000, "127.0.0.1", "8983", "solr", 0, true, 10000, 10000, new CurrentCoreDescriptorProvider() {
+          TIMEOUT, 10000, "127.0.0.1", "8983", "solr", 0, true, new CurrentCoreDescriptorProvider() {
             
             @Override
             public List<CoreDescriptor> getCurrentDescriptors() {

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/core/TestSolrXml.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/core/TestSolrXml.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/core/TestSolrXml.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/core/TestSolrXml.java Sun Dec 29 00:51:29 2013
@@ -51,6 +51,8 @@ public class TestSolrXml extends SolrTes
       assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_COREROOTDIRECTORY, null), "testCoreRootDirectory");
       assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_DISTRIBUPDATECONNTIMEOUT, 0), 22);
       assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_DISTRIBUPDATESOTIMEOUT, 0), 33);
+      assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_MAXUPDATECONNECTIONS, 0), 3);
+      assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_MAXUPDATECONNECTIONSPERHOST, 0), 37);
       assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_HOST, null), "testHost");
       assertEquals("Did not find expected value", cfg.get(ConfigSolr.CfgProp.SOLR_HOSTCONTEXT, null), "testHostContext");
       assertEquals("Did not find expected value", cfg.getInt(ConfigSolr.CfgProp.SOLR_HOSTPORT, 0), 44);

Added: lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/MockStreamingSolrServers.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/MockStreamingSolrServers.java?rev=1553984&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/MockStreamingSolrServers.java (added)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/MockStreamingSolrServers.java Sun Dec 29 00:51:29 2013
@@ -0,0 +1,107 @@
+package org.apache.solr.update;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.util.NamedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link StreamingSolrServers} variant that wraps every server it hands out
+ * so that requests can be made to fail with a selectable exception.
+ */
+public class MockStreamingSolrServers extends StreamingSolrServers {
+  public static Logger log = LoggerFactory
+      .getLogger(MockStreamingSolrServers.class);
+  
+  /** Kinds of I/O failure that can be injected into requests. */
+  public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION};
+  
+  /** Failure to inject, or null to pass requests through unchanged. */
+  private volatile Exp exp = null;
+  
+  public MockStreamingSolrServers(UpdateShardHandler updateShardHandler) {
+    super(updateShardHandler);
+  }
+  
+  /** Returns the server chosen by the superclass, wrapped in a {@link MockSolrServer}. */
+  @Override
+  public synchronized SolrServer getSolrServer(final SolrCmdDistributor.Req req) {
+    SolrServer server = super.getSolrServer(req);
+    return new MockSolrServer(server);
+  }
+  
+  /** Selects the failure to inject; pass null to stop injecting failures. */
+  public void setExp(Exp exp) {
+    this.exp = exp;
+  }
+
+  /** Creates a fresh exception instance for the given (non-null) failure kind. */
+  private static IOException exception(Exp kind) {
+    switch (kind) {
+      case CONNECT_EXCEPTION:
+        return new ConnectException();
+      case SOCKET_EXCEPTION:
+        return new SocketException();
+      default:
+        throw new IllegalStateException("Unhandled Exp: " + kind);
+    }
+  }
+
+  /** Wrapper that fails requests while a failure kind is set and otherwise delegates. */
+  class MockSolrServer extends SolrServer {
+
+    private SolrServer solrServer;
+
+    public MockSolrServer(SolrServer solrServer) {
+      this.solrServer = solrServer;
+    }
+    
+    /** Throws the selected failure, bare or wrapped in a SolrServerException (random choice), else delegates. */
+    @Override
+    public NamedList<Object> request(SolrRequest request)
+        throws SolrServerException, IOException {
+      // Read the volatile field once: a concurrent setExp(null) between the
+      // null check and the switch would otherwise cause a NullPointerException.
+      final Exp injected = exp;
+      if (injected != null) {
+        if (LuceneTestCase.random().nextBoolean()) {
+          throw exception(injected);
+        } else {
+          throw new SolrServerException(exception(injected));
+        }
+      }
+      
+      return solrServer.request(request);
+    }
+
+
+    /** No-op: the wrapped server is not shut down from here. */
+    @Override
+    public void shutdown() {}
+    
+  }
+}

Modified: lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java Sun Dec 29 00:51:29 2013
@@ -18,13 +18,14 @@ package org.apache.solr.update;
  */
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.xml.parsers.ParserConfigurationException;
+
 import org.apache.lucene.index.LogDocMergePolicy;
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -37,10 +38,10 @@ import org.apache.solr.common.cloud.ZkCo
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.apache.solr.core.ConfigSolr;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoresLocator;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.SolrEventListener;
 import org.apache.solr.search.SolrIndexSearcher;
@@ -51,6 +52,7 @@ import org.apache.solr.update.SolrCmdDis
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.junit.BeforeClass;
+import org.xml.sax.SAXException;
 
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   @BeforeClass
@@ -60,10 +62,26 @@ public class SolrCmdDistributorTest exte
     // being able to call optimize to have all deletes expunged.
     System.setProperty("solr.tests.mergePolicy", LogDocMergePolicy.class.getName());
   }
-  private ExecutorService updateExecutor = Executors.newCachedThreadPool(
-      new SolrjNamedThreadFactory("updateExecutor"));
+  private UpdateShardHandler updateShardHandler;
   
-  public SolrCmdDistributorTest() {
+  public SolrCmdDistributorTest() throws ParserConfigurationException, IOException, SAXException {
+    updateShardHandler = new UpdateShardHandler(new ConfigSolr() {
+
+      @Override
+      public CoresLocator getCoresLocator() {
+        return null;
+      }
+
+      @Override
+      protected String getShardHandlerFactoryConfigPath() {
+        return null;
+      }
+
+      @Override
+      public boolean isPersistent() {
+        return false;
+      }});
+    
     fixShardCount = true;
     shardCount = 4;
     stress = 0;
@@ -107,7 +125,7 @@ public class SolrCmdDistributorTest exte
   public void doTest() throws Exception {
     del("*:*");
     
-    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler);
     
     ModifiableSolrParams params = new ModifiableSolrParams();
 
@@ -147,7 +165,7 @@ public class SolrCmdDistributorTest exte
     nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps)));
     
     // add another 2 docs to control and 3 to client
-    cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    cmdDistrib = new SolrCmdDistributor(updateShardHandler);
     cmd.solrDoc = sdoc("id", 2);
     params = new ModifiableSolrParams();
     params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -190,7 +208,7 @@ public class SolrCmdDistributorTest exte
     
     
 
-    cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    cmdDistrib = new SolrCmdDistributor(updateShardHandler);
     
     params = new ModifiableSolrParams();
     params.set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
@@ -223,7 +241,7 @@ public class SolrCmdDistributorTest exte
     
     int id = 5;
     
-    cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    cmdDistrib = new SolrCmdDistributor(updateShardHandler);
     
     int cnt = atLeast(303);
     for (int i = 0; i < cnt; i++) {
@@ -295,7 +313,7 @@ public class SolrCmdDistributorTest exte
     }
     
     // Test RetryNode
-    cmdDistrib = new SolrCmdDistributor(updateExecutor);
+    cmdDistrib = new SolrCmdDistributor(updateShardHandler);
     final HttpSolrServer solrclient = (HttpSolrServer) clients.get(0);
     long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
@@ -343,7 +361,7 @@ public class SolrCmdDistributorTest exte
   
   @Override
   public void tearDown() throws Exception {
-    ExecutorUtil.shutdownNowAndAwaitTermination(updateExecutor);
+    updateShardHandler.close();
     super.tearDown();
   }
 }

Modified: lucene/dev/branches/lucene_solr_4_6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java?rev=1553984&r1=1553983&r2=1553984&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java (original)
+++ lucene/dev/branches/lucene_solr_4_6/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java Sun Dec 29 00:51:29 2013
@@ -33,12 +33,13 @@ import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.params.ClientParamBean;
+import org.apache.http.conn.ClientConnectionManager;
 import org.apache.http.entity.HttpEntityWrapper;
 import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.client.SystemDefaultHttpClient;
 import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
-import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; // jdoc
+import org.apache.http.impl.client.SystemDefaultHttpClient;
 import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; // jdoc
 import org.apache.http.params.HttpConnectionParams;
 import org.apache.http.protocol.HttpContext;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -91,6 +92,7 @@ public class HttpClientUtil {
   public static void setConfigurer(HttpClientConfigurer newConfigurer) {
     configurer = newConfigurer;
   }
+  
   /**
    * Creates new http client by using the provided configuration.
    * 
@@ -107,6 +109,20 @@ public class HttpClientUtil {
     configureClient(httpClient, config);
     return httpClient;
   }
+  
+  /**
+   * Creates new http client by using the provided configuration.
+   * 
+   */
+  public static HttpClient createClient(final SolrParams params, ClientConnectionManager cm) {
+    final ModifiableSolrParams config = new ModifiableSolrParams(params);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Creating new http client, config:" + config);
+    }
+    final DefaultHttpClient httpClient = new DefaultHttpClient(cm);
+    configureClient(httpClient, config);
+    return httpClient;
+  }
 
   /**
    * Configures {@link DefaultHttpClient}, only sets parameters if they are