You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text version.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 02:44:34 UTC

[lucene-solr] branch reference_impl updated (e216019 -> b361dba)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


    from e216019  @714 One more try to beat this stop watching collection we want to use race.
     new b0dc655  @715 Try to address issue where we can try to unregister a party that is not outstanding.
     new 32debc5  Have the CoreAdminHandler create the async_id znode the OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete is waiting on
     new 01a120f  @716 Bring this test back.
     new b361dba  @717 OnQueuedListener missing and a bit of fallout.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../OverseerCollectionMessageHandler.java          |   9 +-
 .../solr/handler/admin/CoreAdminHandler.java       |  11 ++
 .../AsyncCallRequestStatusResponseTest.java        |   1 -
 .../impl/ConcurrentUpdateHttp2SolrClient.java      | 167 +++++++++++++--------
 .../solr/client/solrj/impl/Http2SolrClient.java    |   3 +-
 .../SolrExampleStreamingBinaryHttp2Test.java       |   5 +-
 6 files changed, 128 insertions(+), 68 deletions(-)


[lucene-solr] 02/04: Have the CoreAdminHandler create the async_id znode the OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete is waiting on

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 32debc5759508797639c7ca6d71caa8240631ede
Author: Timothy Potter <th...@gmail.com>
AuthorDate: Wed Sep 2 16:44:06 2020 -0600

    Have the CoreAdminHandler create the async_id znode the OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete is waiting on
---
 .../api/collections/OverseerCollectionMessageHandler.java     |  9 +++++----
 .../java/org/apache/solr/handler/admin/CoreAdminHandler.java  | 11 +++++++++++
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 44823db..a90a8e1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -888,6 +888,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       sreq.params = params;
       CountDownLatch latch = new CountDownLatch(1);
 
+      // mn- from DistributedMap
+      final String asyncPathToWaitOn = Overseer.OVERSEER_ASYNC_IDS + "/mn-" + requestId;
+
       Watcher waitForAsyncId = new Watcher() {
         @Override
         public void process(WatchedEvent event) {
@@ -899,7 +902,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           } else {
             Stat rstats2 = null;
             try {
-              rstats2 = zkStateReader.getZkClient().exists(Overseer.OVERSEER_ASYNC_IDS + "/" + requestId, this);
+              rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
             } catch (KeeperException e) {
               log.error("ZooKeeper exception", e);
             } catch (InterruptedException e) {
@@ -913,14 +916,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         }
       };
 
-      Stat rstats = zkStateReader.getZkClient().exists(Overseer.OVERSEER_ASYNC_IDS + "/" + requestId, waitForAsyncId);
+      Stat rstats = zkStateReader.getZkClient().exists(asyncPathToWaitOn, waitForAsyncId);
 
       if (rstats != null) {
         latch.countDown();
       }
 
-      // TJP TODO: Getting weird timeout issues when trying to delete a collection that was
-      // created using processAndWait b/c of this latch.await ... need to dig in further.
       latch.await(15, TimeUnit.SECONDS); // nocommit - still need a central timeout strat
 
       shardHandler.submit(sreq, replica, sreq.params);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 4c69a94..309fc71 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -49,6 +49,7 @@ import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.security.AuthorizationContext;
 import org.apache.solr.security.PermissionNameProvider;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -192,6 +193,16 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
               } else {
                 addTask("completed", taskObject, true);
               }
