You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2017/02/22 19:44:33 UTC

[3/3] lucene-solr:master: SOLR-9824: Some bulk update paths could be very slow due to CUSC polling.

SOLR-9824: Some bulk update paths could be very slow due to CUSC polling.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/d6337ac3
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/d6337ac3
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/d6337ac3

Branch: refs/heads/master
Commit: d6337ac3e566c504766d69499ab470bd26744a29
Parents: 2f82409
Author: markrmiller <ma...@apache.org>
Authored: Wed Feb 22 13:00:42 2017 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Wed Feb 22 14:44:18 2017 -0500

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../handler/loader/ContentStreamLoader.java     |   2 -
 .../solr/handler/loader/JavabinLoader.java      |   3 -
 .../apache/solr/update/AddUpdateCommand.java    |   2 -
 .../apache/solr/update/SolrCmdDistributor.java  |  15 +-
 .../solr/update/StreamingSolrClients.java       |   2 +-
 .../processor/DistributedUpdateProcessor.java   |   7 +
 .../solrj/impl/ConcurrentUpdateSolrClient.java  | 317 ++++++++++++++-----
 8 files changed, 252 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a6b5504..dcea40c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -178,6 +178,8 @@ Bug Fixes
 
 * SOLR-10168: ShardSplit can fail with NPE in OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete. (Mark Miller)
 
+* SOLR-9824: Some bulk update paths could be very slow due to CUSC polling. (David Smiley, Mark Miller)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
index 1dd038f..7751b43 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
@@ -29,8 +29,6 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
  */
 public abstract class ContentStreamLoader {
 
-  protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
-
   /**
    * This should be called once for each RequestHandler
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index 6114280..873bcd1 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -116,9 +116,6 @@ public class JavabinLoader extends ContentStreamLoader {
 
   private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
     AddUpdateCommand addCmd = new AddUpdateCommand(req);
-    // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
-    // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
-    addCmd.pollQueueTime = pollQueueTime;
     addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
     addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
     return addCmd;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 0ede728..f526397 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -60,8 +60,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
    public int commitWithin = -1;
 
    public boolean isLastDocInBatch = false;
-
-   public int pollQueueTime = 0;
    
    public AddUpdateCommand(SolrQueryRequest req) {
      super(req);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5caf43e..dac4000 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -36,6 +36,7 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplic
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
@@ -51,7 +52,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 
-public class SolrCmdDistributor {
+public class SolrCmdDistributor implements Closeable {
   private static final int MAX_RETRIES_ON_FORWARD = 25;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
@@ -96,6 +97,10 @@ public class SolrCmdDistributor {
       clients.shutdown();
     }
   }
+  
+  public void close() {
+    clients.shutdown();
+  }
 
   private void doRetriesIfNeeded() {
     // NOTE: retries will be forwards to a single url
@@ -210,7 +215,7 @@ public class SolrCmdDistributor {
       if (cmd.isInPlaceUpdate()) {
         params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
       }
-      submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
+      submit(new Req(cmd, node, uReq, synchronous, rrt), false);
     }
     
   }
@@ -314,19 +319,17 @@ public class SolrCmdDistributor {
     public boolean synchronous;
     public UpdateCommand cmd;
     public RequestReplicationTracker rfTracker;
-    public int pollQueueTime;
 
     public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
-      this(cmd, node, uReq, synchronous, null, 0);
+      this(cmd, node, uReq, synchronous, null);
     }
     
-    public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
+    public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
       this.node = node;
       this.uReq = uReq;
       this.synchronous = synchronous;
       this.cmd = cmd;
       this.rfTracker = rfTracker;
-      this.pollQueueTime = pollQueueTime;
     }
     
     public String toString() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index fc50be2..7c630f4 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -73,9 +73,9 @@ public class StreamingSolrClients {
       // on a greater scale since the current behavior is to only increase the number of connections/Runners when
       // the queue is more than half full.
       client = new ErrorReportingConcurrentUpdateSolrClient(url, httpClient, 100, runnerCount, updateExecutor, true, req);
+      client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
       client.setParser(new BinaryResponseParser());
       client.setRequestWriter(new BinaryRequestWriter());
-      client.setPollQueueTime(req.pollQueueTime);
       Set<String> queryParams = new HashSet<>(2);
       queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
       queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c6ccb71..ec093cf 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -828,6 +828,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // Given that, it may also make sense to move the version reporting out of this
     // processor too.
   }
+  
+  @Override
+  protected void doClose() {
+    if (cmdDistrib != null) {
+      cmdDistrib.close();
+    }
+  }
  
   // TODO: optionally fail if n replicas are not reached...
   private void doFinish() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index 5c3f289..4eac2a5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -87,7 +88,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   private boolean internalHttpClient;
   private volatile Integer connectionTimeout;
   private volatile Integer soTimeout;
-
+  private volatile boolean closed;
+  
+  AtomicInteger pollInterrupts;
+  AtomicInteger pollExits;
+  AtomicInteger blockLoops;
+  AtomicInteger emptyQueueLoops;
+  
   /**
    * Uses an internally managed HttpClient instance.
    * 
@@ -156,6 +163,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
       scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
       shutdownExecutor = true;
     }
+    
+    if (log.isDebugEnabled()) {
+      pollInterrupts = new AtomicInteger();
+      pollExits = new AtomicInteger();
+      blockLoops = new AtomicInteger();
+      emptyQueueLoops = new AtomicInteger();
+    }
   }
 
   public Set<String> getQueryParams() {
@@ -174,13 +188,19 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
    * Opens a connection and sends everything...
    */
   class Runner implements Runnable {
+    volatile Thread thread = null;
+    volatile boolean inPoll = false;
+    
+    public Thread getThread() {
+      return thread;
+    }
+    
     @Override
     public void run() {
+      this.thread = Thread.currentThread();
       log.debug("starting runner: {}", this);
-
       // This loop is so we can continue if an element was added to the queue after the last runner exited.
       for (;;) {
-
         try {
 
           sendUpdateStream();
@@ -191,7 +211,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           }
           handleError(e);
         } finally {
-
           synchronized (runners) {
             // check to see if anything else was added to the queue
             if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
@@ -205,26 +224,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
               break;
             }
           }
-
         }
       }
 
       log.debug("finished: {}", this);
     }
 
