You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2018/11/29 18:19:13 UTC

[04/16] lucene-solr:master: SOLR-12801: Make massive improvements to the tests.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
new file mode 100644
index 0000000..e4487cf
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/SocketProxy.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.client.solrj.cloud;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kindly borrowed the idea and base implementation from the ActiveMQ project;
+ * useful for blocking traffic on a specified port.
+ */
+public class SocketProxy {
+  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  public static final int ACCEPT_TIMEOUT_MILLIS = 100;
+
+  // should be as large as the HttpShardHandlerFactory socket timeout ... or larger?
+  public static final int PUMP_SOCKET_TIMEOUT_MS = 100 * 1000;
+  
+  private URI proxyUrl;
+  private URI target;
+  
+  private Acceptor acceptor;
+  private ServerSocket serverSocket;
+  
+  private CountDownLatch closed = new CountDownLatch(1);
+  
+  public List<Bridge> connections = new LinkedList<Bridge>();
+  
+  private final int listenPort;
+  
+  private int receiveBufferSize = -1;
+  
+  private boolean pauseAtStart = false;
+  
+  private int acceptBacklog = 50;
+
+  private boolean usesSSL;
+
+  public SocketProxy() throws Exception {
+    this(0, false);
+  }
+  
+  public SocketProxy( boolean useSSL) throws Exception {
+    this(0, useSSL);
+  }
+  
+  public SocketProxy(int port, boolean useSSL) throws Exception {
+    int listenPort = port;
+    this.usesSSL = useSSL;
+    serverSocket = createServerSocket(useSSL);
+    serverSocket.setReuseAddress(true);
+    if (receiveBufferSize > 0) {
+      serverSocket.setReceiveBufferSize(receiveBufferSize);
+    }
+    serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+    this.listenPort = serverSocket.getLocalPort();
+  }
+  
+  public void open(URI uri) throws Exception {
+    target = uri;
+    proxyUrl = urlFromSocket(target, serverSocket);
+    doOpen();
+  }
+  
+  public String toString() {
+    return "SocketyProxy: port="+listenPort+"; target="+target;
+  }
+    
+  public void setReceiveBufferSize(int receiveBufferSize) {
+    this.receiveBufferSize = receiveBufferSize;
+  }
+  
+  public void setTarget(URI tcpBrokerUri) {
+    target = tcpBrokerUri;
+  }
+  
+  private void doOpen() throws Exception {
+    
+    acceptor = new Acceptor(serverSocket, target);
+    if (pauseAtStart) {
+      acceptor.pause();
+    }
+    new Thread(null, acceptor, "SocketProxy-Acceptor-"
+        + serverSocket.getLocalPort()).start();
+    closed = new CountDownLatch(1);
+  }
+  
+  public int getListenPort() {
+    return listenPort;
+  }
+  
+  private ServerSocket createServerSocket(boolean useSSL) throws Exception {
+    if (useSSL) {
+      return SSLServerSocketFactory.getDefault().createServerSocket();
+    }
+    return new ServerSocket();
+  }
+  
+  private Socket createSocket(boolean useSSL) throws Exception {
+    if (useSSL) {
+      return SSLSocketFactory.getDefault().createSocket();
+    }
+    return new Socket();
+  }
+  
+  public URI getUrl() {
+    return proxyUrl;
+  }
+  
+  /*
+   * close all proxy connections and acceptor
+   */
+  public void close() {
+    List<Bridge> connections;
+    synchronized (this.connections) {
+      connections = new ArrayList<Bridge>(this.connections);
+    }
+    log.warn("Closing " + connections.size()+" connections to: "+getUrl()+", target: "+target);
+    for (Bridge con : connections) {
+      closeConnection(con);
+    }
+    acceptor.close();
+    closed.countDown();
+  }
+  
+  /*
+   * close all proxy receive connections, leaving acceptor open
+   */
+  public void halfClose() {
+    List<Bridge> connections;
+    synchronized (this.connections) {
+      connections = new ArrayList<Bridge>(this.connections);
+    }
+    log.info("halfClose, numConnections=" + connections.size());
+    for (Bridge con : connections) {
+      halfCloseConnection(con);
+    }
+  }
+  
+  public boolean waitUntilClosed(long timeoutSeconds)
+      throws InterruptedException {
+    return closed.await(timeoutSeconds, TimeUnit.SECONDS);
+  }
+  
+  /*
+   * called after a close to restart the acceptor on the same port
+   */
+  public void reopen() {
+    log.info("Re-opening connectivity to "+getUrl());
+    try {
+      if (proxyUrl == null) {
+        throw new IllegalStateException("Can not call open before open(URI uri).");
+      }
+      serverSocket = createServerSocket(usesSSL);
+      serverSocket.setReuseAddress(true);
+      if (receiveBufferSize > 0) {
+        serverSocket.setReceiveBufferSize(receiveBufferSize);
+      }
+      serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
+      doOpen();
+    } catch (Exception e) {
+      log.debug("exception on reopen url:" + getUrl(), e);
+    }
+  }
+  
+  /*
+   * pause accepting new connections and data transfer through existing proxy
+   * connections. All sockets remain open
+   */
+  public void pause() {
+    synchronized (connections) {
+      log.info("pause, numConnections=" + connections.size());
+      acceptor.pause();
+      for (Bridge con : connections) {
+        con.pause();
+      }
+    }
+  }
+  
+  /*
+   * continue after pause
+   */
+  public void goOn() {
+    synchronized (connections) {
+      log.info("goOn, numConnections=" + connections.size());
+      for (Bridge con : connections) {
+        con.goOn();
+      }
+    }
+    acceptor.goOn();
+  }
+  
+  private void closeConnection(Bridge c) {
+    try {
+      c.close();
+    } catch (Exception e) {
+      log.debug("exception on close of: " + c, e);
+    }
+  }
+  
+  private void halfCloseConnection(Bridge c) {
+    try {
+      c.halfClose();
+    } catch (Exception e) {
+      log.debug("exception on half close of: " + c, e);
+    }
+  }
+  
+  public boolean isPauseAtStart() {
+    return pauseAtStart;
+  }
+  
+  public void setPauseAtStart(boolean pauseAtStart) {
+    this.pauseAtStart = pauseAtStart;
+  }
+  
+  public int getAcceptBacklog() {
+    return acceptBacklog;
+  }
+  
+  public void setAcceptBacklog(int acceptBacklog) {
+    this.acceptBacklog = acceptBacklog;
+  }
+  
+  private URI urlFromSocket(URI uri, ServerSocket serverSocket)
+      throws Exception {
+    int listenPort = serverSocket.getLocalPort();
+    
+    return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
+        listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
+  }
+  
+  public class Bridge {
+    
+    private Socket receiveSocket;
+    private Socket sendSocket;
+    private Pump requestThread;
+    private Pump responseThread;
+    
+    public Bridge(Socket socket, URI target) throws Exception {
+      receiveSocket = socket;
+      sendSocket = createSocket(usesSSL);
+      if (receiveBufferSize > 0) {
+        sendSocket.setReceiveBufferSize(receiveBufferSize);
+      }
+      sendSocket.connect(new InetSocketAddress(target.getHost(), target
+          .getPort()));
+      linkWithThreads(receiveSocket, sendSocket);
+      log.info("proxy connection " + sendSocket + ", receiveBufferSize="
+          + sendSocket.getReceiveBufferSize());
+    }
+    
+    public void goOn() {
+      responseThread.goOn();
+      requestThread.goOn();
+    }
+    
+    public void pause() {
+      requestThread.pause();
+      responseThread.pause();
+    }
+    
+    public void close() throws Exception {
+      synchronized (connections) {
+        connections.remove(this);
+      }
+      receiveSocket.close();
+      sendSocket.close();
+    }
+    
+    public void halfClose() throws Exception {
+      receiveSocket.close();
+    }
+    
+    private void linkWithThreads(Socket source, Socket dest) {
+      requestThread = new Pump("Request", source, dest);
+      requestThread.start();
+      responseThread = new Pump("Response", dest, source);
+      responseThread.start();
+    }
+    
+    public class Pump extends Thread {
+      
+      protected Socket src;
+      private Socket destination;
+      private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+      
+      public Pump(String kind, Socket source, Socket dest) {
+        super("SocketProxy-"+kind+"-" + source.getPort() + ":"
+            + dest.getPort());
+        src = source;
+        destination = dest;
+        pause.set(new CountDownLatch(0));
+      }
+      
+      public void pause() {
+        pause.set(new CountDownLatch(1));
+      }
+      
+      public void goOn() {
+        pause.get().countDown();
+      }
+      
+      public void run() {
+        byte[] buf = new byte[1024];
+
+        try {
+          src.setSoTimeout(PUMP_SOCKET_TIMEOUT_MS);
+        } catch (SocketException e) {
+          if (e.getMessage().equals("Socket is closed")) {
+            log.warn("Failed to set socket timeout on "+src+" due to: "+e);
+            return;
+          }
+          log.error("Failed to set socket timeout on "+src+" due to: "+e);
+          throw new RuntimeException(e);
+        }
+
+        InputStream in = null;
+        OutputStream out = null;
+        try {
+          in = src.getInputStream();
+          out = destination.getOutputStream();
+          while (true) {
+            int len = -1;
+            try {
+              len = in.read(buf);
+            } catch (SocketTimeoutException ste) {
+              log.warn(ste+" when reading from "+src);
+            }
+
+            if (len == -1) {
+              log.debug("read eof from:" + src);
+              break;
+            }
+            pause.get().await();
+            if (len > 0)
+              out.write(buf, 0, len);
+          }
+        } catch (Exception e) {
+          log.debug("read/write failed, reason: " + e.getLocalizedMessage());
+          try {
+            if (!receiveSocket.isClosed()) {
+              // for halfClose, on read/write failure if we close the
+              // remote end will see a close at the same time.
+              close();
+            }
+          } catch (Exception ignore) {}
+        } finally {
+          if (in != null) {
+            try {
+              in.close();
+            } catch (Exception exc) {
+              log.debug(exc+" when closing InputStream on socket: "+src);
+            }
+          }
+          if (out != null) {
+            try {
+              out.close();
+            } catch (Exception exc) {
+              log.debug(exc+" when closing OutputStream on socket: "+destination);
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  public class Acceptor implements Runnable {
+    
+    private ServerSocket socket;
+    private URI target;
+    private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+    
+    public Acceptor(ServerSocket serverSocket, URI uri) {
+      socket = serverSocket;
+      target = uri;
+      pause.set(new CountDownLatch(0));
+      try {
+        socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+      } catch (SocketException e) {
+        e.printStackTrace();
+      }
+    }
+    
+    public void pause() {
+      pause.set(new CountDownLatch(1));
+    }
+    
+    public void goOn() {
+      pause.get().countDown();
+    }
+    
+    public void run() {
+      try {
+        while (!socket.isClosed()) {
+          pause.get().await();
+          try {
+            Socket source = socket.accept();
+            pause.get().await();
+            if (receiveBufferSize > 0) {
+              source.setReceiveBufferSize(receiveBufferSize);
+            }
+            log.info("accepted " + source + ", receiveBufferSize:"
+                + source.getReceiveBufferSize());
+            synchronized (connections) {
+              connections.add(new Bridge(source, target));
+            }
+          } catch (SocketTimeoutException expected) {}
+        }
+      } catch (Exception e) {
+        log.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+      }
+    }
+    
+    public void close() {
+      try {
+        socket.close();
+        closed.countDown();
+        goOn();
+      } catch (IOException ignored) {}
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 193555a..37cdba7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -112,7 +112,7 @@ public class CloudSolrClient extends SolrClient {
   private HttpClient myClient;
   private final boolean clientIsInternal;
   //no of times collection state to be reloaded if stale state error is received
-  private static final int MAX_STALE_RETRIES = 5;
+  private static final int MAX_STALE_RETRIES = Integer.parseInt(System.getProperty("cloudSolrClientMaxStaleRetries", "5"));
   Random rand = new Random();
   
   private final boolean updatesToLeaders;
@@ -212,9 +212,9 @@ public class CloudSolrClient extends SolrClient {
     final DocCollection cached;
     final long cachedAt;
     //This is the time at which the collection is retried and got the same old version
-    long retriedAt = -1;
+    volatile long retriedAt = -1;
     //flag that suggests that this is potentially to be rechecked
-    boolean maybeStale = false;
+    volatile boolean maybeStale = false;
 
     ExpiringCachedDocCollection(DocCollection cached) {
       this.cached = cached;
@@ -916,17 +916,17 @@ public class CloudSolrClient extends SolrClient {
       int errorCode = (rootCause instanceof SolrException) ?
           ((SolrException)rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;
 
-      log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry? " + retryCount,
-          inputCollections, rootCause.toString());
-
-      boolean wasCommError =
-          (rootCause instanceof ConnectException ||
-              rootCause instanceof ConnectTimeoutException ||
-              rootCause instanceof NoHttpResponseException ||
-              rootCause instanceof SocketException);
+          boolean wasCommError =
+              (rootCause instanceof ConnectException ||
+                  rootCause instanceof ConnectTimeoutException ||
+                  rootCause instanceof NoHttpResponseException ||
+                  rootCause instanceof SocketException);
+          
+      log.error("Request to collection {} failed due to (" + errorCode + ") {}, retry={} commError={} errorCode={} ",
+          inputCollections, rootCause.toString(), retryCount, wasCommError, errorCode);
 
       if (wasCommError
-          || (exc instanceof RouteException && (errorCode == 404 || errorCode == 503)) // 404 because the core does not exist 503 service unavailable
+          || (exc instanceof RouteException && (errorCode == 503)) // 404 because the core does not exist 503 service unavailable
           //TODO there are other reasons for 404. We need to change the solr response format from HTML to structured data to know that
           ) {
         // it was a communication error. it is likely that
@@ -946,15 +946,18 @@ public class CloudSolrClient extends SolrClient {
           // and we could not get any information from the server
           //it is probably not worth trying again and again because
           // the state would not have been updated
+          log.info("trying request again");
           return requestWithRetryOnStaleState(request, retryCount + 1, inputCollections);
         }
+      } else {
+        log.info("request was not communication error it seems");
       }
 
       boolean stateWasStale = false;
       if (retryCount < MAX_STALE_RETRIES  &&
           requestedCollections != null    &&
           !requestedCollections.isEmpty() &&
-          SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE)
+          (SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE || errorCode == 404))
       {
         // cached state for one or more external collections was stale
         // re-issue request using updated state

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index c97ef94..d415f21 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -347,7 +347,7 @@ public class HttpClientUtil {
     HttpClientBuilder retBuilder = builder.setDefaultRequestConfig(requestConfig);
 
     if (config.getBool(HttpClientUtil.PROP_USE_RETRY, true)) {
-      retBuilder = retBuilder.setRetryHandler(new SolrHttpRequestRetryHandler(3));
+      retBuilder = retBuilder.setRetryHandler(new SolrHttpRequestRetryHandler(Integer.getInteger("solr.httpclient.retries", 3)));
 
     } else {
       retBuilder = retBuilder.setRetryHandler(NO_RETRY);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
index 6c2737d..b0322a7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
@@ -51,6 +51,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.slf4j.MDC;
@@ -115,11 +116,11 @@ public class LBHttpSolrClient extends SolrClient {
   private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
 
 
-  private ScheduledExecutorService aliveCheckExecutor;
+  private volatile ScheduledExecutorService aliveCheckExecutor;
 
   private final HttpClient httpClient;
   private final boolean clientIsInternal;
-  private HttpSolrClient.Builder httpSolrClientBuilder;
+  private final HttpSolrClient.Builder httpSolrClientBuilder;
   private final AtomicInteger counter = new AtomicInteger(-1);
 
   private static final SolrQuery solrQuery = new SolrQuery("*:*");
@@ -129,7 +130,7 @@ public class LBHttpSolrClient extends SolrClient {
   private Set<String> queryParams = new HashSet<>();
   private Integer connectionTimeout;
 
-  private Integer soTimeout;
+  private volatile Integer soTimeout;
 
   static {
     solrQuery.setRows(0);
@@ -612,9 +613,13 @@ public class LBHttpSolrClient extends SolrClient {
 
   @Override
   public void close() {
-    if (aliveCheckExecutor != null) {
-      aliveCheckExecutor.shutdownNow();
+    synchronized (this) {
+      if (aliveCheckExecutor != null) {
+        aliveCheckExecutor.shutdownNow();
+        ExecutorUtil.shutdownAndAwaitTermination(aliveCheckExecutor);
+      }
     }
+
     if(clientIsInternal) {
       HttpClientUtil.close(httpClient);
     }
@@ -863,16 +868,6 @@ public class LBHttpSolrClient extends SolrClient {
   public RequestWriter getRequestWriter() {
     return requestWriter;
   }
-  
-  @Override
-  protected void finalize() throws Throwable {
-    try {
-      if(this.aliveCheckExecutor!=null)
-        this.aliveCheckExecutor.shutdownNow();
-    } finally {
-      super.finalize();
-    }
-  }
 
   // defaults
   private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
index 74e981d..ce44a18 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientBuilder.java
@@ -24,8 +24,8 @@ public abstract class SolrClientBuilder<B extends SolrClientBuilder<B>> {
 
   protected HttpClient httpClient;
   protected ResponseParser responseParser;
-  protected Integer connectionTimeoutMillis;
-  protected Integer socketTimeoutMillis;
+  protected Integer connectionTimeoutMillis = 15000;
+  protected Integer socketTimeoutMillis = 120000;
 
   /** The solution for the unchecked cast warning. */
   public abstract B getThis();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
index 8a4b35c..e057c3e 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientNodeStateProvider.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,6 +32,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.http.NoHttpResponseException;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.NodeStateProvider;
@@ -42,6 +44,7 @@ import org.apache.solr.client.solrj.request.GenericSolrRequest;
 import org.apache.solr.client.solrj.response.SimpleSolrResponse;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.rule.ImplicitSnitch;
@@ -192,9 +195,36 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.add("key", metricsKeyVsTag.keySet().toArray(new String[0]));
     try {
-      SimpleSolrResponse rsp = ctx.invoke(solrNode, CommonParams.METRICS_PATH, params);
+      
+      SimpleSolrResponse rsp = null;
+      int cnt = 0;
+      while (cnt++ < 3) {
+        try {
+          rsp = ctx.invoke(solrNode, CommonParams.METRICS_PATH, params);
+        } catch (SolrException | SolrServerException | NoHttpResponseException e) {
+          boolean hasCauseNoHttpResponseException = false;
+          Throwable cause = e;
+          while (cause != null) {
+            if (cause instanceof NoHttpResponseException) {
+              hasCauseNoHttpResponseException = true;
+              break;
+            }
+            cause = cause.getCause();
+          }
+          if (hasCauseNoHttpResponseException || e instanceof NoHttpResponseException) {
+            log.info("Error on getting remote info, trying again: " + e.getMessage());
+            Thread.sleep(500);
+            continue;
+          } else {
+            throw e;
+          }
+        }
+      }
+      
+      
+      SimpleSolrResponse frsp = rsp;
       metricsKeyVsTag.forEach((key, tag) -> {
-        Object v = Utils.getObjectByPath(rsp.nl, true, Arrays.asList("metrics", key));
+        Object v = Utils.getObjectByPath(frsp.nl, true, Arrays.asList("metrics", key));
         if (tag instanceof Function) {
           Pair<String, Object> p = (Pair<String, Object>) ((Function) tag).apply(v);
           ctx.getTags().put(p.first(), p.second());
@@ -271,7 +301,36 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
       params.add("prefix", StrUtils.join(prefixes, ','));
 
       try {
-        SimpleSolrResponse rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
+        SimpleSolrResponse rsp = null;
+        int retries = 5;
+        int cnt = 0;
+        while (cnt++ < retries) {
+          try {
+            rsp = snitchContext.invoke(solrNode, CommonParams.METRICS_PATH, params);
+          } catch (SolrException | SolrServerException | SocketException e) {
+            boolean hasCauseSocketException = false;
+            Throwable cause = e;
+            while (cause != null) {
+              if (cause instanceof SocketException) {
+                hasCauseSocketException = true;
+                break;
+              }
+              cause = cause.getCause();
+            }
+            if (hasCauseSocketException || e instanceof SocketException) {
+              log.info("Error on getting remote info, trying again: " + e.getMessage());
+              Thread.sleep(500);
+              continue;
+            } else {
+              throw e;
+            }
+          }
+        }
+        
+        if (cnt == retries) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Could not get remote info after many retries on NoHttpResponseException");
+        }
+                
         Map m = rsp.nl.asMap(4);
         if (requestedTags.contains(FREEDISK.tagName)) {
           Object n = Utils.getObjectByPath(m, true, "metrics/solr.node/CONTAINER.fs.usableSpace");
@@ -298,7 +357,7 @@ public class SolrClientNodeStateProvider implements NodeStateProvider, MapWriter
           if (n != null) ctx.getTags().put(HEAPUSAGE, n.doubleValue() * 100.0d);
         }
       } catch (Exception e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting remote info", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index 968e514..53ff466 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -39,11 +40,14 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
 
-  ZkStateReader zkStateReader;
+  volatile ZkStateReader zkStateReader;
   private boolean closeZkStateReader = true;
   String zkHost;
-  int zkConnectTimeout = 10000;
-  int zkClientTimeout = 10000;
+  int zkConnectTimeout = 15000;
+  int zkClientTimeout = 45000;
+
+
+  private volatile boolean isClosed = false;
 
   public ZkClientClusterStateProvider(ZkStateReader zkStateReader) {
     this.zkStateReader = zkStateReader;
@@ -73,6 +77,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public Set<String> getLiveNodes() {
+    if (isClosed) throw new AlreadyClosedException();
     ClusterState clusterState = zkStateReader.getClusterState();
     if (clusterState != null) {
       return clusterState.getLiveNodes();
@@ -175,6 +180,7 @@ public class ZkClientClusterStateProvider implements ClusterStateProvider {
 
   @Override
   public void close() throws IOException {
+    isClosed  = true;
     if (zkStateReader != null && closeZkStateReader) {
       synchronized (this) {
         if (zkStateReader != null)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
index 613ba25..77bd84c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ZkDistribStateManager.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.AutoScalingParams;
@@ -57,7 +58,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     try {
       return zkClient.exists(path, true);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -68,7 +70,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.NoNodeException e) {
       throw new NoSuchElementException(path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -86,7 +89,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.NoNodeException e) {
       throw new NoSuchElementException(path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -97,7 +101,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.NodeExistsException e) {
       throw new AlreadyExistsException(path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -108,7 +113,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.NodeExistsException e) {
       throw new AlreadyExistsException(path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -121,7 +127,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.NodeExistsException e) {
       throw new AlreadyExistsException(path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -136,7 +143,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.BadVersionException e) {
       throw new BadVersionException(version, path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -149,7 +157,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.BadVersionException e) {
       throw new BadVersionException(version, path);
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 
@@ -164,7 +173,8 @@ public class ZkDistribStateManager implements DistribStateManager {
     } catch (KeeperException.BadVersionException e) {
       throw new BadVersionException(-1, ops.toString());
     } catch (InterruptedException e) {
-      throw e;
+      Thread.currentThread().interrupt();
+      throw new AlreadyClosedException();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index a45c5de..a813f30 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -59,7 +59,7 @@ public class SolrClientCache implements Serializable {
     } else {
       final List<String> hosts = new ArrayList<String>();
       hosts.add(zkHost);
-      CloudSolrClient.Builder builder = new CloudSolrClient.Builder(hosts, Optional.empty());
+      CloudSolrClient.Builder builder = new CloudSolrClient.Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000);
       if (httpClient != null) {
         builder = builder.withHttpClient(httpClient);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
index 126df81..ee4cb5d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/FacetStream.java
@@ -484,7 +484,7 @@ public class FacetStream extends TupleStream implements Expressible  {
     } else {
       final List<String> hosts = new ArrayList<>();
       hosts.add(zkHost);
-      cloudSolrClient = new Builder(hosts, Optional.empty()).build();
+      cloudSolrClient = new Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build();
     }
 
     FieldComparator[] adjustedSorts = adjustSorts(buckets, bucketSorts);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
index 01aa047..052fc30 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/RandomStream.java
@@ -178,7 +178,7 @@ public class RandomStream extends TupleStream implements Expressible  {
     } else {
       final List<String> hosts = new ArrayList<>();
       hosts.add(zkHost);
-      cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).build();
+      cloudSolrClient = new CloudSolrClient.Builder(hosts, Optional.empty()).withSocketTimeout(30000).withConnectionTimeout(15000).build();
     }
 
     ModifiableSolrParams params = getParams(this.props);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java b/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
new file mode 100644
index 0000000..bdb5429
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/AlreadyClosedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.common;
+
+/**
+ *
+ */
+public class AlreadyClosedException extends IllegalStateException {
+
+  public AlreadyClosedException() {
+    super();
+  }
+  
+  public AlreadyClosedException(String msg) {
+    super(msg);
+  }
+  
+  public AlreadyClosedException(Throwable th) {
+    super(th);
+  }
+  
+  public AlreadyClosedException(String msg, Throwable th) {
+    super(msg, th);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 98ddb47..3a55988 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -73,16 +73,23 @@ public class ConnectionManager implements Watcher {
         || ( stateType == StateType.TRACKING_TIME && (System.nanoTime() - lastDisconnectTime >  TimeUnit.NANOSECONDS.convert(timeToExpire, TimeUnit.MILLISECONDS)));
     }
   }
+  
+  public static abstract class IsClosed {
+    public abstract boolean isClosed();
+  }
 
   private volatile LikelyExpiredState likelyExpiredState = LikelyExpiredState.EXPIRED;
 
-  public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect) {
+  private IsClosed isClosedCheck;
+
+  public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, ZkClientConnectionStrategy strat, OnReconnect onConnect, BeforeReconnect beforeReconnect, IsClosed isClosed) {
     this.name = name;
     this.client = client;
     this.connectionStrategy = strat;
     this.zkServerAddress = zkServerAddress;
     this.onReconnect = onConnect;
     this.beforeReconnect = beforeReconnect;
+    this.isClosedCheck = isClosed;
   }
   
   private synchronized void connected() {
@@ -108,7 +115,7 @@ public class ConnectionManager implements Watcher {
       log.debug("Watcher {} name: {} got event {} path: {} type: {}", this, name, event, event.getPath(), event.getType());
     }
     
-    if (isClosed) {
+    if (isClosed()) {
       log.debug("Client->ZooKeeper status change trigger but we are already closed");
       return;
     }
@@ -120,6 +127,9 @@ public class ConnectionManager implements Watcher {
       connected();
       connectionStrategy.connected();
     } else if (state == Expired) {
+      if (isClosed()) {
+        return;
+      }
       // we don't call disconnected here, because we know we are expired
       connected = false;
       likelyExpiredState = LikelyExpiredState.EXPIRED;
@@ -177,7 +187,7 @@ public class ConnectionManager implements Watcher {
           waitSleep(1000);
         }
         
-      } while (!isClosed);
+      } while (!isClosed());
       log.info("zkClient Connected:" + connected);
     } else if (state == KeeperState.Disconnected) {
       log.warn("zkClient has disconnected");
@@ -188,8 +198,12 @@ public class ConnectionManager implements Watcher {
     }
   }
 
+  public synchronized boolean isConnectedAndNotClosed() {
+    return !isClosed() && connected;
+  }
+  
   public synchronized boolean isConnected() {
-    return !isClosed && connected;
+    return connected;
   }
   
   // we use a volatile rather than sync
@@ -199,8 +213,12 @@ public class ConnectionManager implements Watcher {
     this.likelyExpiredState = LikelyExpiredState.EXPIRED;
   }
   
+  private boolean isClosed() {
+    return isClosed || isClosedCheck.isClosed();
+  }
+  
   public boolean isLikelyExpired() {
-    return isClosed || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
+    return isClosed() || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
   }
   
   public synchronized void waitSleep(long waitFor) {
@@ -217,7 +235,7 @@ public class ConnectionManager implements Watcher {
     long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForConnection, TimeUnit.MILLISECONDS);
     long left = 1;
     while (!connected && left > 0) {
-      if (isClosed) {
+      if (isClosed()) {
         break;
       }
       try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
index e16ca68..2ed88e2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -57,6 +58,8 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
           .update(zk);
       success = true;
       log.info("Reconnected to ZooKeeper");
+    } catch (AlreadyClosedException e) {
+
     } catch (Exception e) {
       SolrException.log(log, "Reconnect to ZooKeeper failed", e);
       log.warn("Reconnect to ZooKeeper failed");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index adf0211..e896272 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -92,9 +92,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.nodeNameLeaderReplicas = new HashMap<>();
     this.nodeNameReplicas = new HashMap<>();
     this.replicationFactor = (Integer) verifyProp(props, REPLICATION_FACTOR);
-    this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS);
-    this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS);
-    this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS);
+    this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
+    this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
+    this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
     this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
     Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
     this.policy = (String) props.get(Policy.POLICY);
@@ -136,10 +136,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       leaderReplicas.add(replica);
     }
   }
-
+  
   public static Object verifyProp(Map<String, Object> props, String propName) {
+    return verifyProp(props, propName, null);
+  }
+
+  public static Object verifyProp(Map<String, Object> props, String propName, Object def) {
     Object o = props.get(propName);
-    if (o == null) return null;
+    if (o == null) return def;
     switch (propName) {
       case MAX_SHARDS_PER_NODE:
       case REPLICATION_FACTOR:

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
index 1cf16e1..8d11b9a 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesListener.java
@@ -33,6 +33,8 @@ public interface LiveNodesListener {
    *
    * @param oldLiveNodes set of live nodes before the change
    * @param newLiveNodes set of live nodes after the change
+   * 
+   * @return true if the listener should be removed
    */
-  void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+  boolean onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
new file mode 100644
index 0000000..a29e1df
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesPredicate.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Interface to determine if live nodes matches a required state
+ *
+ * @see ZkStateReader#waitForLiveNodes(long, TimeUnit, LiveNodesPredicate)
+ */
+public interface LiveNodesPredicate {
+
+  boolean matches(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
new file mode 100644
index 0000000..8de2cce
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/LiveNodesWatcher.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.util.SortedSet;
+
+public interface LiveNodesWatcher {
+
+  boolean onStateChanged(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes);
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 2fb2718..d73282b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -163,7 +163,7 @@ public class Replica extends ZkNodeProps {
   }
 
   public boolean isActive(Set<String> liveNodes) {
-    return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
+    return this.nodeName != null && liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
   }
   
   public Type getType() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1875073..a25fc45 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -16,12 +16,6 @@
  */
 package org.apache.solr.common.cloud;
 
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Source;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -38,15 +32,24 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Source;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.transform.stream.StreamSource;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoAuthException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.Op;
@@ -90,6 +93,8 @@ public class SolrZkClient implements Closeable {
   private ZkACLProvider zkACLProvider;
   private String zkServerAddress;
 
+  private IsClosed higherLevelIsClosed;
+
   public int getZkClientTimeout() {
     return zkClientTimeout;
   }
@@ -118,18 +123,18 @@ public class SolrZkClient implements Closeable {
 
   public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
       ZkClientConnectionStrategy strat, final OnReconnect onReconnect) {
-    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null);
+    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, null, null, null);
   }
 
   public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
       ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect) {
-    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null);
+    this(zkServerAddress, zkClientTimeout, clientConnectTimeout, strat, onReconnect, beforeReconnect, null, null);
   }
 
   public SolrZkClient(String zkServerAddress, int zkClientTimeout, int clientConnectTimeout,
-      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider) {
+      ZkClientConnectionStrategy strat, final OnReconnect onReconnect, BeforeReconnect beforeReconnect, ZkACLProvider zkACLProvider, IsClosed higherLevelIsClosed) {
     this.zkServerAddress = zkServerAddress;
-    
+    this.higherLevelIsClosed = higherLevelIsClosed;
     if (strat == null) {
       strat = new DefaultConnectionStrategy();
     }
@@ -142,9 +147,21 @@ public class SolrZkClient implements Closeable {
 
     this.zkClientTimeout = zkClientTimeout;
     // we must retry at least as long as the session timeout
-    zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+    zkCmdExecutor = new ZkCmdExecutor(zkClientTimeout, new IsClosed() {
+      
+      @Override
+      public boolean isClosed() {
+        return SolrZkClient.this.isClosed();
+      }
+    });
     connManager = new ConnectionManager("ZooKeeperConnection Watcher:"
-        + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect);
+        + zkServerAddress, this, zkServerAddress, strat, onReconnect, beforeReconnect, new IsClosed() {
+          
+          @Override
+          public boolean isClosed() {
+            return SolrZkClient.this.isClosed();
+          }
+        });
 
     try {
       strat.connect(zkServerAddress, zkClientTimeout, wrapWatcher(connManager),
@@ -513,50 +530,46 @@ public class SolrZkClient implements Closeable {
       }
       byte[] bytes = null;
       final String currentPath = sbPath.toString();
-      Object exists = exists(currentPath, watcher, retryOnConnLoss);
-      if (exists == null || ((i == paths.length -1) && failOnExists)) {
-        CreateMode mode = CreateMode.PERSISTENT;
-        if (i == paths.length - 1) {
-          mode = createMode;
-          bytes = data;
-          if (!retryOnConnLoss) retry = false;
-        }
-        try {
-          if (retry) {
-            final CreateMode finalMode = mode;
-            final byte[] finalBytes = bytes;
-            zkCmdExecutor.retryOperation(() -> {
-              keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
-              return null;
-            });
-          } else {
-            keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
-          }
-        } catch (NodeExistsException e) {
-
-          if (!failOnExists) {
-            // TODO: version ? for now, don't worry about race
-            setData(currentPath, data, -1, retryOnConnLoss);
-            // set new watch
-            exists(currentPath, watcher, retryOnConnLoss);
-            return;
-          }
 
-          // ignore unless it's the last node in the path
-          if (i == paths.length - 1) {
-            throw e;
-          }
+      CreateMode mode = CreateMode.PERSISTENT;
+      if (i == paths.length - 1) {
+        mode = createMode;
+        bytes = data;
+        if (!retryOnConnLoss) retry = false;
+      }
+      try {
+        if (retry) {
+          final CreateMode finalMode = mode;
+          final byte[] finalBytes = bytes;
+          zkCmdExecutor.retryOperation(() -> {
+            keeper.create(currentPath, finalBytes, zkACLProvider.getACLsToAdd(currentPath), finalMode);
+            return null;
+          });
+        } else {
+          keeper.create(currentPath, bytes, zkACLProvider.getACLsToAdd(currentPath), mode);
         }
-        if(i == paths.length -1) {
+      } catch (NoAuthException e) {
+        // in auth cases, we may not have permission for an earlier part of a path, which is fine
+        if (i == paths.length - 1 || !exists(currentPath, retryOnConnLoss)) {
+ 
+          throw e;
+        }
+      } catch (NodeExistsException e) {
+
+        if (!failOnExists && i == paths.length - 1) {
+          // TODO: version ? for now, don't worry about race
+          setData(currentPath, data, -1, retryOnConnLoss);
           // set new watch
           exists(currentPath, watcher, retryOnConnLoss);
+          return;
+        }
+
+        // ignore unless it's the last node in the path
+        if (i == paths.length - 1) {
+          throw e;
         }
-      } else if (i == paths.length - 1) {
-        // TODO: version ? for now, don't worry about race
-        setData(currentPath, data, -1, retryOnConnLoss);
-        // set new watch
-        exists(currentPath, watcher, retryOnConnLoss);
       }
+
     }
   }
 
@@ -672,16 +685,16 @@ public class SolrZkClient implements Closeable {
     if (isClosed) return; // it's okay if we over close - same as solrcore
     isClosed = true;
     try {
-      closeKeeper(keeper);
+      closeCallbackExecutor();
     } finally {
       connManager.close();
-      closeCallbackExecutor();
+      closeKeeper(keeper);
     }
     assert ObjectReleaseTracker.release(this);
   }
 
   public boolean isClosed() {
-    return isClosed;
+    return isClosed || (higherLevelIsClosed != null && higherLevelIsClosed.isClosed());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 268ba2d..a60a275 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -93,9 +93,6 @@ public class SolrZooKeeper extends ZooKeeper {
   
   @Override
   public synchronized void close() throws InterruptedException {
-    for (Thread t : spawnedThreads) {
-      if (t.isAlive()) t.interrupt();
-    }
     super.close();
   }
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index c27f767..aaba7ae 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.common.cloud;
 
+import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -25,6 +27,11 @@ public class ZkCmdExecutor {
   private long retryDelay = 1500L; // 1 second would match timeout, so 500 ms over for padding
   private int retryCount;
   private double timeouts;
+  private IsClosed isClosed;
+  
+  public ZkCmdExecutor(int timeoutms) {
+    this(timeoutms, null);
+  }
   
   /**
    * TODO: At this point, this should probably take a SolrZkClient in
@@ -34,9 +41,10 @@ public class ZkCmdExecutor {
    *          the client timeout for the ZooKeeper clients that will be used
    *          with this class.
    */
-  public ZkCmdExecutor(int timeoutms) {
+  public ZkCmdExecutor(int timeoutms, IsClosed isClosed) {
     timeouts = timeoutms / 1000.0;
     this.retryCount = Math.round(0.5f * ((float)Math.sqrt(8.0f * timeouts + 1.0f) - 1.0f)) + 1;
+    this.isClosed = isClosed;
   }
   
   public long getRetryDelay() {
@@ -57,6 +65,9 @@ public class ZkCmdExecutor {
     KeeperException exception = null;
     for (int i = 0; i < retryCount; i++) {
       try {
+        if (i > 0 && isClosed()) {
+          throw new AlreadyClosedException();
+        }
         return (T) operation.execute();
       } catch (KeeperException.ConnectionLossException e) {
         if (exception == null) {
@@ -74,6 +85,10 @@ public class ZkCmdExecutor {
     throw exception;
   }
   
+  private boolean isClosed() {
+    return isClosed != null && isClosed.isClosed();
+  }
+
   public void ensureExists(String path, final SolrZkClient zkClient) throws KeeperException, InterruptedException {
     ensureExists(path, null, CreateMode.PERSISTENT, zkClient, 0);
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6011f8a..ff53f51 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -45,16 +45,19 @@ import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
@@ -142,7 +145,7 @@ public class ZkStateReader implements Closeable {
   protected volatile ClusterState clusterState;
 
   private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
-  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;
+  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "4000"));;
 
   public static final String LEADER_ELECT_ZKNODE = "leader_elect";
 
@@ -181,6 +184,8 @@ public class ZkStateReader implements Closeable {
   private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
 
   private final ExecutorService notifications = ExecutorUtil.newMDCAwareCachedThreadPool("watches");
+
+  private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
   
   /** Used to submit notifications to Collection Properties watchers in order **/
   private final ExecutorService collectionPropsNotifications = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("collectionPropsNotifications"));
@@ -229,8 +234,6 @@ public class ZkStateReader implements Closeable {
 
   }
 
-  private Set<LiveNodesListener> liveNodesListeners = ConcurrentHashMap.newKeySet();
-
   public static final Set<String> KNOWN_CLUSTER_PROPS = unmodifiableSet(new HashSet<>(asList(
       LEGACY_CLOUD,
       URL_SCHEME,
@@ -283,6 +286,8 @@ public class ZkStateReader implements Closeable {
   private final boolean closeClient;
 
   private volatile boolean closed = false;
+  
+  private Set<CountDownLatch> waitLatches = ConcurrentHashMap.newKeySet();
 
   public ZkStateReader(SolrZkClient zkClient) {
     this(zkClient, null);
@@ -293,6 +298,7 @@ public class ZkStateReader implements Closeable {
     this.configManager = new ZkConfigManager(zkClient);
     this.closeClient = false;
     this.securityNodeListener = securityNodeListener;
+    assert ObjectReleaseTracker.track(this);
   }
 
 
@@ -318,6 +324,8 @@ public class ZkStateReader implements Closeable {
     this.configManager = new ZkConfigManager(zkClient);
     this.closeClient = true;
     this.securityNodeListener = null;
+    
+    assert ObjectReleaseTracker.track(this);
   }
 
   public ZkConfigManager getConfigManager() {
@@ -794,12 +802,20 @@ public class ZkStateReader implements Closeable {
       log.debug("Updated live nodes from ZooKeeper... {} -> {}", oldLiveNodes, newLiveNodes);
     }
     if (!oldLiveNodes.equals(newLiveNodes)) { // fire listeners
-      liveNodesListeners.forEach(listener ->
-          listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes)));
+      liveNodesListeners.forEach(listener -> {
+        if (listener.onChange(new TreeSet<>(oldLiveNodes), new TreeSet<>(newLiveNodes))) {
+          removeLiveNodesListener(listener);
+        }
+      });
     }
   }
 
   public void registerLiveNodesListener(LiveNodesListener listener) {
+    // fire it once with current live nodes
+    if (listener.onChange(new TreeSet<>(getClusterState().getLiveNodes()), new TreeSet<>(getClusterState().getLiveNodes()))) {
+      removeLiveNodesListener(listener);
+    }
+    
     liveNodesListeners.add(listener);
   }
 
@@ -820,18 +836,30 @@ public class ZkStateReader implements Closeable {
 
   public void close() {
     this.closed  = true;
-    notifications.shutdown();
+    notifications.shutdownNow();
+    
+    waitLatches.parallelStream().forEach(c -> { c.countDown(); });
+    
+    ExecutorUtil.shutdownAndAwaitTermination(notifications);
     ExecutorUtil.shutdownAndAwaitTermination(collectionPropsNotifications);
     if (closeClient) {
       zkClient.close();
     }
+    assert ObjectReleaseTracker.release(this);
   }
   
   public String getLeaderUrl(String collection, String shard, int timeout) throws InterruptedException {
     ZkCoreNodeProps props = new ZkCoreNodeProps(getLeaderRetry(collection, shard, timeout));
     return props.getCoreUrl();
   }
-
+  
+  public Replica getLeader(Set<String> liveNodes, DocCollection docCollection, String shard) {
+    Replica replica = docCollection != null ? docCollection.getLeader(shard) : null;
+    if (replica != null && liveNodes.contains(replica.getNodeName())) {
+      return replica;
+    }
+    return null;
+  }
   public Replica getLeader(String collection, String shard) {
     if (clusterState != null) {
       DocCollection docCollection = clusterState.getCollectionOrNull(collection);
@@ -854,16 +882,25 @@ public class ZkStateReader implements Closeable {
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
-    long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
-    while (true) {
-      Replica leader = getLeader(collection, shard);
-      if (leader != null) return leader;
-      if (System.nanoTime() >= timeoutAt || closed) break;
-      Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
+
+    AtomicReference<Replica> leader = new AtomicReference<>();
+    try {
+      waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        Replica l = getLeader(n, c, shard);
+        if (l != null) {
+          leader.set(l);
+          return true;
+        }
+        return false;
+      });
+    } catch (TimeoutException | InterruptedException e) {
+      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
+          + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
+          + " with live_nodes=" + clusterState.getLiveNodes());
     }
-    throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
-        + timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
-        + " with live_nodes=" + clusterState.getLiveNodes());
+    return leader.get();
   }
 
   /**
@@ -1257,6 +1294,10 @@ public class ZkStateReader implements Closeable {
 
     @Override
     public void process(WatchedEvent event) {
+      if (ZkStateReader.this.closed) {
+        return;
+      }
+      
       // session events are not change events, and do not remove the watcher
       if (EventType.None.equals(event.getType())) {
         return;
@@ -1457,13 +1498,20 @@ public class ZkStateReader implements Closeable {
    */
   public void waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
       throws InterruptedException, TimeoutException {
-
+    
+    if (closed) {
+      throw new AlreadyClosedException();
+    }
+    
     final CountDownLatch latch = new CountDownLatch(1);
-
+    waitLatches.add(latch);
+    AtomicReference<DocCollection> docCollection = new AtomicReference<>();
     CollectionStateWatcher watcher = (n, c) -> {
+      docCollection.set(c);
       boolean matches = predicate.matches(n, c);
       if (matches)
         latch.countDown();
+      
       return matches;
     };
     registerCollectionStateWatcher(collection, watcher);
@@ -1471,15 +1519,61 @@ public class ZkStateReader implements Closeable {
     try {
       // wait for the watcher predicate to return true, or time out
       if (!latch.await(wait, unit))
-        throw new TimeoutException();
+        throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());
 
     }
     finally {
       removeCollectionStateWatcher(collection, watcher);
+      waitLatches.remove(latch);
     }
   }
 
   /**
+   * Block until a LiveNodesStatePredicate returns true, or the wait times out
+   *
+   * Note that the predicate may be called again even after it has returned true, so
+   * implementors should avoid changing state within the predicate call itself.
+   *
+   * @param wait       how long to wait
+   * @param unit       the units of the wait parameter
+   * @param predicate  the predicate to call on state changes
+   * @throws InterruptedException on interrupt
+   * @throws TimeoutException on timeout
+   */
+  public void waitForLiveNodes(long wait, TimeUnit unit, LiveNodesPredicate predicate)
+      throws InterruptedException, TimeoutException {
+    
+    if (closed) {
+      throw new AlreadyClosedException();
+    }
+    
+    final CountDownLatch latch = new CountDownLatch(1);
+    waitLatches.add(latch);
+
+    
+    LiveNodesListener listener = (o, n) -> {
+      boolean matches = predicate.matches(o, n);
+      if (matches)
+        latch.countDown();
+      return matches;
+    };
+    
+    registerLiveNodesListener(listener);
+
+    try {
+      // wait for the watcher predicate to return true, or time out
+      if (!latch.await(wait, unit))
+        throw new TimeoutException("Timeout waiting for live nodes, currently they are: " + getClusterState().getLiveNodes());
+
+    }
+    finally {
+      removeLiveNodesListener(listener);
+      waitLatches.remove(latch);
+    }
+  }
+
+  
+  /**
    * Remove a watcher from a collection's watch list.
    *
    * This allows Zookeeper watches to be removed if there is no interest in the
@@ -1611,6 +1705,9 @@ public class ZkStateReader implements Closeable {
   }
 
   private void notifyStateWatchers(Set<String> liveNodes, String collection, DocCollection collectionState) {
+    if (this.closed) {
+      return;
+    }
     try {
       notifications.submit(new Notification(liveNodes, collection, collectionState));
     }
@@ -1786,6 +1883,8 @@ public class ZkStateReader implements Closeable {
         final byte[] data = zkClient.getData(ALIASES, this, stat, true);
         // note: it'd be nice to avoid possibly needlessly parsing if we don't update aliases but not a big deal
         setIfNewer(Aliases.fromJSON(data, stat.getVersion()));
+      } catch (NoNodeException e) {
+        // /aliases.json will not always exist
       } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
         // note: aliases.json is required to be present
         log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
index b45e702..c87bb87 100644
--- a/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/ref_guide_examples/UsingSolrJRefGuideExamplesTest.java
@@ -68,6 +68,7 @@ public class UsingSolrJRefGuideExamplesTest extends SolrCloudTestCase {
 
     CollectionAdminResponse response = CollectionAdminRequest.createCollection("techproducts", "conf", 1, 1)
         .process(cluster.getSolrClient());
+    cluster.waitForActiveCollection("techproducts", 1, 1);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
index b1f1ee9..a4bd61a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleBinaryTest.java
@@ -31,7 +31,7 @@ import org.junit.BeforeClass;
 public class SolrExampleBinaryTest extends SolrExampleTests {
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
index 5290347..538255b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrExampleXMLTest.java
@@ -30,7 +30,7 @@ import org.junit.BeforeClass;
 public class SolrExampleXMLTest extends SolrExampleTests {
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
index 47faf78..55d83c3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrSchemalessExampleTest.java
@@ -65,7 +65,7 @@ public class SolrSchemalessExampleTest extends SolrExampleTestsBase {
         } catch (Exception ignore){}
       }
     }
-    createJetty(tempSolrHome.getAbsolutePath());
+    createAndStartJetty(tempSolrHome.getAbsolutePath());
   }
   @Test
   public void testArbitraryJsonIndexing() throws Exception  {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
index a47b1ef..3e6f03d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestBatchUpdate.java
@@ -41,7 +41,7 @@ public class TestBatchUpdate extends SolrJettyTestBase {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
 
   static final int numdocs = 1000;  

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
index 84aff76..d739c0e 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestLBHttpSolrClient.java
@@ -134,69 +134,71 @@ public class TestLBHttpSolrClient extends SolrTestCaseJ4 {
     for (int i = 0; i < solr.length; i++) {
       s[i] = solr[i].getUrl();
     }
-    LBHttpSolrClient client = getLBHttpSolrClient(httpClient, s);
-    client.setAliveCheckInterval(500);
-    SolrQuery solrQuery = new SolrQuery("*:*");
-    Set<String> names = new HashSet<>();
-    QueryResponse resp = null;
-    for (String value : s) {
-      resp = client.query(solrQuery);
-      assertEquals(10, resp.getResults().getNumFound());
-      names.add(resp.getResults().get(0).getFieldValue("name").toString());
-    }
-    assertEquals(3, names.size());
+    try (LBHttpSolrClient client = getLBHttpSolrClient(httpClient, s)) {
+      client.setAliveCheckInterval(500);
+      SolrQuery solrQuery = new SolrQuery("*:*");
+      Set<String> names = new HashSet<>();
+      QueryResponse resp = null;
+      for (String value : s) {
+        resp = client.query(solrQuery);
+        assertEquals(10, resp.getResults().getNumFound());
+        names.add(resp.getResults().get(0).getFieldValue("name").toString());
+      }
+      assertEquals(3, names.size());
 
-    // Kill a server and test again
-    solr[1].jetty.stop();
-    solr[1].jetty = null;
-    names.clear();
-    for (String value : s) {
-      resp = client.query(solrQuery);
-      assertEquals(10, resp.getResults().getNumFound());
-      names.add(resp.getResults().get(0).getFieldValue("name").toString());
-    }
-    assertEquals(2, names.size());
-    assertFalse(names.contains("solr1"));
-
-    // Start the killed server once again
-    solr[1].startJetty();
-    // Wait for the alive check to complete
-    Thread.sleep(1200);
-    names.clear();
-    for (String value : s) {
-      resp = client.query(solrQuery);
-      assertEquals(10, resp.getResults().getNumFound());
-      names.add(resp.getResults().get(0).getFieldValue("name").toString());
+      // Kill a server and test again
+      solr[1].jetty.stop();
+      solr[1].jetty = null;
+      names.clear();
+      for (String value : s) {
+        resp = client.query(solrQuery);
+        assertEquals(10, resp.getResults().getNumFound());
+        names.add(resp.getResults().get(0).getFieldValue("name").toString());
+      }
+      assertEquals(2, names.size());
+      assertFalse(names.contains("solr1"));
+
+      // Start the killed server once again
+      solr[1].startJetty();
+      // Wait for the alive check to complete
+      Thread.sleep(1200);
+      names.clear();
+      for (String value : s) {
+        resp = client.query(solrQuery);
+        assertEquals(10, resp.getResults().getNumFound());
+        names.add(resp.getResults().get(0).getFieldValue("name").toString());
+      }
+      assertEquals(3, names.size());
     }
-    assertEquals(3, names.size());
   }
 
   public void testTwoServers() throws Exception {
-    LBHttpSolrClient client = getLBHttpSolrClient(httpClient, solr[0].getUrl(), solr[1].getUrl());
-    client.setAliveCheckInterval(500);
-    SolrQuery solrQuery = new SolrQuery("*:*");
-    QueryResponse resp = null;
-    solr[0].jetty.stop();
-    solr[0].jetty = null;
-    resp = client.query(solrQuery);
-    String name = resp.getResults().get(0).getFieldValue("name").toString();
-    Assert.assertEquals("solr/collection11", name);
-    resp = client.query(solrQuery);
-    name = resp.getResults().get(0).getFieldValue("name").toString();
-    Assert.assertEquals("solr/collection11", name);
-    solr[1].jetty.stop();
-    solr[1].jetty = null;
-    solr[0].startJetty();
-    Thread.sleep(1200);
-    try {
+    try (LBHttpSolrClient client = getLBHttpSolrClient(httpClient, solr[0].getUrl(), solr[1].getUrl())) {
+      client.setAliveCheckInterval(500);
+      SolrQuery solrQuery = new SolrQuery("*:*");
+      QueryResponse resp = null;
+      solr[0].jetty.stop();
+      solr[0].jetty = null;
       resp = client.query(solrQuery);
-    } catch(SolrServerException e) {
-      // try again after a pause in case the error is lack of time to start server
-      Thread.sleep(3000);
+      String name = resp.getResults().get(0).getFieldValue("name").toString();
+      Assert.assertEquals("solr/collection11", name);
       resp = client.query(solrQuery);
+      name = resp.getResults().get(0).getFieldValue("name").toString();
+      Assert.assertEquals("solr/collection11", name);
+      solr[1].jetty.stop();
+      solr[1].jetty = null;
+      solr[0].startJetty();
+      Thread.sleep(1200);
+      try {
+        resp = client.query(solrQuery);
+      } catch (SolrServerException e) {
+        // try again after a pause in case the error is lack of time to start server
+        Thread.sleep(3000);
+        resp = client.query(solrQuery);
+      }
+      name = resp.getResults().get(0).getFieldValue("name").toString();
+      Assert.assertEquals("solr/collection10", name);
     }
-    name = resp.getResults().get(0).getFieldValue("name").toString();
-    Assert.assertEquals("solr/collection10", name);
   }
 
   public void testReliability() throws Exception {
@@ -207,21 +209,22 @@ public class TestLBHttpSolrClient extends SolrTestCaseJ4 {
 
     CloseableHttpClient myHttpClient = HttpClientUtil.createClient(null);
     try {
-      LBHttpSolrClient client = getLBHttpSolrClient(myHttpClient, 500, 500, s);
-      client.setAliveCheckInterval(500);
-  
-      // Kill a server and test again
-      solr[1].jetty.stop();
-      solr[1].jetty = null;
-  
-      // query the servers
-      for (String value : s)
-        client.query(new SolrQuery("*:*"));
-  
-      // Start the killed server once again
-      solr[1].startJetty();
-      // Wait for the alive check to complete
-      waitForServer(30, client, 3, solr[1].name);
+      try (LBHttpSolrClient client = getLBHttpSolrClient(myHttpClient, 500, 500, s)) {
+        client.setAliveCheckInterval(500);
+
+        // Kill a server and test again
+        solr[1].jetty.stop();
+        solr[1].jetty = null;
+
+        // query the servers
+        for (String value : s)
+          client.query(new SolrQuery("*:*"));
+
+        // Start the killed server once again
+        solr[1].startJetty();
+        // Wait for the alive check to complete
+        waitForServer(30, client, 3, solr[1].name);
+      }
     } finally {
       HttpClientUtil.close(myHttpClient);
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java b/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
index a9c7fb1..0b36569 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/TestSolrJErrorHandling.java
@@ -58,7 +58,7 @@ public class TestSolrJErrorHandling extends SolrJettyTestBase {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
index fc28449..ebe2693 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeBinaryJettyTest.java
@@ -28,6 +28,6 @@ import org.junit.BeforeClass;
 public class LargeVolumeBinaryJettyTest extends LargeVolumeTestBase {
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/75b18319/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
index 02764fb..5c7f36a 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/LargeVolumeJettyTest.java
@@ -25,6 +25,6 @@ import org.junit.BeforeClass;
 public class LargeVolumeJettyTest extends LargeVolumeTestBase {
   @BeforeClass
   public static void beforeTest() throws Exception {
-    createJetty(legacyExampleCollection1SolrHome());
+    createAndStartJetty(legacyExampleCollection1SolrHome());
   }
 }