+
+              // Claim the task so the caller that's waiting for async status knows we're done
+              // TODO: TJP ~ not sure if this the correct place for this ...
+              if (coreContainer.getZkController() != null) {
+                try {
+                  coreContainer.getZkController().claimAsyncId(taskId);
+                } catch (KeeperException e) {
+                  log.error("Failed to claim async task {}", taskId, e);
+                }
+              }
             }
           });
         } finally {


[lucene-solr] 01/04: @715 Try to address issue where we can try to unregister a party that is not outstanding.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b0dc65502da92299b2f238aa02322861c6cc852f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 2 14:45:56 2020 -0500

    @715 Try to address issue where we can try to unregister a party that is not outstanding.
---
 .../src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java      | 1 -
 1 file changed, 1 deletion(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 1c6a002..7e5040a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -427,7 +427,6 @@ public class Http2SolrClient extends SolrClient {
               onComplete.onSuccess(rsp);
             } catch (Exception e) {
               ParWork.propegateInterrupt(e);
-              onComplete.onFailure(e);
             }
           } finally {
             asyncTracker.completeListener.onComplete(result);


[lucene-solr] 04/04: @717 OnQueuedListener missing and a bit of fallout.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b361dba39485907744fc0b7f22914a2063906cbb
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 2 21:19:25 2020 -0500

    @717 OnQueuedListener missing and a bit of fallout.
    
    # Conflicts:
    #	solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
    #	solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
---
 .../impl/ConcurrentUpdateHttp2SolrClient.java      | 167 +++++++++++++--------
 .../solr/client/solrj/impl/Http2SolrClient.java    |   2 +-
 .../SolrExampleStreamingBinaryHttp2Test.java       |   5 +-
 3 files changed, 112 insertions(+), 62 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index 4385963..2217768 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -46,6 +46,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
@@ -69,10 +70,11 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
 
   private boolean shutdownClient;
   private boolean shutdownExecutor;
-  private int pollQueueTime = 50;
+  private int pollQueueTime = 250;
   private int stallTime;
   private final boolean streamDeletes;
   private volatile boolean closed;
+  private volatile CountDownLatch lock = null; // used to block everything
 
   private static class CustomBlockingQueue<E> implements Iterable<E>{
     private final BlockingQueue<E> queue;
@@ -154,7 +156,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     this.runners = new LinkedList<>();
     this.streamDeletes = builder.streamDeletes;
     this.basePath = builder.baseSolrUrl;
-    this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 0);
+    this.stallTime = Integer.getInteger("solr.cloud.client.stallTime", 15000);
     if (stallTime < pollQueueTime * 2) {
       throw new RuntimeException("Invalid stallTime: " + stallTime + "ms, must be 2x > pollQueueTime " + pollQueueTime);
     }
@@ -175,44 +177,38 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
    */
   class Runner implements Runnable {
 
-    volatile boolean isRunning = false;
-
     @Override
     public void run() {
       log.debug("starting runner: {}", this);
-      this.isRunning = true;
-      try {
-        // This loop is so we can continue if an element was added to the queue after the last runner exited.
-        for (; ; ) {
-          try {
+      // This loop is so we can continue if an element was added to the queue after the last runner exited.
+      for (;;) {
+        try {
 
-            sendUpdateStream();
+          sendUpdateStream();
 
-          } catch (Throwable e) {
-            ParWork.propegateInterrupt(e);
-            if (e instanceof OutOfMemoryError) {
-              throw (OutOfMemoryError) e;
-            }
-            handleError(e);
-          } finally {
-            synchronized (runners) {
-              // check to see if anything else was added to the queue
-              if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
-                // If there is something else to process, keep last runner alive by staying in the loop.
-              } else {
-                runners.remove(this);
+        } catch (Throwable e) {
+          if (e instanceof OutOfMemoryError) {
+            throw (OutOfMemoryError) e;
+          }
+          handleError(e);
+        } finally {
+          synchronized (runners) {
+            // check to see if anything else was added to the queue
+            if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
+              // If there is something else to process, keep last runner alive by staying in the loop.
+            } else {
+              runners.remove(this);
+              if (runners.isEmpty()) {
                 // notify anyone waiting in blockUntilFinished
                 runners.notifyAll();
-                break;
               }
+              break;
             }
           }
         }
-
-        log.debug("finished: {}", this);
-      } finally {
-        isRunning = false;
       }
+
+      log.debug("finished: {}", this);
     }
 
     //
@@ -329,14 +325,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
   }
 
   private void notifyQueueAndRunnersIfEmptyQueue() {
-    synchronized (queue) {
-      // queue may be empty
-      queue.notifyAll();
-    }
-    synchronized (runners) {
-      // we notify runners too - if there is a high queue poll time and this is the update
-      // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
-      runners.notifyAll();
+    if (queue.size() == 0) {
+      synchronized (queue) {
+        // queue may be empty
+        queue.notifyAll();
+      }
+      synchronized (runners) {
+        // we notify runners too - if there is a high queue poll time and this is the update
+        // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
+        runners.notifyAll();
+      }
     }
   }
 
@@ -395,10 +393,16 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     }
 
     try {
+      CountDownLatch tmpLock = lock;
+      if (tmpLock != null) {
+        tmpLock.await();
+      }
 
       Update update = new Update(req, collection);
       boolean success = queue.offer(update);
 
+      long lastStallTime = -1;
+      int lastQueueSize = -1;
       for (;;) {
         synchronized (runners) {
           // see if queue is half full and we can add more runners
@@ -430,7 +434,26 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
         // start more runners.
         //
         if (!success) {
-          success = queue.offer(update, 5, TimeUnit.MILLISECONDS);
+          success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
+        }
+        if (!success) {
+          // stall prevention
+          int currentQueueSize = queue.size();
+          if (currentQueueSize != lastQueueSize) {
+            // there's still some progress in processing the queue - not stalled
+            lastQueueSize = currentQueueSize;
+            lastStallTime = -1;
+          } else {
+            if (lastStallTime == -1) {
+              // mark a stall but keep trying
+              lastStallTime = System.nanoTime();
+            } else {
+              long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+              if (currentStallTime > stallTime) {
+                throw new IOException("Request processing has stalled for " + currentStallTime + "ms with " + queue.size() + " remaining elements in the queue.");
+              }
+            }
+          }
         }
       }
     } catch (InterruptedException e) {
@@ -446,7 +469,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
   }
 
   public synchronized void blockUntilFinished() throws IOException {
-
+    lock = new CountDownLatch(1);
     try {
 
       waitForEmptyQueue();
@@ -481,9 +504,8 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
             } else {
               long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
               if (currentStallTime > stallTime) {
-                throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
-//                Thread.currentThread().interrupt();
-//                break;
+                log.warn("Task queue processing has stalled for " + currentStallTime + " ms with " + queueSize + " remaining elements to process.");
+                return;
               }
             }
           }
@@ -515,35 +537,62 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
         }
       }
     } finally {
-
+      lock.countDown();
+      lock = null;
     }
   }
 
   private void waitForEmptyQueue() throws IOException {
-    synchronized (queue) {
-      while (!queue.isEmpty()) {
-        synchronized (runners) {
-          int queueSize = queue.size();
-          if (queueSize > 0 && runners.size() == 0) {
-            log.warn("No more runners, but queue still has " +
-                    queueSize + " adding more runners to process remaining requests on queue");
-            addRunner();
-          }
-        }
+    boolean threadInterrupted = Thread.currentThread().isInterrupted();
+
+    long lastStallTime = -1;
+    int lastQueueSize = -1;
+    while (!queue.isEmpty()) {
+      if (scheduler.isTerminated()) {
+        log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. Queue size: {}, Runners: {}. Current thread Interrupted? {}"
+            , scheduler, queue.size(), runners.size(), threadInterrupted);
+        break;
+      }
 
+      synchronized (runners) {
+        int queueSize = queue.size();
+        if (queueSize > 0 && runners.isEmpty()) {
+          log.warn("No more runners, but queue still has {} adding more runners to process remaining requests on queue"
+              , queueSize);
+          addRunner();
+        }
+      }
+      synchronized (queue) {
         try {
           queue.wait(250);
         } catch (InterruptedException e) {
-          ParWork.propegateInterrupt(e);
-          return;
+          // If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately
+          threadInterrupted = true;
+          log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.",
+              queue.size());
+        }
+      }
+      int currentQueueSize = queue.size();
+      // stall prevention
+      if (currentQueueSize != lastQueueSize) {
+        lastQueueSize = currentQueueSize;
+        lastStallTime = -1;
+      } else {
+        lastQueueSize = currentQueueSize;
+        if (lastStallTime == -1) {
+          lastStallTime = System.nanoTime();
+        } else {
+          long currentStallTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastStallTime);
+          if (currentStallTime > stallTime) {
+            throw new IOException("Task queue processing has stalled for " + currentStallTime + " ms with " + currentQueueSize + " remaining elements to process.");
+//            threadInterrupted = true;
+//            break;
+          }
         }
       }
-
     }
-    if (scheduler.isTerminated() || scheduler.isShutdown()) {
-      log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. Queue size: {}, Runners: {}. Current thread Interrupted? {}"
-              , scheduler, queue.size(), runners.size());
-      return;
+    if (threadInterrupted) {
+      Thread.currentThread().interrupt();
     }
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 7e5040a..32b95bd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -363,7 +363,7 @@ public class Http2SolrClient extends SolrClient {
       }
     };
     asyncTracker.register();
-    postRequest.send(responseListener);
+    postRequest.onRequestQueued(asyncTracker.queuedListener).send(responseListener);
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
     OutStream outStream = new OutStream(collection, origParams, provider, responseListener,
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
index 0f5a2ac..239ebd2 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/embedded/SolrExampleStreamingBinaryHttp2Test.java
@@ -18,6 +18,7 @@
 package org.apache.solr.client.solrj.embedded;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -37,7 +38,7 @@ import org.junit.Test;
 
 @LuceneTestCase.Slow
 @SolrTestCaseJ4.SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@Ignore // nocommit debug
+@Ignore // nocommit - mrm: some fails to look deeper at here
 public class SolrExampleStreamingBinaryHttp2Test extends SolrExampleStreamingHttp2Test {
 
   @Override
@@ -86,7 +87,7 @@ public class SolrExampleStreamingBinaryHttp2Test extends SolrExampleStreamingHtt
     assertEquals(1, parentDoc.getChildDocumentCount());
 
     // test streaming
-    final List<SolrDocument> docs = new ArrayList<>();
+    final List<SolrDocument> docs = Collections.synchronizedList(new ArrayList<>());
     client.queryAndStreamResponse(query, new StreamingResponseCallback() {
       @Override
       public void streamSolrDocument(SolrDocument doc) {


[lucene-solr] 03/04: @716 Bring this test back.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 01a120f49daaac9bfd2daebf6e8ea59a8094617c
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 2 20:43:07 2020 -0500

    @716 Bring this test back.
---
 .../solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
index d994af8..8786957 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AsyncCallRequestStatusResponseTest.java
@@ -50,7 +50,6 @@ public class AsyncCallRequestStatusResponseTest extends SolrCloudTestCase {
 
   @SuppressWarnings("deprecation")
   @Test
-  @Ignore // nocommit - this is flakey in it's wait for the async call to complete, or the system is flakey in reporting
   public void testAsyncCallStatusResponse() throws Exception {
     int numShards = 4;
     int numReplicas = 1;