+    public void interruptPoll() {
+      Thread lthread = thread;
+      if (inPoll && lthread != null) {
+        lthread.interrupt();
+      }
+    }
+    
     //
     // Pull from the queue multiple times and streams over a single connection.
     // Exits on exception, interruption, or an empty queue to pull from.
     //
     void sendUpdateStream() throws Exception {
+    
       while (!queue.isEmpty()) {
         HttpPost method = null;
         HttpResponse response = null;
-
+        
         InputStream rspBody = null;
         try {
-          final Update update = 
-              queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+          Update update;
+          notifyQueueAndRunnersIfEmptyQueue();
+          try {
+            inPoll = true;
+            update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
+            continue;
+          } finally {
+            inPoll = false;
+          }
           if (update == null)
             break;
 
@@ -234,61 +269,73 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
 
           EntityTemplate template = new EntityTemplate(new ContentProducer() {
-
+            
             @Override
             public void writeTo(OutputStream out) throws IOException {
-              try {
-                if (isXml) {
-                  out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+
+              if (isXml) {
+                out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+              }
+              Update upd = update;
+              while (upd != null) {
+                UpdateRequest req = upd.getRequest();
+                SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+                if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+                  queue.add(upd); // params are different, push back to queue
+                  break;
                 }
-                Update upd = update;
-                while (upd != null) {
-                  UpdateRequest req = upd.getRequest();
-                  SolrParams currentParams = new ModifiableSolrParams(req.getParams());
-                  if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
-                    queue.add(upd); // params are different, push back to queue
-                    break;
-                  }
 
-                  client.requestWriter.write(req, out);
-                  if (isXml) {
-                    // check for commit or optimize
-                    SolrParams params = req.getParams();
-                    if (params != null) {
-                      String fmt = null;
-                      if (params.getBool(UpdateParams.OPTIMIZE, false)) {
-                        fmt = "<optimize waitSearcher=\"%s\" />";
-                      } else if (params.getBool(UpdateParams.COMMIT, false)) {
-                        fmt = "<commit waitSearcher=\"%s\" />";
-                      }
-                      if (fmt != null) {
-                        byte[] content = String.format(Locale.ROOT,
-                            fmt,
-                            params.getBool(UpdateParams.WAIT_SEARCHER, false)
-                                + "").getBytes(StandardCharsets.UTF_8);
-                        out.write(content);
-                      }
+                client.requestWriter.write(req, out);
+                if (isXml) {
+                  // check for commit or optimize
+                  SolrParams params = req.getParams();
+                  if (params != null) {
+                    String fmt = null;
+                    if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+                      fmt = "<optimize waitSearcher=\"%s\" />";
+                    } else if (params.getBool(UpdateParams.COMMIT, false)) {
+                      fmt = "<commit waitSearcher=\"%s\" />";
+                    }
+                    if (fmt != null) {
+                      byte[] content = String.format(Locale.ROOT,
+                          fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false)
+                              + "")
+                          .getBytes(StandardCharsets.UTF_8);
+                      out.write(content);
                     }
                   }
-                  out.flush();
-
-                  if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
-                    // no need to wait to see another doc in the queue if we've hit the last doc in a batch
-                    upd = queue.poll(0, TimeUnit.MILLISECONDS);
-                  } else {
-                    upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
-                  }
-
                 }
-
-                if (isXml) {
-                  out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+                out.flush();
+
+                notifyQueueAndRunnersIfEmptyQueue();
+                inPoll = true;
+                try {
+                  while (true) {
+                    try {
+                      upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+                      break;
+                    } catch (InterruptedException e) {
+                      if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
+                      if (!queue.isEmpty()) {
+                        continue;
+                      }
+                      if (log.isDebugEnabled()) pollExits.incrementAndGet();
+                      upd = null;
+                      break;
+                    } finally {
+                      inPoll = false;
+                    }
+                  }
+                }finally {
+                  inPoll = false;
                 }
+              }
 
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                log.warn("", e);
+              if (isXml) {
+                out.write("</stream>".getBytes(StandardCharsets.UTF_8));
               }
+            
+            
             }
           });
 
@@ -318,10 +365,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           method.setEntity(template);
           method.addHeader("User-Agent", HttpSolrClient.AGENT);
           method.addHeader("Content-Type", contentType);
-
+          
+       
           response = client.getHttpClient()
               .execute(method, HttpClientUtil.createNewHttpClientRequestContext());
+          
           rspBody = response.getEntity().getContent();
+            
           int statusCode = response.getStatusLine().getStatusCode();
           if (statusCode != HttpStatus.SC_OK) {
             StringBuilder msg = new StringBuilder();
@@ -364,6 +414,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           } else {
             onSuccess(response);
           }
+          
         } finally {
           try {
             if (response != null) {
@@ -372,10 +423,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           } catch (Exception e) {
             log.error("Error consuming and closing http response stream.", e);
           }
+          notifyQueueAndRunnersIfEmptyQueue();
         }
       }
     }
   }
+  
+  private void notifyQueueAndRunnersIfEmptyQueue() {
+    if (queue.size() == 0) {
+      synchronized (queue) {
+        // queue may be empty
+        queue.notifyAll();
+      }
+      synchronized (runners) {
+        // we notify runners too - if there is a high queue poll time and this is the update
+        // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
+        runners.notifyAll();
+      }
+    }
+  }
 
   // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
   private void addRunner() {
@@ -383,7 +449,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     try {
       Runner r = new Runner();
       runners.add(r);
+      
       scheduler.execute(r);  // this can throw an exception if the scheduler has been shutdown, but that should be fine.
+
     } finally {
       MDC.remove("ConcurrentUpdateSolrClient.url");
     }
@@ -517,29 +585,52 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   public synchronized void blockUntilFinished() {
     lock = new CountDownLatch(1);
     try {
+
+      waitForEmptyQueue();
+      interruptRunnerThreadsPolling();
+
       synchronized (runners) {
 
         // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
-        // which means it would never remove itself from the runners list.  This is why we don't wait forever
+        // which means it would never remove itself from the runners list. This is why we don't wait forever
         // and periodically check if the scheduler is shutting down.
+        int loopCount = 0;
         while (!runners.isEmpty()) {
-          try {
-            runners.wait(250);
-          } catch (InterruptedException e) {
-            Thread.interrupted();
-          }
+          
+          if (log.isDebugEnabled()) blockLoops.incrementAndGet();
           
           if (scheduler.isShutdown())
             break;
-                      
+          
+          loopCount++;
+          
           // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
           int queueSize = queue.size();
           if (queueSize > 0 && runners.isEmpty()) {
             // TODO: can this still happen?
-            log.warn("No more runners, but queue still has "+
-              queueSize+" adding more runners to process remaining requests on queue");
+            log.warn("No more runners, but queue still has " +
+                queueSize + " adding more runners to process remaining requests on queue");
             addRunner();
           }
+          
+          interruptRunnerThreadsPolling();
+          
+          // try to avoid the worst case wait timeout
+          // without bad spin
+          int timeout;
+          if (loopCount < 3) {
+            timeout = 10;
+          } else if (loopCount < 10) {
+            timeout = 25;
+          } else {
+            timeout = 250;
+          }
+          
+          try {
+            runners.wait(timeout);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
         }
       }
     } finally {
@@ -548,6 +639,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     }
   }
 
+  private void waitForEmptyQueue() {
+
+    while (!queue.isEmpty()) {
+      if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
+
+      synchronized (runners) {
+        int queueSize = queue.size();
+        if (queueSize > 0 && runners.isEmpty()) {
+          log.warn("No more runners, but queue still has " +
+              queueSize + " adding more runners to process remaining requests on queue");
+          addRunner();
+        }
+      }
+      synchronized (queue) {
+        try {
+          queue.wait(250);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
   public void handleError(Throwable ex) {
     log.error("error", ex);
   }
@@ -560,19 +674,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   }
 
   @Override
-  public void close() {
-    if (internalHttpClient) IOUtils.closeQuietly(client);
-    if (shutdownExecutor) {
-      scheduler.shutdown();
-      try {
-        if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+  public synchronized void close() {
+    if (closed) {
+      interruptRunnerThreadsPolling();
+      return;
+    }
+    closed = true;
+    
+    try {
+      if (shutdownExecutor) {
+        scheduler.shutdown();
+        interruptRunnerThreadsPolling();
+        try {
+          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+            scheduler.shutdownNow();
+            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+                .error("ExecutorService did not terminate");
+          }
+        } catch (InterruptedException ie) {
           scheduler.shutdownNow();
-          if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
-              .error("ExecutorService did not terminate");
+          Thread.currentThread().interrupt();
         }
-      } catch (InterruptedException ie) {
-        scheduler.shutdownNow();
-        Thread.currentThread().interrupt();
+      } else {
+        interruptRunnerThreadsPolling();
+      }
+    } finally {
+      if (internalHttpClient) IOUtils.closeQuietly(client);
+      if (log.isDebugEnabled()) {
+        log.debug("STATS pollInteruppts={} pollExists={} blockLoops={} emptyQueueLoops={}", pollInterrupts.get(), pollExits.get(), blockLoops.get(), emptyQueueLoops.get());
+      }
+    }
+  }
+
+  private void interruptRunnerThreadsPolling() {
+    synchronized (runners) {
+      for (Runner runner : runners) {
+        runner.interruptPoll();
       }
     }
   }
@@ -590,17 +727,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   }
 
   public void shutdownNow() {
-    if (internalHttpClient) IOUtils.closeQuietly(client);
-    if (shutdownExecutor) {
-      scheduler.shutdownNow(); // Cancel currently executing tasks
-      try {
-        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) 
-          log.error("ExecutorService did not terminate");
-      } catch (InterruptedException ie) {
-        scheduler.shutdownNow();
-        Thread.currentThread().interrupt();
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+
+      if (shutdownExecutor) {
+        scheduler.shutdown();
+        interruptRunnerThreadsPolling();
+        scheduler.shutdownNow(); // Cancel currently executing tasks
+        try {
+          if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
+            log.error("ExecutorService did not terminate");
+        } catch (InterruptedException ie) {
+          scheduler.shutdownNow();
+          Thread.currentThread().interrupt();
+        }
+      } else {
+        interruptRunnerThreadsPolling();
       }
-    }    
+    } finally {
+      if (internalHttpClient) IOUtils.closeQuietly(client);
+    }
   }
   
   public void setParser(ResponseParser responseParser) {