You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/12/10 20:46:09 UTC

[lucene-solr] branch master updated: SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new c4f0c33  SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention.
c4f0c33 is described below

commit c4f0c3363828c088eefa2b99783178848c2f1f7a
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Dec 10 21:45:43 2019 +0100

    SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention.
---
 solr/CHANGES.txt                                   |  2 +
 .../apache/solr/update/DirectUpdateHandler2.java   |  6 ++
 .../org/apache/solr/update/SolrCmdDistributor.java | 12 ++--
 .../apache/solr/update/StreamingSolrClients.java   |  7 ++-
 .../java/org/apache/solr/util/TestInjection.java   | 15 +++++
 .../cloud/FullThrottleStoppableIndexingThread.java |  3 +
 .../apache/solr/update/SolrCmdDistributorTest.java | 37 +++++++++++
 solr/solr-ref-guide/src/using-solrj.adoc           |  6 ++
 .../impl/ConcurrentUpdateHttp2SolrClient.java      | 69 ++++++++++++++++++++-
 .../solrj/impl/ConcurrentUpdateSolrClient.java     | 72 ++++++++++++++++++++--
 10 files changed, 216 insertions(+), 13 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0b9af78..20679a0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -230,6 +230,8 @@ Bug Fixes
 
 * SOLR-13806: SolrJ QueryResponse._explainMap is incorrectly typed. (Guna Sekhar Dorai, ab)
 
