You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/25 17:36:53 UTC

[48/59] [abbrv] lucene-solr:jira/solr-8668: SOLR-10698: StreamHandler should allow connections to be closed early

SOLR-10698: StreamHandler should allow connections to be closed early


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/02b1c8aa
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/02b1c8aa
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/02b1c8aa

Branch: refs/heads/jira/solr-8668
Commit: 02b1c8aa360c8c87bf4cc20b688d7993ec6d7b1b
Parents: f62248c
Author: Joel Bernstein <jb...@apache.org>
Authored: Mon May 22 10:43:40 2017 -0400
Committer: Joel Bernstein <jb...@apache.org>
Committed: Mon May 22 10:44:02 2017 -0400

----------------------------------------------------------------------
 .../org/apache/solr/handler/ExportWriter.java   |  1 -
 .../solr/client/solrj/impl/HttpSolrClient.java  |  2 ++
 .../solr/client/solrj/io/stream/SolrStream.java | 14 ++++----
 .../client/solrj/io/stream/TupleStream.java     | 36 ++++++++++++++++++--
 4 files changed, 41 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
index bd43d5f..46ec3a4 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportWriter.java
@@ -270,7 +270,6 @@ public class ExportWriter implements SolrCore.RawWriter, Closeable {
         }
       } catch(Throwable e) {
         Throwable ex = e;
-        e.printStackTrace();
         while(ex != null) {
           String m = ex.getMessage();
           if(m != null && m.contains("Broken pipe")) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 1692aa9..dea1711 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -544,6 +544,7 @@ public class HttpSolrClient extends SolrClient {
       // Execute the method.
       HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext();
       final HttpResponse response = httpClient.execute(method, httpClientRequestContext);
+
       int httpStatus = response.getStatusLine().getStatusCode();
       
       // Read the contents
@@ -582,6 +583,7 @@ public class HttpSolrClient extends SolrClient {
         // no processor specified, return raw stream
         NamedList<Object> rsp = new NamedList<>();
         rsp.add("stream", respBody);
+        rsp.add("closeableResponse", response);
         // Only case where stream should not be closed
         shouldClose = false;
         return rsp;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
index f132815..2bb2e1c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/SolrStream.java
@@ -26,6 +26,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -68,6 +70,7 @@ public class SolrStream extends TupleStream {
   private transient SolrClientCache cache;
   private String slice;
   private long checkpoint = -1;
+  private CloseableHttpResponse closeableHttpResponse;
 
   /**
    * @param baseUrl Base URL of the stream.
@@ -188,11 +191,7 @@ public class SolrStream extends TupleStream {
   * */
 
   public void close() throws IOException {
-
-    if (tupleStreamParser != null) {
-      tupleStreamParser.close();
-    }
-
+    closeableHttpResponse.close();
     if(cache == null) {
       client.close();
     }
@@ -266,7 +265,7 @@ public class SolrStream extends TupleStream {
   }
 
   // temporary...
-  public static TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
+  public TupleStreamParser constructParser(SolrClient server, SolrParams requestParams) throws IOException, SolrServerException {
     String p = requestParams.get("qt");
     if (p != null) {
       ModifiableSolrParams modifiableSolrParams = (ModifiableSolrParams) requestParams;
@@ -280,6 +279,7 @@ public class SolrStream extends TupleStream {
     query.setMethod(SolrRequest.METHOD.POST);
     NamedList<Object> genericResponse = server.request(query);
     InputStream stream = (InputStream) genericResponse.get("stream");
+    this.closeableHttpResponse = (CloseableHttpResponse)genericResponse.get("closeableResponse");
     if (CommonParams.JAVABIN.equals(wt)) {
       return new JavabinTupleStreamParser(stream, true);
     } else {
@@ -287,6 +287,4 @@ public class SolrStream extends TupleStream {
       return new JSONTupleStream(reader);
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/02b1c8aa/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index ceea6af..0542bd6 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -18,7 +18,9 @@ package org.apache.solr.client.solrj.io.stream;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -44,10 +46,14 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.StrUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public abstract class TupleStream implements Closeable, Serializable, MapWriter {
 
+  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static final long serialVersionUID = 1;
   
   private UUID streamNodeId = UUID.randomUUID();
@@ -78,7 +84,7 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
     open();
     ew.put("docs", (IteratorWriter) iw -> {
       try {
-        for (; ; ) {
+        for ( ; ; ) {
           Tuple tuple = read();
           if (tuple != null) {
             iw.add(tuple);
@@ -90,8 +96,22 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
             break;
           }
         }
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+      } catch (Throwable e) {
+        close();
+        Throwable ex = e;
+        while(ex != null) {
+          String m = ex.getMessage();
+          if(m != null && m.contains("Broken pipe")) {
+            throw new IgnoreException();
+          }
+          ex = ex.getCause();
+        }
+
+        if(e instanceof IOException) {
+          throw e;
+        } else {
+          throw new IOException(e);
+        }
       }
     });
   }
@@ -178,4 +198,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
 
     throw new IOException("Slices not found for " + collectionName);
   }
+
+  public static class IgnoreException extends IOException {
+    public void printStackTrace(PrintWriter pw) {
+      pw.print("Early Client Disconnect");
+    }
+
+    public String getMessage() {
+      return "Early Client Disconnect";
+    }
+  }
 }
\ No newline at end of file