You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 21:34:17 UTC
[lucene-solr] branch reference_impl_dev updated: @728 Work out the
async tracker.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new c2c25ee @728 Work out the async tracker.
c2c25ee is described below
commit c2c25eeeeaba8053642dce8a334a960c43a1daae
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 16:34:00 2020 -0500
@728 Work out the async tracker.
---
.../solr/client/solrj/impl/Http2SolrClient.java | 59 +++++++++++++---------
...ntUpdateHttp2SolrClientMultiCollectionTest.java | 1 -
2 files changed, 35 insertions(+), 25 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 0f0f536..628820c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -351,13 +351,8 @@ public class Http2SolrClient extends SolrClient {
}
decorateRequest(postRequest, updateRequest);
- InputStreamResponseListener responseListener = new InputStreamResponseListener() {
- @Override
- public void onComplete(Result result) {
- super.onComplete(result);
- }
- };
- asyncTracker.register();
+ InputStreamResponseListener responseListener = new OurInputStreamResponseListener();
+
postRequest.send(responseListener);
boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
@@ -401,9 +396,9 @@ public class Http2SolrClient extends SolrClient {
final ResponseParser parser = solrRequest.getResponseParser() == null
? this.parser: solrRequest.getResponseParser();
if (onComplete != null) {
+ asyncTracker.register();
// This async call only suitable for indexing since the response size is limited by 5MB
- req.onRequestQueued(asyncTracker.queuedListener)
- .send(new BufferingResponseListener(5 * 1024 * 1024) {
+ req.send(new BufferingResponseListener(5 * 1024 * 1024) {
@Override
public void onComplete(Result result) {
@@ -431,12 +426,7 @@ public class Http2SolrClient extends SolrClient {
return null;
} else {
try {
- InputStreamResponseListener listener = new InputStreamResponseListener() {
- @Override
- public void onComplete(Result result) {
- super.onComplete(result);
- }
- };
+ InputStreamResponseListener listener = new MyInputStreamResponseListener(asyncTracker);
req.send(listener);
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
InputStream is = listener.getInputStream();
@@ -895,9 +885,9 @@ public class Http2SolrClient extends SolrClient {
public void waitForComplete() {
if (log.isDebugEnabled()) log.debug("Before wait for outstanding requests registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
- if (phaser.getUnarrivedParties() <= 1) {
- return;
- }
+// if (phaser.getUnarrivedParties() <= 1) {
+// return;
+// }
int arrival = phaser.arriveAndAwaitAdvance();
// phaser.awaitAdvance(phaser.arriveAndDeregister());
@@ -916,12 +906,12 @@ public class Http2SolrClient extends SolrClient {
if (log.isDebugEnabled()) {
log.debug("Registered new party");
}
- // phaser.register();
-// try {
-// available.acquire();
-// } catch (InterruptedException ignored) {
-// ParWork.propegateInterrupt(ignored);
-// }
+ phaser.register();
+ try {
+ available.acquire();
+ } catch (InterruptedException ignored) {
+ ParWork.propegateInterrupt(ignored);
+ }
}
private static class ThePhaser extends Phaser {
@@ -1238,4 +1228,25 @@ public class Http2SolrClient extends SolrClient {
ContentResponse response = httpClient.newRequest(url).method(PUT).content(new BytesContentProvider(bytes), contentType).send();
return response.getContentAsString();
}
+
+ private static class MyInputStreamResponseListener extends InputStreamResponseListener {
+
+ AsyncTracker tracker;
+
+ MyInputStreamResponseListener(AsyncTracker tracker) {
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void onComplete(Result result) {
+ super.onComplete(result);
+ }
+ }
+
+ private static class OurInputStreamResponseListener extends InputStreamResponseListener {
+ @Override
+ public void onComplete(Result result) {
+ super.onComplete(result);
+ }
+ }
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
index d72b108..71ae4d3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClientMultiCollectionTest.java
@@ -36,7 +36,6 @@ import org.junit.Test;
* {@link ConcurrentUpdateSolrClient} reuses the same HTTP connection to send multiple requests. These tests ensure
* that this connection-reuse never results in documents being sent to the wrong collection. See SOLR-12803
*/
-//@Ignore // nocommit debug
public class ConcurrentUpdateHttp2SolrClientMultiCollectionTest extends SolrCloudTestCase {
private static final String COLLECTION_ONE_NAME = "collection1";