You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 21:34:17 UTC

[lucene-solr] branch reference_impl_dev updated: @728 Work out the async tracker.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new c2c25ee  @728 Work out the async tracker.
c2c25ee is described below

commit c2c25eeeeaba8053642dce8a334a960c43a1daae
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 16:34:00 2020 -0500

    @728 Work out the async tracker.
---
 .../solr/client/solrj/impl/Http2SolrClient.java    | 59 +++++++++++++---------
 ...ntUpdateHttp2SolrClientMultiCollectionTest.java |  1 -
 2 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 0f0f536..628820c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -351,13 +351,8 @@ public class Http2SolrClient extends SolrClient {
     }
 
     decorateRequest(postRequest, updateRequest);
-    InputStreamResponseListener responseListener = new InputStreamResponseListener() {
-      @Override
-      public void onComplete(Result result) {
-        super.onComplete(result);
-      }
-    };
-    asyncTracker.register();
+    InputStreamResponseListener responseListener = new OurInputStreamResponseListener();
+
     postRequest.send(responseListener);
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
@@ -401,9 +396,9 @@ public class Http2SolrClient extends SolrClient {
     final ResponseParser parser = solrRequest.getResponseParser() == null
         ? this.parser: solrRequest.getResponseParser();
     if (onComplete != null) {
+      asyncTracker.register();
       // This async call only suitable for indexing since the response size is limited by 5MB
-      req.onRequestQueued(asyncTracker.queuedListener)
-          .send(new BufferingResponseListener(5 * 1024 * 1024) {
+      req.send(new BufferingResponseListener(5 * 1024 * 1024) {
 
         @Override
         public void onComplete(Result result) {
@@ -431,12 +426,7 @@ public class Http2SolrClient extends SolrClient {
       return null;
     } else {
       try {
-        InputStreamResponseListener listener = new InputStreamResponseListener() {
-          @Override
-          public void onComplete(Result result) {
-            super.onComplete(result);
-          }
-        };
+        InputStreamResponseListener listener = new MyInputStreamResponseListener(asyncTracker);
         req.send(listener);
         Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
         InputStream is = listener.getInputStream();
@@ -895,9 +885,9 @@ public class Http2SolrClient extends SolrClient {
 
     public void waitForComplete() {
       if (log.isDebugEnabled()) log.debug("Before wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
-      if (phaser.getUnarrivedParties() <= 1) {
-        return;
-      }
+//      if (phaser.getUnarrivedParties() <= 1) {
+//        return;
+//      }
       int arrival = phaser.arriveAndAwaitAdvance();
 
      // phaser.awaitAdvance(phaser.arriveAndDeregister());
@@ -916,12 +906,12 @@ public class Http2SolrClient extends SolrClient {
       if (log.isDebugEnabled()) {
         log.debug("Registered new party");
       }
-   //   phaser.register();
-//      try {
-//        available.acquire();
-//      } catch (InterruptedException ignored) {
-//        ParWork.propegateInterrupt(ignored);
-//      }
+      phaser.register();
+      try {
+        available.acquire();
+      } catch (InterruptedException ignored) {
+        ParWork.propegateInterrupt(ignored);
+      }
     }
 
     private static class ThePhaser extends Phaser {
@@ -1238,4 +1228,25 @@ public class Http2SolrClient extends SolrClient {
     ContentResponse response = httpClient.newRequest(url).method(PUT).content(new BytesContentProvider(bytes), contentType).send();
     return response.getContentAsString();
   }
+
+  private static class MyInputStreamResponseListener extends InputStreamResponseListener {
+
+    AsyncTracker tracker;
+
+    MyInputStreamResponseListener(AsyncTracker tracker) {
+      this.tracker = tracker;
+    }
+
+    @Override
+    public void onComplete(Result result) {
+      super.onComplete(result);
+    }
+  }
+
+  private static class OurInputStreamResponseListener extends InputStreamResponseListener {
+    @Override
+    public void onComplete(Result result) {
+      super.onComplete(result);
+    }
+  }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
index d72b108..71ae4d3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
@@ -36,7 +36,6 @@ import org.junit.Test;
  * {@link ConcurrentUpdateSolrClient} reuses the same HTTP connection to send multiple requests.  These tests ensure
  * that this connection-reuse never results in documents being sent to the wrong collection.  See SOLR-12803
  */
-//@Ignore // nocommit debug
 public class ConcurrentUpdateHttp2SolrClientMultiCollectionTest extends SolrCloudTestCase {
 
   private static final String COLLECTION_ONE_NAME = "collection1";