You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/08/06 10:04:19 UTC
[lucene-solr] 01/01: SOLR-14713: Single thread on streaming updates
This is an automated email from the ASF dual-hosted git repository.
datcm pushed a commit to branch jira/SOLR-14713
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 5986b4cc3c83ac89d014d85ec4ea53d303800fe7
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Thu Aug 6 17:03:57 2020 +0700
SOLR-14713: Single thread on streaming updates
---
.../org/apache/solr/update/SolrCmdDistributor.java | 127 +++-----------
.../apache/solr/update/StreamingSolrClients.java | 188 ++++++++++++---------
.../solr/update/MockStreamingSolrClients.java | 47 +++---
.../apache/solr/update/SolrCmdDistributorTest.java | 24 +--
.../solr/client/solrj/impl/Http2SolrClient.java | 76 +++++++--
5 files changed, 226 insertions(+), 236 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9d2377e..ef627d5 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -26,18 +26,12 @@ import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import org.apache.http.NoHttpResponseException;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -70,49 +64,37 @@ public class SolrCmdDistributor implements Closeable {
private int retryPause = 500;
private final List<Error> allErrors = new ArrayList<>();
- private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
-
- private final CompletionService<Object> completionService;
- private final Set<Future<Object>> pending = new HashSet<>();
-
- public static interface AbortCheck {
- public boolean abortCheck();
- }
-
+
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
this.clients = new StreamingSolrClients(updateShardHandler);
- this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
}
/* For tests only */
SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
this.clients = clients;
this.retryPause = retryPause;
- completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
}
- public void finish() {
+ public void finish() {
+ if (finished)
+ return;
try {
- assert !finished : "lifecycle sanity check";
finished = true;
blockAndDoRetries();
} catch (IOException e) {
log.warn("Unable to finish sending updates", e);
- } finally {
- clients.shutdown();
}
}
public void close() {
- clients.shutdown();
+ clients.finishStreams();
}
private void doRetriesIfNeeded() throws IOException {
// NOTE: retries will be forwards to a single url
- List<Error> errors = new ArrayList<>(this.errors);
- errors.addAll(clients.getErrors());
+ List<Error> errors = new ArrayList<>(clients.getErrors());
List<Error> resubmitList = new ArrayList<>();
if (log.isInfoEnabled() && errors.size() > 0) {
@@ -150,6 +132,8 @@ public class SolrCmdDistributor implements Closeable {
err.req.retries++;
resubmitList.add(err);
} else {
+ // only track the error if we are not retrying the request
+ err.req.trackRequestResult(null, false);
allErrors.add(err);
}
} catch (Exception e) {
@@ -173,7 +157,6 @@ public class SolrCmdDistributor implements Closeable {
}
clients.clearErrors();
- this.errors.clear();
for (Error err : resubmitList) {
if (err.req.node instanceof ForwardNode) {
SolrException.log(SolrCmdDistributor.log, "forwarding update to "
@@ -188,7 +171,7 @@ public class SolrCmdDistributor implements Closeable {
+ err.req.cmd.toString() + " params:"
+ err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
}
- submit(err.req, false);
+ submit(err.req);
}
if (resubmitList.size() > 0) {
@@ -217,7 +200,7 @@ public class SolrCmdDistributor implements Closeable {
} else {
uReq.deleteByQuery(cmd.query);
}
- submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
+ submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker));
}
}
@@ -241,7 +224,7 @@ public class SolrCmdDistributor implements Closeable {
if (cmd.isInPlaceUpdate()) {
params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
}
- submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker), false);
+ submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker));
}
}
@@ -258,28 +241,14 @@ public class SolrCmdDistributor implements Closeable {
uReq.setParams(params);
addCommit(uReq, cmd);
- submit(new Req(cmd, node, uReq, false), true);
+ submit(new Req(cmd, node, uReq, false));
}
}
public void blockAndDoRetries() throws IOException {
- clients.blockUntilFinished();
-
- // wait for any async commits to complete
- while (pending != null && pending.size() > 0) {
- Future<Object> future = null;
- try {
- future = completionService.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("blockAndDoRetries interrupted", e);
- }
- if (future == null) break;
- pending.remove(future);
- }
+ clients.finishStreams();
doRetriesIfNeeded();
-
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -288,7 +257,7 @@ public class SolrCmdDistributor implements Closeable {
: AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
}
- private void submit(final Req req, boolean isCommit) throws IOException {
+ private void submit(final Req req) throws IOException {
// Copy user principal from the original request to the new update request, for later authentication interceptor use
if (SolrRequestInfo.getRequestInfo() != null) {
req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal());
@@ -301,60 +270,13 @@ public class SolrCmdDistributor implements Closeable {
new SolrRequestCarrier(req.uReq));
}
- if (req.synchronous) {
- blockAndDoRetries();
-
- try {
- req.uReq.setBasePath(req.node.getUrl());
- clients.getHttpClient().request(req.uReq);
- } catch (Exception e) {
- SolrException.log(log, e);
- Error error = new Error();
- error.e = e;
- error.req = req;
- if (e instanceof SolrException) {
- error.statusCode = ((SolrException) e).code();
- }
- errors.add(error);
- }
-
- return;
- }
-
if (log.isDebugEnabled()) {
log.debug("sending update to {} retry: {} {} params {}"
- , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
- }
-
- if (isCommit) {
- // a commit using ConncurrentUpdateSolrServer is not async,
- // so we make it async to prevent commits from happening
- // serially across multiple nodes
- pending.add(completionService.submit(() -> {
- doRequest(req);
- return null;
- }));
- } else {
- doRequest(req);
- }
- }
-
- private void doRequest(final Req req) {
- try {
- SolrClient solrClient = clients.getSolrClient(req);
- solrClient.request(req.uReq);
- } catch (Exception e) {
- SolrException.log(log, e);
- Error error = new Error();
- error.e = e;
- error.req = req;
- if (e instanceof SolrException) {
- error.statusCode = ((SolrException) e).code();
- }
- errors.add(error);
+ , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
}
+ clients.getClient(req).request(req);
}
-
+
public static class Req {
public Node node;
public UpdateRequest uReq;
@@ -406,11 +328,11 @@ public class SolrCmdDistributor implements Closeable {
//
// In the case of a leaderTracker and rollupTracker both being present, then we need to take care when assembling
// the final response to check both the rollup and leader trackers on the aggregator node.
- public void trackRequestResult(org.eclipse.jetty.client.api.Response resp, InputStream respBody, boolean success) {
+ public void trackRequestResult(NamedList<Object> rs, boolean success) {
// Returning Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
// our achieved rf if we are a leader, i.e. have a leaderTracker.
- int rfFromResp = getRfFromResponse(respBody);
+ int rfFromResp = getRfFromResponse(rs);
if (leaderTracker != null && rfFromResp == Integer.MAX_VALUE) {
leaderTracker.trackRequestResult(node, success);
@@ -421,11 +343,9 @@ public class SolrCmdDistributor implements Closeable {
}
}
- private int getRfFromResponse(InputStream inputStream) {
- if (inputStream != null) {
+ private int getRfFromResponse(NamedList<Object> nl) {
+ if (nl != null) {
try {
- BinaryResponseParser brp = new BinaryResponseParser();
- NamedList<Object> nl = brp.processResponse(inputStream, null);
Object hdr = nl.get("responseHeader");
if (hdr != null && hdr instanceof NamedList) {
@SuppressWarnings({"unchecked"})
@@ -453,6 +373,7 @@ public class SolrCmdDistributor implements Closeable {
public static class Error {
public Exception e;
public int statusCode = -1;
+ public Boolean retriable;
/**
* NOTE: This is the request that happened to be executed when this error was <b>triggered</b> the error,
@@ -465,8 +386,8 @@ public class SolrCmdDistributor implements Closeable {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
- sb.append("; exception=").append(String.valueOf(e));
- sb.append("; req=").append(String.valueOf(req));
+ sb.append("; exception=").append(e);
+ sb.append("; req=").append(req);
return sb.toString();
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index c9040c9..a2646cb 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -26,10 +26,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.eclipse.jetty.client.api.Response;
import org.slf4j.Logger;
@@ -38,14 +39,10 @@ import org.slf4j.LoggerFactory;
public class StreamingSolrClients {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
- // should be less than solr.jetty.http.idleTimeout
- private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000);
-
private Http2SolrClient httpClient;
- private Map<String, ConcurrentUpdateHttp2SolrClient> solrClients = new HashMap<>();
- private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
+ private Map<String, SingleStreamClient> clients = new HashMap<>();
+ private List<Error> errors = Collections.synchronizedList(new ArrayList<>());
private ExecutorService updateExecutor;
@@ -62,35 +59,124 @@ public class StreamingSolrClients {
errors.clear();
}
- public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
+ public synchronized SingleStreamClient getClient(final SolrCmdDistributor.Req req) {
String url = getFullUrl(req.node.getUrl());
- ConcurrentUpdateHttp2SolrClient client = solrClients.get(url);
+ SingleStreamClient client = clients.get(url);
if (client == null) {
- // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
- // on a greater scale since the current behavior is to only increase the number of connections/Runners when
- // the queue is more than half full.
- client = new ErrorReportingConcurrentUpdateSolrClient.Builder(url, httpClient, req, errors)
- .withQueueSize(100)
- .withThreadCount(runnerCount)
- .withExecutorService(updateExecutor)
- .alwaysStreamDeletes()
- .build();
- client.setPollQueueTime(pollQueueTime); // minimize connections created
- solrClients.put(url, client);
+ client = new SingleStreamClient(httpClient, url);
+ clients.put(url, client);
}
return client;
}
- public synchronized void blockUntilFinished() throws IOException {
- for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
- client.blockUntilFinished();
+ class SingleStreamClient {
+
+ private final Http2SolrClient client;
+ private final String basePath;
+ private Http2SolrClient.OutStream outStream;
+ private SolrCmdDistributor.Req lastReq;
+ private int numRequests = 0;
+
+ public SingleStreamClient(Http2SolrClient client, String basePath) {
+ this.client = client;
+ this.basePath = basePath;
+ }
+
+ public void request(SolrCmdDistributor.Req req) {
+ this.lastReq = req;
+ if (req.synchronous) {
+ closeStream();
+ sendSyncReq(req);
+ return;
+ }
+
+ UpdateRequest request = req.uReq;
+ numRequests++;
+ //TODO datcm serialize the request up-front and reuse it
+ try {
+ if (outStream == null) {
+ outStream = client.initOutStream(basePath, request, request.getCollection());
+ }
+ client.send(outStream, request, request.getCollection());
+ } catch (IOException e) {
+ if (outStream != null) {
+ closeStream();
+ } else {
+ handleError(e, req);
+ }
+ }
+ }
+
+ protected void handleError(Exception ex, SolrCmdDistributor.Req req) {
+ log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
+ Error error = new Error();
+ error.e = ex;
+ if (ex instanceof SolrException) {
+ error.statusCode = ((SolrException) ex).code();
+ }
+ error.req = req;
+ errors.add(error);
+ }
+
+ public void preFinish() {
+ if (outStream != null) {
+ try {
+ client.closeStream(outStream);
+ } catch (IOException e) {
+ if (numRequests > 1) {
+ handleError(new RuntimeException(), lastReq);
+ } else {
+ handleError(e, lastReq);
+ }
+ }
+ }
+ }
+
+ public void finish() {
+ closeStream();
+ }
+
+ private void sendSyncReq(SolrCmdDistributor.Req req) {
+ try {
+ NamedList<Object> rs = client.request(req.uReq);
+ trackResult(rs);
+ } catch (Exception e) {
+ handleError(e, req);
+ }
+ }
+
+ private void trackResult(NamedList<Object> rs) {
+ lastReq.trackRequestResult(rs, true);
+ }
+
+ /**
+ * Close previous stream and call error handling code
+ */
+ private void closeStream() {
+ if (outStream != null) {
+ try {
+ NamedList<Object> rs = client.closeAndGetResponse(outStream);
+ trackResult(rs);
+ } catch (Exception e) {
+ if (numRequests > 1) {
+ handleError(new RuntimeException(), lastReq);
+ } else {
+ handleError(e, lastReq);
+ }
+ }
+ }
+ outStream = null;
}
}
- public synchronized void shutdown() {
- for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
- client.close();
+ public synchronized void finishStreams() {
+ //TODO nocommit SOLR-13975
+ for (SingleStreamClient client : clients.values()) {
+ client.preFinish();
+ }
+ for (SingleStreamClient client : clients.values()) {
+ client.closeStream();
}
}
@@ -108,54 +194,4 @@ public class StreamingSolrClients {
return httpClient;
}
- public ExecutorService getUpdateExecutor() {
- return updateExecutor;
- }
-}
-
-class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final SolrCmdDistributor.Req req;
- private final List<Error> errors;
-
- public ErrorReportingConcurrentUpdateSolrClient(Builder builder) {
- super(builder);
- this.req = builder.req;
- this.errors = builder.errors;
- }
-
- @Override
- public void handleError(Throwable ex) {
- log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
- Error error = new Error();
- error.e = (Exception) ex;
- if (ex instanceof SolrException) {
- error.statusCode = ((SolrException) ex).code();
- }
- error.req = req;
- errors.add(error);
- if (!req.shouldRetry(error)) {
- // only track the error if we are not retrying the request
- req.trackRequestResult(null, null, false);
- }
- }
- @Override
- public void onSuccess(Response resp, InputStream respBody) {
- req.trackRequestResult(resp, respBody, true);
- }
-
- static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
- protected SolrCmdDistributor.Req req;
- protected List<Error> errors;
-
- public Builder(String baseSolrUrl, Http2SolrClient client, SolrCmdDistributor.Req req, List<Error> errors) {
- super(baseSolrUrl, client);
- this.req = req;
- this.errors = errors;
- }
-
- public ErrorReportingConcurrentUpdateSolrClient build() {
- return new ErrorReportingConcurrentUpdateSolrClient(this);
- }
- }
}
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
index bfa6221..a19ca75 100644
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
@@ -35,13 +36,13 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
public MockStreamingSolrClients(UpdateShardHandler updateShardHandler) {
super(updateShardHandler);
}
-
+
@Override
- public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
- SolrClient client = super.getSolrClient(req);
- return new MockSolrClient(client);
+ public synchronized SingleStreamClient getClient(SolrCmdDistributor.Req req) {
+ SingleStreamClient client = super.getClient(req);
+ return new MockClient(client);
}
-
+
public void setExp(Exp exp) {
this.exp = exp;
}
@@ -60,37 +61,41 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
return null;
}
- class MockSolrClient extends SolrClient {
+ class MockClient extends SingleStreamClient {
+ SingleStreamClient client;
- private SolrClient solrClient;
-
- public MockSolrClient(SolrClient solrClient) {
- this.solrClient = solrClient;
+ public MockClient(SingleStreamClient client) {
+ super(null, null);
+ this.client = client;
}
-
+
@Override
- public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection)
- throws SolrServerException, IOException {
+ public void request(SolrCmdDistributor.Req req) {
if (exp != null) {
Exception e = exception();
if (e instanceof IOException) {
if (LuceneTestCase.random().nextBoolean()) {
- throw (IOException)e;
+ this.client.handleError(e, req);
+ return;
} else {
- throw new SolrServerException(e);
+ this.client.handleError(e, req);
+ return;
}
} else if (e instanceof SolrServerException) {
- throw (SolrServerException)e;
+ this.client.handleError(e, req);
+ return;
} else {
- throw new SolrServerException(e);
+ this.client.handleError(e, req);
+ return;
}
}
-
- return solrClient.request(request);
+ this.client.request(req);
}
@Override
- public void close() {}
-
+ public void finish() {
+ this.client.finish();
+ }
}
+
}
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 77f5a89..c4bc9e4 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.net.SocketException;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -61,11 +62,14 @@ import org.apache.solr.util.TestInjection;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static enum NodeType {FORWARD, STANDARD};
private AtomicInteger id = new AtomicInteger();
@@ -359,7 +363,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
testDeletes(false, false);
testDeletes(true, true);
testDeletes(true, false);
- getRfFromResponseShouldNotCloseTheInputStream();
testStuckUpdates();
}
@@ -540,22 +543,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
assertFalse(req.shouldRetry(err));
}
- public void getRfFromResponseShouldNotCloseTheInputStream() {
- UpdateRequest dbqReq = new UpdateRequest();
- dbqReq.deleteByQuery("*:*");
- SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
- AtomicBoolean isClosed = new AtomicBoolean(false);
- ByteArrayInputStream is = new ByteArrayInputStream(new byte[100]) {
- @Override
- public void close() throws IOException {
- isClosed.set(true);
- super.close();
- }
- };
- req.trackRequestResult(null, is, true);
- assertFalse("Underlying stream should not be closed!", isClosed.get());
- }
-
private void testReqShouldRetryMaxRetries() {
Error err = getError(new SocketException());
SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
@@ -787,6 +774,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
}
private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
+ log.info("Datcm start test");
// Test RetryNode
try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 218a096..82788b0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -266,14 +266,22 @@ public class Http2SolrClient extends SolrClient {
private final OutputStreamContentProvider outProvider;
private final InputStreamResponseListener responseListener;
private final boolean isXml;
+ private final boolean isV2Api;
+ private final String destUri;
+ private boolean closed;
+ private long closedTime;
public OutStream(String origCollection, ModifiableSolrParams origParams,
- OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener, boolean isXml) {
+ OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener,
+ String destUri,
+ boolean isXml, boolean isV2Api) {
this.origCollection = origCollection;
this.origParams = origParams;
this.outProvider = outProvider;
this.responseListener = responseListener;
this.isXml = isXml;
+ this.isV2Api = isV2Api;
+ this.destUri = destUri;
}
boolean belongToThisStream(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest, String collection) {
@@ -294,10 +302,23 @@ public class Http2SolrClient extends SolrClient {
@Override
public void close() throws IOException {
+ if (closed) {
+ return;
+ }
if (isXml) {
write("</stream>".getBytes(FALLBACK_CHARSET));
}
this.outProvider.getOutputStream().close();
+ closed = true;
+ closedTime = System.nanoTime();
+ }
+
+ private long getIdleTimeout(long idleTimeout) {
+ long timePassed = (System.nanoTime() - closedTime) / 1000;
+ if (timePassed >= idleTimeout) {
+ return 0;
+ }
+ return idleTimeout - timePassed;
}
//TODO this class should be hidden
@@ -337,7 +358,8 @@ public class Http2SolrClient extends SolrClient {
boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
OutStream outStream = new OutStream(collection, origParams, provider, responseListener,
- isXml);
+ postRequest.getURI().toString(),
+ isXml, isV2ApiRequest(updateRequest));
if (isXml) {
outStream.write("<stream>".getBytes(FALLBACK_CHARSET));
}
@@ -380,6 +402,7 @@ public class Http2SolrClient extends SolrClient {
asyncListener.onFailure(e);
return FAILED_MAKING_REQUEST_CANCELLABLE;
}
+ final boolean isV2Api = isV2ApiRequest(solrRequest);
final ResponseParser parser = solrRequest.getResponseParser() == null
? this.parser: solrRequest.getResponseParser();
req.onRequestQueued(asyncTracker.queuedListener)
@@ -393,7 +416,7 @@ public class Http2SolrClient extends SolrClient {
InputStream is = listener.getInputStream();
assert ObjectReleaseTracker.track(is);
try {
- NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
+ NamedList<Object> body = processErrorsAndResponse(parser, response, is, isV2Api);
asyncListener.onSuccess(body);
} catch (RemoteSolrException e) {
if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
@@ -416,43 +439,60 @@ public class Http2SolrClient extends SolrClient {
return () -> req.abort(CANCELLED_EXCEPTION);
}
- @Override
- public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
- Request req = makeRequest(solrRequest, collection);
- final ResponseParser parser = solrRequest.getResponseParser() == null
- ? this.parser: solrRequest.getResponseParser();
+ public void closeStream(OutStream outStream) throws IOException {
+ outStream.close();
+ }
+
+ public NamedList<Object> closeAndGetResponse(OutStream outStream) throws IOException, SolrServerException {
+ outStream.close();
+ return getResponseFrom(outStream.responseListener, outStream.destUri, outStream.isV2Api, null, outStream.getIdleTimeout(idleTimeout));
+ }
+ private NamedList<Object> getResponseFrom(InputStreamResponseListener listener,
+ String destUri, boolean isV2Api,
+ ResponseParser parser, long idleTimeout) throws SolrServerException, IOException {
try {
- InputStreamResponseListener listener = new InputStreamResponseListener();
- req.send(listener);
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
InputStream is = listener.getInputStream();
assert ObjectReleaseTracker.track(is);
-
- return processErrorsAndResponse(solrRequest, parser, response, is);
+ if (parser == null) {
+ parser = this.parser;
+ }
+ return processErrorsAndResponse(parser, response, is, isV2Api);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new SolrServerException(
- "Timeout occured while waiting response from server at: " + req.getURI(), e);
+ "Timeout occured while waiting response from server at: " + destUri, e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ConnectException) {
- throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+ throw new SolrServerException("Server refused connection at: " + destUri, cause);
}
if (cause instanceof SolrServerException) {
throw (SolrServerException) cause;
} else if (cause instanceof IOException) {
throw new SolrServerException(
- "IOException occured when talking to server at: " + getBaseURL(), cause);
+ "IOException occured when talking to server at: " + destUri, cause);
}
throw new SolrServerException(cause.getMessage(), cause);
}
}
- private NamedList<Object> processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
- ResponseParser parser, Response response, InputStream is) throws SolrServerException {
+ @Override
+ public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+ Request req = makeRequest(solrRequest, collection);
+ final ResponseParser parser = solrRequest.getResponseParser() == null
+ ? this.parser: solrRequest.getResponseParser();
+
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ req.send(listener);
+ return getResponseFrom(listener, req.getURI().toString(), isV2ApiRequest(solrRequest), parser, idleTimeout);
+ }
+
+ private NamedList<Object> processErrorsAndResponse(ResponseParser parser, Response response,
+ InputStream is, boolean isV2Api) throws SolrServerException {
ContentType contentType = getContentType(response);
String mimeType = null;
String encoding = null;
@@ -460,7 +500,7 @@ public class Http2SolrClient extends SolrClient {
mimeType = contentType.getMimeType();
encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
}
- return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
+ return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2Api);
}
private ContentType getContentType(Response response) {