You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/05/23 00:03:37 UTC
[15/27] lucene-solr:jira/solr-10233: SOLR-10698: StreamHandler should allow connections to be closed early
SOLR-10698: StreamHandler should allow connections to be closed early
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/02b1c8aa
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/02b1c8aa
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/02b1c8aa
Branch: refs/heads/jira/solr-10233
Commit: 02b1c8aa360c8c87bf4cc20b688d7993ec6d7b1b
Parents: f62248c
Author: Joel Bernstein <jb...@apache.org>
Authored: Mon May 22 10:43:40 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Mon May 22 10:44:02 2017 -0400
----------------------------------------------------------------------
.../org/apache/solr/handler/ExportWriter.java | 1 -
.../solr/client/solrj/impl/HttpSolrClient.java | 2 ++
.../solr/client/solrj/io/stream/SolrStream.java | 14 ++++----
.../client/solrj/io/stream/TupleStream.java | 36 ++++++++++++++++++--
4 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
index bd43d5f..46ec3a4 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
@@ -270,7 +270,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
}
} catch(Throwable e) {
Throwable ex = e;
- e.printStackTrace();
while(ex != null) {
String m = ex.getMessage();
if(m != null && m.contains("Broken pipe")) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 1692aa9..dea1711 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -544,6 +544,7 @@ public class HttpSolrClient extends SolrClient {
// Execute the method.
HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext();
final HttpResponse response = httpClient.execute(method, httpClientRequestContext);
+
int httpStatus = response.getStatusLine().getStatusCode();
// Read the contents
@@ -582,6 +583,7 @@ public class HttpSolrClient extends SolrClient {
// no processor specified, return raw stream
NamedList<Object> rsp = new NamedList<>();
rsp.add("stream", respBody);
+ rsp.add("closeableResponse", response);
// Only case where stream should not be closed
shouldClose = false;
return rsp;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index f132815..2bb2e1c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
@@ -68,6 +70,7 @@ public class SolrStream extends TupleStream {
private transient SolrClientCache cache;
private String slice;
private long checkpoint = -1;
+ private CloseableHttpResponse closeableHttpResponse;
/**
* @param baseUrl Base URL of the stream.
@@ -188,11 +191,7 @@ public class SolrStream extends TupleStream {
* */
public void close() throws IOException {
-
- if (tupleStreamParser != null) {
- tupleStreamParser.close();
- }
-
+ closeableHttpResponse.close();
if(cache == null) {
client.close();
}
@@ -266,7 +265,7 @@ public class SolrStream extends TupleStream {
}
// temporary...
- public static TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+ public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
String p = requestParams.get("qt");
if (p != null) {
ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
@@ -280,6 +279,7 @@ public class SolrStream extends TupleStream {
query.setMethod(SolrRequest.METHOD.POST);
NamedList<Object> genericResponse = server.request(query);
InputStream stream = (InputStream) genericResponse.get("stream");
+ this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse");
if (CommonParams.JAVABIN.equals(wt)) {
return new JavabinTupleStreamParser(stream, true);
} else {
@@ -287,6 +287,4 @@ public class SolrStream extends TupleStream {
return new JSONTupleStream(reader);
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index ceea6af..0542bd6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.io.stream;
import java.io.Closeable;
import java.io.IOException;
+import java.io.PrintWriter;
import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -44,10 +46,14 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.StrUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class TupleStream implements Closeable, Serializable, MapWriter {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static final long serialVersionUID = 1;
private UUID streamNodeId = UUID.randomUUID();
@@ -78,7 +84,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
open();
ew.put("docs", (IteratorWriter) iw -> {
try {
- for (; ; ) {
+ for ( ; ; ) {
Tuple tuple = read();
if (tuple != null) {
iw.add(tuple);
@@ -90,8 +96,22 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
break;
}
}
- } catch (IOException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ } catch (Throwable e) {
+ close();
+ Throwable ex = e;
+ while(ex != null) {
+ String m = ex.getMessage();
+ if(m != null && m.contains("Broken pipe")) {
+ throw new IgnoreException();
+ }
+ ex = ex.getCause();
+ }
+
+ if(e instanceof IOException) {
+ throw e;
+ } else {
+ throw new IOException(e);
+ }
}
});
}
@@ -178,4 +198,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
throw new IOException("Slices not found for " + collectionName);
}
+
+ public static class IgnoreException extends IOException {
+ public void printStackTrace(PrintWriter pw) {
+ pw.print("Early Client Disconnect");
+ }
+
+ public String getMessage() {
+ return "Early Client Disconnect";
+ }
+ }
}
\ No newline at end of file