+* SOLR-13975, SOLR-13896: ConcurrentUpdateSolrClient connection stall prevention. (ab, caomanhdat)
+
 Other Changes
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index b6232ae..b00dc16 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -228,6 +228,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
 
   @Override
   public int addDoc(AddUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     try {
       return addDoc0(cmd);
     } catch (SolrException e) {
@@ -414,6 +415,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
   // we don't return the number of docs deleted because it's not always possible to quickly know that info.
   @Override
   public void delete(DeleteUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     deleteByIdCommands.increment();
     deleteByIdCommandsCumulative.mark();
 
@@ -477,6 +479,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
   // we don't return the number of docs deleted because it's not always possible to quickly know that info.
   @Override
   public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     deleteByQueryCommands.increment();
     deleteByQueryCommandsCumulative.mark();
     boolean madeIt=false;
@@ -542,6 +545,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
 
   @Override
   public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     mergeIndexesCommands.mark();
     int rc;
 
@@ -605,6 +609,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
 
   @Override
   public void commit(CommitUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     if (cmd.prepareCommit) {
       prepareCommit(cmd);
       return;
@@ -754,6 +759,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
    */
   @Override
   public void rollback(RollbackUpdateCommand cmd) throws IOException {
+    TestInjection.injectDirectUpdateLatch();
     if (core.getCoreContainer().isZooKeeperAware()) {
       throw new UnsupportedOperationException("Rollback is currently not supported in SolrCloud mode. (SOLR-4895)");
     }
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5098cd1..e4727da 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -93,10 +93,12 @@ public class SolrCmdDistributor implements Closeable {
   
   public void finish() {    
     try {
-      assert ! finished : "lifecycle sanity check";
+      assert !finished : "lifecycle sanity check";
       finished = true;
-      
+
       blockAndDoRetries();
+    } catch (IOException e) {
+      log.warn("Unable to finish sending updates", e);
     } finally {
       clients.shutdown();
     }
@@ -106,7 +108,7 @@ public class SolrCmdDistributor implements Closeable {
     clients.shutdown();
   }
 
-  private void doRetriesIfNeeded() {
+  private void doRetriesIfNeeded() throws IOException {
     // NOTE: retries will be forwards to a single url
     
     List<Error> errors = new ArrayList<>(this.errors);
@@ -259,7 +261,7 @@ public class SolrCmdDistributor implements Closeable {
     
   }
 
-  public void blockAndDoRetries() {
+  public void blockAndDoRetries() throws IOException {
     clients.blockUntilFinished();
     
     // wait for any async commits to complete
@@ -284,7 +286,7 @@ public class SolrCmdDistributor implements Closeable {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
   }
 
-  private void submit(final Req req, boolean isCommit) {
+  private void submit(final Req req, boolean isCommit) throws IOException {
     // Copy user principal from the original request to the new update request, for later authentication interceptor use
     if (SolrRequestInfo.getRequestInfo() != null) {
       req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal());
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index 9d06c95..2a22ee0 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.update;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -38,6 +39,8 @@ public class StreamingSolrClients {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
+  // should be less than solr.jetty.http.idleTimeout
+  private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000);
 
   private Http2SolrClient httpClient;
 
@@ -72,14 +75,14 @@ public class StreamingSolrClients {
           .withExecutorService(updateExecutor)
           .alwaysStreamDeletes()
           .build();
-      client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
+      client.setPollQueueTime(pollQueueTime); // minimize connections created
       solrClients.put(url, client);
     }
 
     return client;
   }
 
-  public synchronized void blockUntilFinished() {
+  public synchronized void blockUntilFinished() throws IOException {
     for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
       client.blockUntilFinished();
     }
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 39c849b..9af4a8f 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -126,6 +126,8 @@ public class TestInjection {
 
   public volatile static CountDownLatch splitLatch = null;
 
+  public volatile static CountDownLatch directUpdateLatch = null;
+
   public volatile static CountDownLatch reindexLatch = null;
 
   public volatile static String reindexFailure = null;
@@ -164,6 +166,7 @@ public class TestInjection {
     splitFailureBeforeReplicaCreation = null;
     splitFailureAfterReplicaCreation = null;
     splitLatch = null;
+    directUpdateLatch = null;
     reindexLatch = null;
     reindexFailure = null;
     prepRecoveryOpPauseForever = null;
@@ -435,6 +438,18 @@ public class TestInjection {
     return true;
   }
 
+  public static boolean injectDirectUpdateLatch() {
+    if (directUpdateLatch != null) {
+      try {
+        log.info("Waiting in DirectUpdateHandler2 for up to 60s");
+        return directUpdateLatch.await(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    return true;
+  }
+
   public static boolean injectReindexFailure() {
     if (reindexFailure != null)  {
       Random rand = random();
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
index 78dc1de..1c5d470 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.ConnectException;
 import java.util.List;
@@ -129,6 +130,8 @@ class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
     stop = true;
     try {
       cusc.blockUntilFinished();
+    } catch (IOException e) {
+      log.warn("Exception waiting for the indexing client to finish", e);
     } finally {
       cusc.shutdownNow();
     }
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 6926c6f..f6d7087 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -24,6 +24,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.xml.parsers.ParserConfigurationException;
@@ -56,6 +57,7 @@ import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
+import org.apache.solr.util.TestInjection;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -73,11 +75,13 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     // we can't use the Randomized merge policy because the test depends on
     // being able to call optimize to have all deletes expunged.
     systemSetPropertySolrTestsMergePolicyFactory(LogDocMergePolicyFactory.class.getName());
+    System.setProperty("solr.cloud.client.pollQueueTime", "2000");
   }
 
   @AfterClass
   public static void afterClass() {
     systemClearPropertySolrTestsMergePolicyFactory();
+    System.clearProperty("solr.cloud.client.pollQueueTime");
   }
 
   private UpdateShardHandler updateShardHandler;
@@ -356,6 +360,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     testDeletes(true, true);
     testDeletes(true, false);
     getRfFromResponseShouldNotCloseTheInputStream();
+    testStuckUpdates();
   }
   
   private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
@@ -859,4 +864,36 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       assertFalse(openSearcher);
     }
   }
+
+  private void testStuckUpdates() throws Exception {
+    TestInjection.directUpdateLatch = new CountDownLatch(1);
+    List<Node> nodes = new ArrayList<>();
+    ModifiableSolrParams params;
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
+      for (int i = 0; i < 3; i++) {
+        nodes.clear();
+        for (SolrClient c : clients) {
+          if (random().nextBoolean()) {
+            continue;
+          }
+          HttpSolrClient httpClient = (HttpSolrClient) c;
+          ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
+              httpClient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+          StdNode node = new StdNode(new ZkCoreNodeProps(nodeProps));
+          nodes.add(node);
+        }
+        AddUpdateCommand c = new AddUpdateCommand(null);
+        c.solrDoc = sdoc("id", id.incrementAndGet());
+        if (nodes.size() > 0) {
+          params = new ModifiableSolrParams();
+          cmdDistrib.distribAdd(c, nodes, params, false);
+        }
+      }
+      cmdDistrib.blockAndDoRetries();
+    } catch (IOException e) {
+      assertTrue(e.toString(), e.toString().contains("processing has stalled"));
+    } finally {
+      TestInjection.directUpdateLatch.countDown();
+    }
+  }
 }
diff --git a/solr/solr-ref-guide/src/using-solrj.adoc b/solr/solr-ref-guide/src/using-solrj.adoc
index f60664d..8eb82c1 100644
--- a/solr/solr-ref-guide/src/using-solrj.adoc
+++ b/solr/solr-ref-guide/src/using-solrj.adoc
@@ -120,6 +120,12 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc
 
 When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on.
 
+`ConcurrentUpdateSolrClient` and its counterpart `ConcurrentUpdateHttp2SolrClient` also implement a stall prevention
+timeout that allows requests to non-responsive nodes to fail more quickly than waiting for a socket timeout.
+The default value of this timeout is 15000 ms and can be adjusted with the system property `solr.cloud.client.stallTime`.
+This value should be smaller than `solr.jetty.http.idleTimeout` (which is 120000 ms by default) and greater than the
+processing time of the largest update request.
+
 === Cloud Request Routing
 
 The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index 7165e9b..bb0c582 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -66,6 +66,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
   private boolean shutdownClient;
   private boolean shutdownExecutor;
   private int pollQueueTime = 250;
+  private int stallTime;
   private final boolean streamDeletes;
   private volatile boolean closed;
   private volatile CountDownLatch lock = null; // used to block everything
@@ -150,6 +151,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     this.runners = new LinkedList<>();
     this.streamDeletes = builder.streamDeletes;
     this.basePath = builder.baseSolrUrl;
+    this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
 
     if (builder.executorService != null) {
       this.scheduler = builder.executorService;
@@ -212,6 +214,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
           try {
             Update update;
             notifyQueueAndRunnersIfEmptyQueue();
+            //log.info("-- polling 1");
             update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
 
             if (update == null) {
@@ -385,6 +388,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
       Update update = new Update(req, collection);
       boolean success = queue.offer(update);
 
+      long lastStallTime = -1;
+      int lastQueueSize = -1;
       for (;;) {
         synchronized (runners) {
           // see if queue is half full and we can add more runners
@@ -418,6 +423,25 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
         if (!success) {
           success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
         }
+        if (!success) {
+          // stall prevention
+          int currentQueueSize = queue.size();
+          if (currentQueueSize != lastQueueSize) {
+            // there's still some progress in processing the queue - not stalled
+            lastQueueSize = currentQueueSize;
+            lastStallTime = -1;
+          } else {
+            if (lastStallTime == -1) {
+              // mark a stall but keep trying
+              lastStallTime = System.nanoTime();
+            } else {
+              long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+              if (currentStallTime > stallTime) {
+                throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
+              }
+            }
+          }
+        }
       }
     } catch (InterruptedException e) {
       log.error("interrupted", e);
@@ -430,13 +454,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     return dummy;
   }
 
-  public synchronized void blockUntilFinished() {
+  public synchronized void blockUntilFinished() throws IOException {
     lock = new CountDownLatch(1);
     try {
 
       waitForEmptyQueue();
       interruptRunnerThreadsPolling();
 
+      long lastStallTime = -1;
+      int lastQueueSize = -1;
+
       synchronized (runners) {
 
         // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
@@ -452,6 +479,23 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
 
           // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
           int queueSize = queue.size();
+          // stall prevention
+          if (lastQueueSize != queueSize) {
+            // init, or no stall
+            lastQueueSize = queueSize;
+            lastStallTime = -1;
+          } else {
+            if (lastStallTime == -1) {
+              lastStallTime = System.nanoTime();
+            } else {
+              long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+              if (currentStallTime > stallTime) {
+                throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
+//                Thread.currentThread().interrupt();
+//                break;
+              }
+            }
+          }
           if (queueSize > 0 && runners.isEmpty()) {
             // TODO: can this still happen?
             log.warn("No more runners, but queue still has " +
@@ -485,9 +529,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     }
   }
 
-  private void waitForEmptyQueue() {
+  private void waitForEmptyQueue() throws IOException {
     boolean threadInterrupted = Thread.currentThread().isInterrupted();
 
+    long lastStallTime = -1;
+    int lastQueueSize = -1;
     while (!queue.isEmpty()) {
       if (scheduler.isTerminated()) {
         log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. "
@@ -513,6 +559,24 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
               queue.size());
         }
       }
+      int currentQueueSize = queue.size();
+      // stall prevention
+      if (currentQueueSize != lastQueueSize) {
+        lastQueueSize = currentQueueSize;
+        lastStallTime = -1;
+      } else {
+        lastQueueSize = currentQueueSize;
+        if (lastStallTime == -1) {
+          lastStallTime = System.nanoTime();
+        } else {
+          long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+          if (currentStallTime > stallTime) {
+            throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
+//            threadInterrupted = true;
+//            break;
+          }
+        }
+      }
     }
     if (threadInterrupted) {
       Thread.currentThread().interrupt();
@@ -598,6 +662,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
    */
   public void setPollQueueTime(int pollQueueTime) {
     this.pollQueueTime = pollQueueTime;
+    this.stallTime = this.pollQueueTime * 3 / 2;
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index e19d278..d921cb2 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -85,6 +85,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
   final int threadCount;
   boolean shutdownExecutor = false;
   int pollQueueTime = 250;
+  int stallTime;
   private final boolean streamDeletes;
   private boolean internalHttpClient;
   private volatile Integer connectionTimeout;
@@ -132,7 +133,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     this.streamDeletes = builder.streamDeletes;
     this.connectionTimeout = builder.connectionTimeoutMillis;
     this.soTimeout = builder.socketTimeoutMillis;
-    
+    this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
+
+
     if (builder.executorService != null) {
       this.scheduler = builder.executorService;
       this.shutdownExecutor = false;
@@ -518,6 +521,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
       Update update = new Update(req, collection);
       boolean success = queue.offer(update);
 
+      long lastStallTime = -1;
+      int lastQueueSize = -1;
       for (;;) {
         synchronized (runners) {
           // see if queue is half full and we can add more runners
@@ -551,6 +556,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
         if (!success) {
           success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
         }
+        if (!success) {
+          // stall prevention
+          int currentQueueSize = queue.size();
+          if (currentQueueSize != lastQueueSize) {
+            // there's still some progress in processing the queue - not stalled
+            lastQueueSize = currentQueueSize;
+            lastStallTime = -1;
+          } else {
+            if (lastStallTime == -1) {
+              // mark a stall but keep trying
+              lastStallTime = System.nanoTime();
+            } else {
+              long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+              if (currentStallTime > stallTime) {
+                throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
+              }
+            }
+          }
+        }
       }
     } catch (InterruptedException e) {
       log.error("interrupted", e);
@@ -563,13 +587,16 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     return dummy;
   }
 
-  public synchronized void blockUntilFinished() {
+  public synchronized void blockUntilFinished() throws IOException {
     lock = new CountDownLatch(1);
     try {
 
       waitForEmptyQueue();
       interruptRunnerThreadsPolling();
 
+      long lastStallTime = -1;
+      int lastQueueSize = -1;
+
       synchronized (runners) {
 
         // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
@@ -587,6 +614,23 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
           
           // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
           int queueSize = queue.size();
+          // stall prevention
+          if (lastQueueSize != queueSize) {
+            // init, or no stall
+            lastQueueSize = queueSize;
+            lastStallTime = -1;
+          } else {
+            if (lastStallTime == -1) {
+              lastStallTime = System.nanoTime();
+            } else {
+              long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+              if (currentStallTime > stallTime) {
+                throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
+//                Thread.currentThread().interrupt();
+//                break;
+              }
+            }
+          }
           if (queueSize > 0 && runners.isEmpty()) {
             // TODO: can this still happen?
             log.warn("No more runners, but queue still has " +
@@ -620,9 +664,11 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
     }
   }
 
-  private void waitForEmptyQueue() {
+  private void waitForEmptyQueue() throws IOException {
     boolean threadInterrupted = Thread.currentThread().isInterrupted();
 
+    long lastStallTime = -1;
+    int lastQueueSize = -1;
     while (!queue.isEmpty()) {
       if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
       if (scheduler.isTerminated()) {
@@ -643,12 +689,30 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
         try {
           queue.wait(250);
         } catch (InterruptedException e) {
-          // If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately
+          // If we set the thread as interrupted again, the next time the wait it's called it's going to return immediately
           threadInterrupted = true;
           log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.", 
               queue.size());
         }
       }
+      int currentQueueSize = queue.size();
+      // stall prevention
+      if (currentQueueSize != lastQueueSize) {
+        lastQueueSize = currentQueueSize;
+        lastStallTime = -1;
+      } else {
+        lastQueueSize = currentQueueSize;
+        if (lastStallTime == -1) {
+          lastStallTime = System.nanoTime();
+        } else {
+          long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+          if (currentStallTime > stallTime) {
+            throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
+//            threadInterrupted = true;
+//            break;
+          }
+        }
+      }
     }
     if (threadInterrupted) {
       Thread.currentThread().interrupt();