You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by st...@apache.org on 2023/08/08 16:15:22 UTC

[solr] branch main updated: SOLR-16265 reduce memory usage of ContentWriter based requests in Http2SolrClient (#1789)

This is an automated email from the ASF dual-hosted git repository.

stillalex pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b1aba27340 SOLR-16265 reduce memory usage of ContentWriter based requests in Http2SolrClient (#1789)
9b1aba27340 is described below

commit 9b1aba273407d3b15a4f2a9e8b5a7709f3a5f5d4
Author: Alex <st...@apache.org>
AuthorDate: Tue Aug 8 09:15:16 2023 -0700

    SOLR-16265 reduce memory usage of ContentWriter based requests in Http2SolrClient (#1789)
    
    SOLR-16265 reduce memory usage of ContentWriter based requests in Http2SolrClient
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/client/solrj/impl/Http2SolrClient.java    | 355 +++++++++++----------
 2 files changed, 182 insertions(+), 175 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 312c227110e..b4c7f498832 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -89,6 +89,8 @@ Optimizations
 
 * SOLR-16845: BinaryResponseWriter should not attempt cast to Utf8CharSequence (Alex Deparvu via Houston Putman)
 
+* SOLR-16265: reduce memory usage of ContentWriter based requests in Http2SolrClient (Alex Deparvu, Kevin Risden, David Smiley)
+
 
 Bug Fixes
 ---------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 5bbcadbb1e8..648504a816a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -16,8 +16,6 @@
  */
 package org.apache.solr.client.solrj.impl;
 
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException;
-import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
 import static org.apache.solr.common.util.Utils.getObjectByPath;
 
 import java.io.ByteArrayOutputStream;
@@ -30,7 +28,6 @@ import java.net.ConnectException;
 import java.net.CookieStore;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -57,6 +54,8 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteExecutionException;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.impl.HttpListenerFactory.RequestResponseListener;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -83,13 +82,12 @@ import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
 import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
-import org.eclipse.jetty.client.util.ByteBufferContentProvider;
-import org.eclipse.jetty.client.util.FormContentProvider;
-import org.eclipse.jetty.client.util.InputStreamContentProvider;
+import org.eclipse.jetty.client.util.FormRequestContent;
+import org.eclipse.jetty.client.util.InputStreamRequestContent;
 import org.eclipse.jetty.client.util.InputStreamResponseListener;
-import org.eclipse.jetty.client.util.MultiPartContentProvider;
-import org.eclipse.jetty.client.util.OutputStreamContentProvider;
-import org.eclipse.jetty.client.util.StringContentProvider;
+import org.eclipse.jetty.client.util.MultiPartRequestContent;
+import org.eclipse.jetty.client.util.OutputStreamRequestContent;
+import org.eclipse.jetty.client.util.StringRequestContent;
 import org.eclipse.jetty.http.HttpField;
 import org.eclipse.jetty.http.HttpFields;
 import org.eclipse.jetty.http.HttpHeader;
@@ -328,19 +326,19 @@ public class Http2SolrClient extends SolrClient {
   public static class OutStream implements Closeable {
     private final String origCollection;
     private final ModifiableSolrParams origParams;
-    private final OutputStreamContentProvider outProvider;
+    private final OutputStreamRequestContent content;
     private final InputStreamResponseListener responseListener;
     private final boolean isXml;
 
     public OutStream(
         String origCollection,
         ModifiableSolrParams origParams,
-        OutputStreamContentProvider outProvider,
+        OutputStreamRequestContent content,
         InputStreamResponseListener responseListener,
         boolean isXml) {
       this.origCollection = origCollection;
       this.origParams = origParams;
-      this.outProvider = outProvider;
+      this.content = content;
       this.responseListener = responseListener;
       this.isXml = isXml;
     }
@@ -352,11 +350,11 @@ public class Http2SolrClient extends SolrClient {
     }
 
     public void write(byte b[]) throws IOException {
-      this.outProvider.getOutputStream().write(b);
+      this.content.getOutputStream().write(b);
     }
 
     public void flush() throws IOException {
-      this.outProvider.getOutputStream().flush();
+      this.content.getOutputStream().flush();
     }
 
     @Override
@@ -364,7 +362,7 @@ public class Http2SolrClient extends SolrClient {
       if (isXml) {
         write("</stream>".getBytes(FALLBACK_CHARSET));
       }
-      this.outProvider.getOutputStream().close();
+      this.content.getOutputStream().close();
     }
 
     // TODO this class should be hidden
@@ -388,19 +386,18 @@ public class Http2SolrClient extends SolrClient {
     if (collection != null) basePath += "/" + collection;
     if (!basePath.endsWith("/")) basePath += "/";
 
-    OutputStreamContentProvider provider = new OutputStreamContentProvider();
+    OutputStreamRequestContent content = new OutputStreamRequestContent(contentType);
     Request postRequest =
         httpClient
             .newRequest(basePath + "update" + requestParams.toQueryString())
             .method(HttpMethod.POST)
-            .header(HttpHeader.CONTENT_TYPE, contentType)
-            .content(provider);
-    decorateRequest(postRequest, updateRequest);
+            .body(content);
+    decorateRequest(postRequest, updateRequest, false);
     InputStreamResponseListener responseListener = new InputStreamResponseListener();
     postRequest.send(responseListener);
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
-    OutStream outStream = new OutStream(collection, origParams, provider, responseListener, isXml);
+    OutStream outStream = new OutStream(collection, origParams, content, responseListener, isXml);
     if (isXml) {
       outStream.write("<stream>".getBytes(FALLBACK_CHARSET));
     }
@@ -409,7 +406,7 @@ public class Http2SolrClient extends SolrClient {
 
   public void send(OutStream outStream, SolrRequest<?> req, String collection) throws IOException {
     assert outStream.belongToThisStream(req, collection);
-    this.requestWriter.write(req, outStream.outProvider.getOutputStream());
+    this.requestWriter.write(req, outStream.content.getOutputStream());
     if (outStream.isXml) {
       // check for commit or optimize
       SolrParams params = req.getParams();
@@ -438,81 +435,77 @@ public class Http2SolrClient extends SolrClient {
   private static final Cancellable FAILED_MAKING_REQUEST_CANCELLABLE = () -> {};
 
   public Cancellable asyncRequest(
-      SolrRequest<?> solrRequest,
-      String collection,
-      AsyncListener<NamedList<Object>> asyncListener) {
+      SolrRequest<?> solrReq, String collection, AsyncListener<NamedList<Object>> asyncListener) {
+    MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
+    SolrRequest<?> solrRequest = unwrapV2Request(solrReq);
+
     Request req;
     try {
-      req = makeRequest(solrRequest, collection);
-    } catch (SolrServerException | IOException e) {
-      asyncListener.onFailure(e);
-      return FAILED_MAKING_REQUEST_CANCELLABLE;
-    }
-    final ResponseParser parser =
-        solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
-    MDCCopyHelper mdcCopyHelper = new MDCCopyHelper();
-    req.onRequestQueued(asyncTracker.queuedListener)
-        .onComplete(asyncTracker.completeListener)
-        .send(
-            new InputStreamResponseListener() {
-              @Override
-              public void onHeaders(Response response) {
-                super.onHeaders(response);
-                InputStreamResponseListener listener = this;
-                executor.execute(
-                    () -> {
-                      InputStream is = listener.getInputStream();
-                      assert ObjectReleaseTracker.track(is);
-                      try {
-                        NamedList<Object> body =
-                            processErrorsAndResponse(
-                                solrRequest, parser, response, is, req.getURI().toString());
+      String url = getRequestPath(solrRequest, collection);
+      InputStreamResponseListener listener =
+          new InputStreamResponseListener() {
+            @Override
+            public void onHeaders(Response response) {
+              super.onHeaders(response);
+              executor.execute(
+                  () -> {
+                    InputStream is = getInputStream();
+                    assert ObjectReleaseTracker.track(is);
+                    try {
+                      NamedList<Object> body =
+                          processErrorsAndResponse(solrRequest, response, is, url);
+                      mdcCopyHelper.onBegin(null);
+                      log.debug("response processing success");
+                      asyncListener.onSuccess(body);
+                    } catch (RemoteSolrException e) {
+                      if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
                         mdcCopyHelper.onBegin(null);
-                        log.debug("response processing success");
-                        asyncListener.onSuccess(body);
-                      } catch (RemoteSolrException e) {
-                        if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
-                          mdcCopyHelper.onBegin(null);
-                          log.debug("response processing failed");
-                          asyncListener.onFailure(e);
-                        }
-                      } catch (SolrServerException e) {
-                        mdcCopyHelper.onBegin(null);
-                        log.debug("response processing failed");
+                        log.debug("response processing failed", e);
                         asyncListener.onFailure(e);
-                      } finally {
-                        log.debug("response processing completed");
-                        mdcCopyHelper.onComplete(null);
                       }
-                    });
-              }
+                    } catch (SolrServerException e) {
+                      mdcCopyHelper.onBegin(null);
+                      log.debug("response processing failed", e);
+                      asyncListener.onFailure(e);
+                    } finally {
+                      log.debug("response processing completed");
+                      mdcCopyHelper.onComplete(null);
+                    }
+                  });
+            }
 
-              @Override
-              public void onFailure(Response response, Throwable failure) {
-                super.onFailure(response, failure);
-                if (failure != CANCELLED_EXCEPTION) {
-                  asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
-                }
+            @Override
+            public void onFailure(Response response, Throwable failure) {
+              super.onFailure(response, failure);
+              if (failure != CANCELLED_EXCEPTION) {
+                asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
               }
-            });
+            }
+          };
+
+      req = makeRequestAndSend(solrRequest, url, listener, true);
+    } catch (SolrServerException | IOException e) {
+      asyncListener.onFailure(e);
+      return FAILED_MAKING_REQUEST_CANCELLABLE;
+    }
     return () -> req.abort(CANCELLED_EXCEPTION);
   }
 
   @Override
   public NamedList<Object> request(SolrRequest<?> solrRequest, String collection)
       throws SolrServerException, IOException {
-    Request req = makeRequest(solrRequest, collection);
-    final ResponseParser parser =
-        solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
 
+    solrRequest = unwrapV2Request(solrRequest);
+    String url = getRequestPath(solrRequest, collection);
     Throwable abortCause = null;
+    Request req = null;
     try {
       InputStreamResponseListener listener = new InputStreamResponseListener();
-      req.send(listener);
+      req = makeRequestAndSend(solrRequest, url, listener, false);
       Response response = listener.get(idleTimeoutMillis, TimeUnit.MILLISECONDS);
       InputStream is = listener.getInputStream();
       assert ObjectReleaseTracker.track(is);
-      return processErrorsAndResponse(solrRequest, parser, response, is, req.getURI().toString());
+      return processErrorsAndResponse(solrRequest, response, is, req.getURI().toString());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       abortCause = e;
@@ -544,12 +537,10 @@ public class Http2SolrClient extends SolrClient {
   }
 
   private NamedList<Object> processErrorsAndResponse(
-      SolrRequest<?> solrRequest,
-      ResponseParser parser,
-      Response response,
-      InputStream is,
-      String urlExceptionMessage)
+      SolrRequest<?> solrRequest, Response response, InputStream is, String urlExceptionMessage)
       throws SolrServerException {
+    ResponseParser parser =
+        solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
     String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
     String mimeType = null;
     String encoding = null;
@@ -566,9 +557,9 @@ public class Http2SolrClient extends SolrClient {
       String encoded =
           basicAuthCredentialsToAuthorizationString(
               solrRequest.getBasicAuthUser(), solrRequest.getBasicAuthPassword());
-      req.header("Authorization", encoded);
+      req.headers(headers -> headers.put("Authorization", encoded));
     } else if (basicAuthAuthorizationStr != null) {
-      req.header("Authorization", basicAuthAuthorizationStr);
+      req.headers(headers -> headers.put("Authorization", basicAuthAuthorizationStr));
     }
   }
 
@@ -577,15 +568,9 @@ public class Http2SolrClient extends SolrClient {
     return "Basic " + Base64.getEncoder().encodeToString(userPass.getBytes(FALLBACK_CHARSET));
   }
 
-  private Request makeRequest(SolrRequest<?> solrRequest, String collection)
-      throws SolrServerException, IOException {
-    Request req = createRequest(solrRequest, collection);
-    decorateRequest(req, solrRequest);
-    return req;
-  }
+  private void decorateRequest(Request req, SolrRequest<?> solrRequest, boolean isAsync) {
+    req.headers(headers -> headers.remove(HttpHeader.ACCEPT_ENCODING));
 
-  private void decorateRequest(Request req, SolrRequest<?> solrRequest) {
-    req.header(HttpHeader.ACCEPT_ENCODING, null);
     if (requestTimeoutMillis > 0) {
       req.timeout(requestTimeoutMillis, TimeUnit.MILLISECONDS);
     } else {
@@ -603,11 +588,17 @@ public class Http2SolrClient extends SolrClient {
       req.onComplete(listener);
     }
 
+    if (isAsync) {
+      req.onRequestQueued(asyncTracker.queuedListener);
+      req.onComplete(asyncTracker.completeListener);
+    }
+
     Map<String, String> headers = solrRequest.getHeaders();
     if (headers != null) {
-      for (Map.Entry<String, String> entry : headers.entrySet()) {
-        req.header(entry.getKey(), entry.getValue());
-      }
+      req.headers(
+          h ->
+              headers.entrySet().stream()
+                  .forEach(entry -> h.add(entry.getKey(), entry.getValue())));
     }
   }
 
@@ -617,38 +608,19 @@ public class Http2SolrClient extends SolrClient {
     return new URL(oldURL.getProtocol(), oldURL.getHost(), oldURL.getPort(), newPath).toString();
   }
 
-  private Request createRequest(SolrRequest<?> solrRequest, String collection)
-      throws IOException, SolrServerException {
+  private SolrRequest<?> unwrapV2Request(SolrRequest<?> solrRequest) {
     if (solrRequest.getBasePath() == null && serverBaseUrl == null)
       throw new IllegalArgumentException("Destination node is not provided!");
 
     if (solrRequest instanceof V2RequestSupport) {
-      solrRequest = ((V2RequestSupport) solrRequest).getV2Request();
-    }
-    SolrParams params = solrRequest.getParams();
-    RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);
-    Collection<ContentStream> streams =
-        contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null;
-    String path = requestWriter.getPath(solrRequest);
-    if (path == null || !path.startsWith("/")) {
-      path = DEFAULT_PATH;
-    }
-
-    ResponseParser parser = solrRequest.getResponseParser();
-    if (parser == null) {
-      parser = this.parser;
-    }
-
-    // The parser 'wt=' and 'version=' params are used instead of the original
-    // params
-    ModifiableSolrParams wparams = new ModifiableSolrParams(params);
-    if (parser != null) {
-      wparams.set(CommonParams.WT, parser.getWriterType());
-      wparams.set(CommonParams.VERSION, parser.getVersion());
+      return ((V2RequestSupport) solrRequest).getV2Request();
+    } else {
+      return solrRequest;
     }
+  }
 
-    // TODO add invariantParams support
-
+  private String getRequestPath(SolrRequest<?> solrRequest, String collection)
+      throws MalformedURLException {
     String basePath = solrRequest.getBasePath() == null ? serverBaseUrl : solrRequest.getBasePath();
     if (collection != null) basePath += "/" + collection;
 
@@ -659,62 +631,94 @@ public class Http2SolrClient extends SolrClient {
         basePath = serverBaseUrl + "/____v2";
       }
     }
+    String path = requestWriter.getPath(solrRequest);
+    if (path == null || !path.startsWith("/")) {
+      path = DEFAULT_PATH;
+    }
+
+    return basePath + path;
+  }
+
+  private Request makeRequestAndSend(
+      SolrRequest<?> solrRequest, String url, InputStreamResponseListener listener, boolean isAsync)
+      throws IOException, SolrServerException {
+
+    // TODO add invariantParams support
+    ResponseParser parser =
+        solrRequest.getResponseParser() == null ? this.parser : solrRequest.getResponseParser();
+
+    // The parser 'wt=' and 'version=' params are used instead of the original
+    // params
+    ModifiableSolrParams wparams = new ModifiableSolrParams(solrRequest.getParams());
+    wparams.set(CommonParams.WT, parser.getWriterType());
+    wparams.set(CommonParams.VERSION, parser.getVersion());
 
     if (SolrRequest.METHOD.GET == solrRequest.getMethod()) {
-      if (streams != null || contentWriter != null) {
+      RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);
+      Collection<ContentStream> streams =
+          contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null;
+      if (contentWriter != null || streams != null) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!");
       }
-
-      return httpClient
-          .newRequest(basePath + path + wparams.toQueryString())
-          .method(HttpMethod.GET);
+      var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.GET);
+      decorateRequest(r, solrRequest, isAsync);
+      r.send(listener);
+      return r;
     }
 
     if (SolrRequest.METHOD.DELETE == solrRequest.getMethod()) {
-      return httpClient
-          .newRequest(basePath + path + wparams.toQueryString())
-          .method(HttpMethod.DELETE);
+      var r = httpClient.newRequest(url + wparams.toQueryString()).method(HttpMethod.DELETE);
+      decorateRequest(r, solrRequest, isAsync);
+      r.send(listener);
+      return r;
     }
 
     if (SolrRequest.METHOD.POST == solrRequest.getMethod()
         || SolrRequest.METHOD.PUT == solrRequest.getMethod()) {
+      RequestWriter.ContentWriter contentWriter = requestWriter.getContentWriter(solrRequest);
+      Collection<ContentStream> streams =
+          contentWriter == null ? requestWriter.getContentStreams(solrRequest) : null;
 
-      String url = basePath + path;
-      boolean hasNullStreamName = false;
+      boolean isMultipart = false;
       if (streams != null) {
+        boolean hasNullStreamName = false;
         hasNullStreamName = streams.stream().anyMatch(cs -> cs.getName() == null);
+        isMultipart = !hasNullStreamName && streams.size() > 1;
       }
 
-      boolean isMultipart = streams != null && streams.size() > 1 && !hasNullStreamName;
-
       HttpMethod method =
           SolrRequest.METHOD.POST == solrRequest.getMethod() ? HttpMethod.POST : HttpMethod.PUT;
 
       if (contentWriter != null) {
-        Request req = httpClient.newRequest(url + wparams.toQueryString()).method(method);
-        Utils.BAOS baos = new Utils.BAOS();
-        contentWriter.write(baos);
-
-        // SOLR-16265: TODO reduce memory usage
-        return req.content(
-            // We're throwing this BAOS away, so no need to copy the byte[], just use the raw buf
-            new ByteBufferContentProvider(
-                contentWriter.getContentType(), ByteBuffer.wrap(baos.getbuf(), 0, baos.size())));
+        var content = new OutputStreamRequestContent(contentWriter.getContentType());
+        var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content);
+        decorateRequest(r, solrRequest, isAsync);
+        r.send(listener);
+        try (var output = content.getOutputStream()) {
+          contentWriter.write(output);
+        }
+        return r;
+
       } else if (streams == null || isMultipart) {
         // send server list and request list as query string params
         ModifiableSolrParams queryParams = calculateQueryParams(this.urlParamNames, wparams);
         queryParams.add(calculateQueryParams(solrRequest.getQueryParams(), wparams));
         Request req = httpClient.newRequest(url + queryParams.toQueryString()).method(method);
-        return fillContentStream(req, streams, wparams, isMultipart);
+        var r = fillContentStream(req, streams, wparams, isMultipart);
+        decorateRequest(r, solrRequest, isAsync);
+        r.send(listener);
+        return r;
+
       } else {
         // It is has one stream, it is the post body, put the params in the URL
         ContentStream contentStream = streams.iterator().next();
-        return httpClient
-            .newRequest(url + wparams.toQueryString())
-            .method(method)
-            .content(
-                new InputStreamContentProvider(contentStream.getStream()),
-                contentStream.getContentType());
+        var content =
+            new InputStreamRequestContent(
+                contentStream.getContentType(), contentStream.getStream());
+        var r = httpClient.newRequest(url + wparams.toQueryString()).method(method).body(content);
+        decorateRequest(r, solrRequest, isAsync);
+        r.send(listener);
+        return r;
       }
     }
 
@@ -729,37 +733,38 @@ public class Http2SolrClient extends SolrClient {
       throws IOException {
     if (isMultipart) {
       // multipart/form-data
-      MultiPartContentProvider content = new MultiPartContentProvider();
-      Iterator<String> iter = wparams.getParameterNamesIterator();
-      while (iter.hasNext()) {
-        String key = iter.next();
-        String[] vals = wparams.getParams(key);
-        if (vals != null) {
-          for (String val : vals) {
-            content.addFieldPart(key, new StringContentProvider(val), null);
+      try (MultiPartRequestContent content = new MultiPartRequestContent()) {
+        Iterator<String> iter = wparams.getParameterNamesIterator();
+        while (iter.hasNext()) {
+          String key = iter.next();
+          String[] vals = wparams.getParams(key);
+          if (vals != null) {
+            for (String val : vals) {
+              content.addFieldPart(key, new StringRequestContent(val), null);
+            }
           }
         }
-      }
-      if (streams != null) {
-        for (ContentStream contentStream : streams) {
-          String contentType = contentStream.getContentType();
-          if (contentType == null) {
-            contentType = "multipart/form-data"; // default
-          }
-          String name = contentStream.getName();
-          if (name == null) {
-            name = "";
+        if (streams != null) {
+          for (ContentStream contentStream : streams) {
+            String contentType = contentStream.getContentType();
+            if (contentType == null) {
+              contentType = "multipart/form-data"; // default
+            }
+            String name = contentStream.getName();
+            if (name == null) {
+              name = "";
+            }
+            HttpFields.Mutable fields = HttpFields.build(1);
+            fields.add(HttpHeader.CONTENT_TYPE, contentType);
+            content.addFilePart(
+                name,
+                contentStream.getName(),
+                new InputStreamRequestContent(contentStream.getStream()),
+                fields);
           }
-          HttpFields.Mutable fields = HttpFields.build(1);
-          fields.add(HttpHeader.CONTENT_TYPE, contentType);
-          content.addFilePart(
-              name,
-              contentStream.getName(),
-              new InputStreamContentProvider(contentStream.getStream()),
-              fields);
         }
+        req.body(content);
       }
-      req.content(content);
     } else {
       // application/x-www-form-urlencoded
       Fields fields = new Fields();
@@ -773,7 +778,7 @@ public class Http2SolrClient extends SolrClient {
           }
         }
       }
-      req.content(new FormContentProvider(fields, FALLBACK_CHARSET));
+      req.body(new FormRequestContent(fields, FALLBACK_CHARSET));
     }
 
     return req;