You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 02:44:38 UTC
[lucene-solr] 04/04: @717 OnQueuedListener missing and a bit of fallout.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit b361dba39485907744fc0b7f22914a2063906cbb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 2 21:19:25 2020 -0500
@717 OnQueuedListener missing and a bit of fallout.
# Conflicts:
# solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
# solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
---
.../impl/ConcurrentUpdateHttp2SolrClient.java | 167 +++++++++++++--------
.../solr/client/solrj/impl/Http2SolrClient.java | 2 +-
.../SolrExampleStreamingBinaryHttp2Test.java | 5 +-
3 files changed, 112 insertions(+), 62 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index 4385963..2217768 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -46,6 +46,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@@ -69,10 +70,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
private boolean shutdownClient;
private boolean shutdownExecutor;
- private int pollQueueTime = 50;
+ private int pollQueueTime = 250;
private int stallTime;
private final boolean streamDeletes;
private volatile boolean closed;
+ private volatile CountDownLatch lock = null; // used to block everything
private static class CustomBlockingQueue<E> implements Iterable<E>{
private final BlockingQueue<E> queue;
@@ -154,7 +156,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
this.runners = new LinkedList<>();
this.streamDeletes = builder.streamDeletes;
this.basePath = builder.baseSolrUrl;
- this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 0);
+ this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
if (stallTime < pollQueueTime * 2) {
throw new RuntimeException("Invalid stallTime: " + stallTime + "ms, must be 2x > pollQueueTime " + pollQueueTime);
}
@@ -175,44 +177,38 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
*/
class Runner implements Runnable {
- volatile boolean isRunning = false;
-
@Override
public void run() {
log.debug("starting runner: {}", this);
- this.isRunning = true;
- try {
- // This loop is so we can continue if an element was added to the queue after the last runner exited.
- for (; ; ) {
- try {
+ // This loop is so we can continue if an element was added to the queue after the last runner exited.
+ for (;;) {
+ try {
- sendUpdateStream();
+ sendUpdateStream();
- } catch (Throwable e) {
- ParWork.propegateInterrupt(e);
- if (e instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) e;
- }
- handleError(e);
- } finally {
- synchronized (runners) {
- // check to see if anything else was added to the queue
- if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
- // If there is something else to process, keep last runner alive by staying in the loop.
- } else {
- runners.remove(this);
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ throw (OutOfMemoryError) e;
+ }
+ handleError(e);
+ } finally {
+ synchronized (runners) {
+ // check to see if anything else was added to the queue
+ if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
+ // If there is something else to process, keep last runner alive by staying in the loop.
+ } else {
+ runners.remove(this);
+ if (runners.isEmpty()) {
// notify anyone waiting in blockUntilFinished
runners.notifyAll();
- break;
}
+ break;
}
}
}
-
- log.debug("finished: {}", this);
- } finally {
- isRunning = false;
}
+
+ log.debug("finished: {}", this);
}
//
@@ -329,14 +325,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
}
private void notifyQueueAndRunnersIfEmptyQueue() {
- synchronized (queue) {
- // queue may be empty
- queue.notifyAll();
- }
- synchronized (runners) {
- // we notify runners too - if there is a high queue poll time and this is the update
- // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
- runners.notifyAll();
+ if (queue.size() == 0) {
+ synchronized (queue) {
+ // queue may be empty
+ queue.notifyAll();
+ }
+ synchronized (runners) {
+ // we notify runners too - if there is a high queue poll time and this is the update
+ // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
+ runners.notifyAll();
+ }
}
}
@@ -395,10 +393,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
}
try {
+ CountDownLatch tmpLock = lock;
+ if (tmpLock != null) {
+ tmpLock.await();
+ }
Update update = new Update(req, collection);
boolean success = queue.offer(update);
+ long lastStallTime = -1;
+ int lastQueueSize = -1;
for (;;) {
synchronized (runners) {
// see if queue is half full and we can add more runners
@@ -430,7 +434,26 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
// start more runners.
//
if (!success) {
- success = queue.offer(update, 5, TimeUnit.MILLISECONDS);
+ success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
+ }
+ if (!success) {
+ // stall prevention
+ int currentQueueSize = queue.size();
+ if (currentQueueSize != lastQueueSize) {
+ // there's still some progress in processing the queue - not stalled
+ lastQueueSize = currentQueueSize;
+ lastStallTime = -1;
+ } else {
+ if (lastStallTime == -1) {
+ // mark a stall but keep trying
+ lastStallTime = System.nanoTime();
+ } else {
+ long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+ if (currentStallTime > stallTime) {
+ throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
+ }
+ }
+ }
}
}
} catch (InterruptedException e) {
@@ -446,7 +469,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
}
public synchronized void blockUntilFinished() throws IOException {
-
+ lock = new CountDownLatch(1);
try {
waitForEmptyQueue();
@@ -481,9 +504,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
} else {
long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
if (currentStallTime > stallTime) {
- throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
-// Thread.currentThread().interrupt();
-// break;
+ log.warn("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
+ return;
}
}
}
@@ -515,35 +537,62 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
}
}
} finally {
-
+ lock.countDown();
+ lock = null;
}
}
private void waitForEmptyQueue() throws IOException {
- synchronized (queue) {
- while (!queue.isEmpty()) {
- synchronized (runners) {
- int queueSize = queue.size();
- if (queueSize > 0 && runners.size() == 0) {
- log.warn("No more runners, but queue still has " +
- queueSize + " adding more runners to process remaining requests on queue");
- addRunner();
- }
- }
+ boolean threadInterrupted = Thread.currentThread().isInterrupted();
+
+ long lastStallTime = -1;
+ int lastQueueSize = -1;
+ while (!queue.isEmpty()) {
+ if (scheduler.isTerminated()) {
+ log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. Queue size: {}, Runners: {}. Current thread Interrupted? {}"
+ , scheduler, queue.size(), runners.size(), threadInterrupted);
+ break;
+ }
+ synchronized (runners) {
+ int queueSize = queue.size();
+ if (queueSize > 0 && runners.isEmpty()) {
+ log.warn("No more runners, but queue still has {} adding more runners to process remaining requests on queue"
+ , queueSize);
+ addRunner();
+ }
+ }
+ synchronized (queue) {
try {
queue.wait(250);
} catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- return;
+ // If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately
+ threadInterrupted = true;
+ log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.",
+ queue.size());
+ }
+ }
+ int currentQueueSize = queue.size();
+ // stall prevention
+ if (currentQueueSize != lastQueueSize) {
+ lastQueueSize = currentQueueSize;
+ lastStallTime = -1;
+ } else {
+ lastQueueSize = currentQueueSize;
+ if (lastStallTime == -1) {
+ lastStallTime = System.nanoTime();
+ } else {
+ long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+ if (currentStallTime > stallTime) {
+ throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
+// threadInterrupted = true;
+// break;
+ }
}
}
-
}
- if (scheduler.isTerminated() || scheduler.isShutdown()) {
- log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. Queue size: {}, Runners: {}. Current thread Interrupted? {}"
- , scheduler, queue.size(), runners.size());
- return;
+ if (threadInterrupted) {
+ Thread.currentThread().interrupt();
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 7e5040a..32b95bd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -363,7 +363,7 @@ public class Http2SolrClient extends SolrClient {
}
};
asyncTracker.register();
- postRequest.send(responseListener);
+ postRequest.onRequestQueued(asyncTracker.queuedListener).send(responseListener);
boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
OutStream outStream = new OutStream(collection, origParams, provider, responseListener,
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
index 0f5a2ac..239ebd2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
@@ -18,6 +18,7 @@
package org.apache.solr.client.solrj.embedded;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.lucene.util.LuceneTestCase;
@@ -37,7 +38,7 @@ import org.junit.Test;
@LuceneTestCase.Slow
@SolrTestCaseJ4.SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@Ignore // nocommit debug
+@Ignore // nocommit - mrm: some fails to look deeper at here
public class SolrExampleStreamingBinaryHttp2Test extends SolrExampleStreamingHttp2Test {
@Override
@@ -86,7 +87,7 @@ public class SolrExampleStreamingBinaryHttp2Test extends SolrExampleStreamingHtt
assertEquals(1, parentDoc.getChildDocumentCount());
// test streaming
- final List<SolrDocument> docs = new ArrayList<>();
+ final List<SolrDocument> docs = Collections.synchronizedList(new ArrayList<>());
client.queryAndStreamResponse(query, new StreamingResponseCallback() {
@Override
public void streamSolrDocument(SolrDocument doc) {