You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/08/30 21:55:56 UTC

[lucene-solr] branch jira/SOLR-13722 updated: with tests

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/SOLR-13722
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/SOLR-13722 by this push:
     new 81f1517  with tests
81f1517 is described below

commit 81f151738a2882fe8bcf3fde581343679ec93dcf
Author: noble <no...@apache.org>
AuthorDate: Sat Aug 31 07:55:36 2019 +1000

    with tests
---
 .../java/org/apache/solr/core/BlobRepository.java  |  50 +++++-----
 .../solr/handler/admin/BaseHandlerApiSupport.java  |   4 +
 .../solr/handler/admin/CollectionHandlerApi.java   |  21 ++---
 .../solr/handler/TestContainerReqHandler.java      | 101 +++++++++++++++------
 .../client/solrj/request/CollectionApiMapping.java |  13 ++-
 .../solr/client/solrj/request/V2Request.java       |  22 +++++
 .../java/org/apache/solr/common/util/Utils.java    |  52 +++++++++--
 7 files changed, 188 insertions(+), 75 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/BlobRepository.java b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
index c3f008b..f053c20 100644
--- a/solr/core/src/java/org/apache/solr/core/BlobRepository.java
+++ b/solr/core/src/java/org/apache/solr/core/BlobRepository.java
@@ -56,7 +56,6 @@ import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.JavaBinCodec;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.RequestHandlerBase;
@@ -126,7 +125,6 @@ public class BlobRepository {
           return MapWriter.EMPTY;
         }
       }
-
     }
     return ew -> dir.listFiles((f, name) -> {
       if (sha256 == null || name.equals(sha256)) {
@@ -153,17 +151,14 @@ public class BlobRepository {
     coreContainer.getUpdateShardHandler().getUpdateExecutor().submit(() -> {
       String fromUrl = url.replace("/solr", "/api") + "/node/blob/" + sha256;
       try {
-        Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), fromUrl, is -> {
-          ByteBuffer b = SimplePostTool.inputStreamToByteArray(is);
-          String actualSha256 = sha256Digest(b);
-          if (sha256.equals(actualSha256)) {
-            putBlob(b, sha256);
-            result[0] = b;
-          } else {
-            log.error("expected sha256 : {} actual sha256: {} from blob downloaded from {} ", sha256, actualSha256, fromNode);
-          }
-
-        });
+        HttpClient httpClient = coreContainer.getUpdateShardHandler().getDefaultHttpClient();
+        result[0] = Utils.executeGET(httpClient, fromUrl, Utils.newBytesConsumer((int) MAX_JAR_SIZE));
+        String actualSha256 = sha256Digest(result[0]);
+        if (sha256.equals(actualSha256)) {
+          putBlob(result[0], sha256);
+        } else {
+          log.error("expected sha256 : {} actual sha256: {} from blob downloaded from {} ", sha256, actualSha256, fromNode);
+        }
       } catch (IOException e) {
         log.error("Unable to fetch jar: {} from node: {}", sha256, fromNode);
       }
@@ -269,9 +264,7 @@ public class BlobRepository {
 
   private ByteBuffer fetchFromOtherNodes(String sha256) {
     ByteBuffer[] result = new ByteBuffer[1];
-    Set<String> liveNodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
-    ArrayList<String> l = new ArrayList(liveNodes);
-    Collections.shuffle(l, RANDOM);
+    ArrayList<String> l = shuffledNodes();
     ModifiableSolrParams solrParams = new ModifiableSolrParams();
     solrParams.add(SHA256, sha256);
     ZkStateReader stateReader = coreContainer.getZkController().getZkStateReader();
@@ -280,14 +273,13 @@ public class BlobRepository {
         String baseurl = stateReader.getBaseUrlForNodeName(liveNode);
         String url = baseurl.replace("/solr", "/api");
         String reqUrl = url + "/node/blob?wt=javabin&omitHeader=true&sha256=" + sha256;
-        boolean[] nodeHasBlob = new boolean[]{false};
-        Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), reqUrl, is -> {
-          Object nl = new JavaBinCodec().unmarshal(is);
-          if (Utils.getObjectByPath(nl, false, Arrays.asList("blob", sha256)) != null) {
-            nodeHasBlob[0] = true;
-          }
-        });
-        if (nodeHasBlob[0]) {
+        boolean nodeHasBlob = false;
+        Object nl = Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), reqUrl, Utils.JAVABINCONSUMER);
+        if (Utils.getObjectByPath(nl, false, Arrays.asList("blob", sha256)) != null) {
+          nodeHasBlob = true;
+        }
+
+        if (nodeHasBlob) {
           result[0] = fetchBlobFromNodeAndPersist(sha256, liveNode);
           if (result[0] != null) break;
         }
@@ -299,6 +291,16 @@ public class BlobRepository {
     return result[0];
   }
 
+  /** Returns the cluster's current live nodes as a list in randomly shuffled order.
+   * @lucene.internal
+   */
+  public ArrayList<String> shuffledNodes() {
+    Set<String> liveNodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
+    ArrayList<String> l = new ArrayList(liveNodes);
+    Collections.shuffle(l, RANDOM);
+    return l;
+  }
+
   private ByteBuffer getAndValidate(String key, String url, String sha256) throws IOException {
     ByteBuffer byteBuffer = fetchFromUrl(key, url);
     String computedDigest = sha256Digest(byteBuffer);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/BaseHandlerApiSupport.java b/solr/core/src/java/org/apache/solr/handler/admin/BaseHandlerApiSupport.java
index 85033f3..51e8e52 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/BaseHandlerApiSupport.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/BaseHandlerApiSupport.java
@@ -79,6 +79,10 @@ public abstract class BaseHandlerApiSupport implements ApiSupport {
         SolrRequest.METHOD method = SolrRequest.METHOD.valueOf(req.getHttpMethod());
         List<ApiCommand> commands = commandsMapping.get(method).get(op);
         try {
+          if(commands!=null &&  commands.size() == 1 && commands.get(0).meta().isRaw()){
+            commands.get(0).invoke(req,rsp, apiHandler);
+            return;
+          }
           if (method == POST) {
             List<CommandOperation> cmds = req.getCommands(true);
             if (cmds.size() > 1)
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandlerApi.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandlerApi.java
index 96d10a0..f473c63 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandlerApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionHandlerApi.java
@@ -28,11 +28,9 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 import org.apache.solr.api.ApiBag;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionApiMapping;
 import org.apache.solr.client.solrj.request.CollectionApiMapping.CommandMeta;
 import org.apache.solr.client.solrj.request.CollectionApiMapping.Meta;
@@ -127,23 +125,25 @@ public class CollectionHandlerApi extends BaseHandlerApiSupport {
       String sha256 = BlobRepository.sha256Digest(buf);
       CoreContainer coreContainer = ((CollectionHandlerApi) info.apiHandler).handler.coreContainer;
       coreContainer.getBlobRepository().putBlob(buf, sha256);
-      Set<String> nodes = coreContainer.getZkController().getZkStateReader().getClusterState().getLiveNodes();
-
+      List<String> nodes = coreContainer.getBlobRepository().shuffledNodes();
       int i = 0;
       for (String node : nodes) {
         String baseUrl = coreContainer.getZkController().getZkStateReader().getBaseUrlForNodeName(node);
         String url = baseUrl.replace("/solr", "/api") + "/node/blob?sha256=" + sha256 + "&fromNode=";
-        if (i <= 4) {
-          // the first 5 nodes will be asked to fetch from this node
-          url = coreContainer.getNodeConfig().getNodeName();
+        if (i < 10 ) {
+          // the first 10 nodes will be asked to fetch from this node
+          url += coreContainer.getZkController().getNodeName();
         } else {
           // trying to avoid the thundering herd problem when there are a very large no:of nodes
           // others should try to fetch it from any node where it is available
           url += "*";
         }
         try {
+          //fire and forget
           Utils.executeGET(coreContainer.getUpdateShardHandler().getDefaultHttpClient(), url, null);
-        } catch (IOException e) {
+        } catch (Exception e) {
+          log.info( "Node: " +node+
+              " failed to respond for blob notification",e );
           //ignore the exception
           // some nodes may be down or not responding
         }
@@ -414,10 +414,5 @@ public class CollectionHandlerApi extends BaseHandlerApiSupport {
     }
   }
 
-  public static void postBlob(String baseUrl, ByteBuffer buf) throws IOException {
-    try (HttpSolrClient client = new HttpSolrClient.Builder(baseUrl + "/____v2/node/blob").build()) {
-
-    }
-  }
 
 }
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerReqHandler.java b/solr/core/src/test/org/apache/solr/handler/TestContainerReqHandler.java
index 7f9271a..af63df6 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerReqHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerReqHandler.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.function.Predicate;
 
 import com.google.common.collect.ImmutableMap;
@@ -34,8 +35,8 @@ import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -48,6 +49,7 @@ import org.apache.solr.client.solrj.response.V2Response;
 import org.apache.solr.cloud.ConfigRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.NavigableObject;
 import org.apache.solr.common.cloud.ClusterProperties;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.params.MapSolrParams;
@@ -61,7 +63,6 @@ import org.apache.solr.core.MemClassLoader;
 import org.apache.solr.core.RuntimeLib;
 import org.apache.solr.request.SolrRequestHandler;
 import org.apache.solr.util.LogLevel;
-import org.apache.solr.util.SimplePostTool;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.eclipse.jetty.server.Server;
@@ -75,7 +76,9 @@ import static java.util.Arrays.asList;
 import static org.apache.solr.cloud.TestCryptoKeys.readFile;
 import static org.apache.solr.common.params.CommonParams.JAVABIN;
 import static org.apache.solr.common.params.CommonParams.WT;
+import static org.apache.solr.common.util.Utils.JAVABINCONSUMER;
 import static org.apache.solr.common.util.Utils.getObjectByPath;
+import static org.apache.solr.common.util.Utils.newBytesConsumer;
 import static org.apache.solr.core.BlobRepository.sha256Digest;
 import static org.apache.solr.core.TestDynamicLoading.getFileContent;
 import static org.apache.solr.core.TestDynamicLoadingUrl.runHttpServer;
@@ -92,15 +95,21 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
   }
 
-  static SolrResponse assertResponseValues(int repeats, SolrClient client, SolrRequest req, Map vals) throws Exception {
-    SolrResponse rsp = null;
+  static NavigableObject assertResponseValues(int repeats, SolrClient client, SolrRequest req, Map vals) throws Exception {
+    Callable<NavigableObject> callable = () -> req.process(client);
+
+    return assertResponseValues(repeats, callable,vals);
+  }
+
+  static NavigableObject assertResponseValues(int repeats,  Callable<NavigableObject> callable,Map vals) throws Exception {
+    NavigableObject rsp = null;
 
     for (int i = 0; i < repeats; i++) {
       if (i > 0) {
         Thread.sleep(100);
       }
       try {
-        rsp = req.process(client);
+        rsp = callable.call();
       } catch (Exception e) {
         if (i >= repeats - 1) throw e;
         continue;
@@ -245,29 +254,27 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
           (Predicate<Object>) o -> o instanceof List && ((List) o).isEmpty()));
 
 
-      String  baseUrl = cluster.getRandomJetty(random()).getBaseUrl().toString();
-      try(HttpSolrClient client = new HttpSolrClient.Builder(baseUrl).build()){
+      String baseUrl = cluster.getRandomJetty(random()).getBaseUrl().toString();
+      try (HttpSolrClient client = new HttpSolrClient.Builder(baseUrl).build()) {
         V2Response rsp = new V2Request.Builder("/node/blob")
             .withMethod(SolrRequest.METHOD.GET)
             .forceV2(true)
             .build()
             .process(client);
         assertNotNull(rsp._get(asList("blob", "e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc"), null));
-        assertNotNull(rsp._get(asList ("blob", "20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3"), null));
-        Utils.executeGET(client.getHttpClient(), baseUrl.replace("/solr", "/api")+"/node/blob/e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc",
-            is -> {
-              ByteBuffer buf = SimplePostTool.inputStreamToByteArray(is);
-              assertEquals("e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc", sha256Digest(buf));
-            });
-        Utils.executeGET(client.getHttpClient(), baseUrl.replace("/solr", "/api") + "/node/blob/20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3",
-            is -> {
-              ByteBuffer buf = SimplePostTool.inputStreamToByteArray(is);
-              assertEquals("20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3", sha256Digest(buf));
-            });
+        assertNotNull(rsp._get(asList("blob", "20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3"), null));
 
-      }
+        ByteBuffer buf = Utils.executeGET(client.getHttpClient(),
+            baseUrl.replace("/solr", "/api") + "/node/blob/e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc",
+            newBytesConsumer(Integer.MAX_VALUE));
+        assertEquals("e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc", sha256Digest(buf));
 
 
+        buf = Utils.executeGET(client.getHttpClient(), baseUrl.replace("/solr", "/api") + "/node/blob/20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3",
+            newBytesConsumer(Integer.MAX_VALUE));
+        assertEquals("20e0bfaec71b2e93c4da9f2ed3745dda04dc3fc915b66cc0275863982e73b2a3", sha256Digest(buf));
+
+      }
 
 
     } finally {
@@ -525,7 +532,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       SolrParams params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -537,7 +544,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -549,7 +556,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -622,7 +629,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -634,7 +641,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -646,7 +653,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -657,7 +664,6 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
           ));
 
 
-
       try {
         new V2Request.Builder("/cluster")
             .withPayload(payload)
@@ -683,7 +689,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
   }
 
-//  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13650")
+  //  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-13650")
   public void testCacheLoadFromPackage() throws Exception {
     String COLLECTION_NAME = "globalCacheColl";
     Map<String, Object> jars = Utils.makeMap(
@@ -731,7 +737,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -769,7 +775,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
       params = new MapSolrParams((Map) Utils.makeMap("collection", COLLECTION_NAME,
           WT, JAVABIN,
-          "meta","true"));
+          "meta", "true"));
 
       assertResponseValues(10,
           cluster.getSolrClient(),
@@ -786,7 +792,7 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
 
 
       solrQuery = new SolrQuery("q", "id:2", "collection", COLLECTION_NAME);
-      SolrResponse result = assertResponseValues(10,
+      NavigableObject result = assertResponseValues(10,
           cluster.getSolrClient(),
           new QueryRequest(solrQuery),
           Utils.makeMap("response[0]/my_synthetic_fld_s", "version_2"));
@@ -798,4 +804,39 @@ public class TestContainerReqHandler extends SolrCloudTestCase {
     }
   }
 
+  public void testBlobManagement() throws Exception {
+    System.setProperty("enable.runtime.lib", "true");
+    MiniSolrCloudCluster cluster = configureCluster(4)
+        .withJettyConfig(jetty -> jetty.enableV2(true))
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    try {
+      ByteBuffer jar1 = getFileContent("runtimecode/runtimelibs.jar.bin");
+      new V2Request.Builder("/cluster/blob")
+          .withMethod(SolrRequest.METHOD.POST)
+          .withPayload(jar1)
+          .forceV2(true)
+          .withMimeType("application/octet-stream")
+          .build()
+          .process(cluster.getSolrClient());
+
+      Map expected = Utils.makeMap("/blob/e1f9e23988c19619402f1040c9251556dcd6e02b9d3e3b966a129ea1be5c70fc", (Predicate<?>) o -> o != null);
+      for (JettySolrRunner jettySolrRunner : cluster.getJettySolrRunners()) {
+        String url =  jettySolrRunner.getBaseUrl().toString().replace("/solr", "/api") + "/node/blob?wt=javabin";
+        assertResponseValues(10, () -> {
+          try(HttpSolrClient solrClient = (HttpSolrClient) jettySolrRunner.newClient()) {
+            return (NavigableObject) Utils.executeGET(solrClient.getHttpClient(),url , JAVABINCONSUMER);
+          }
+        }, expected);
+
+      }
+
+
+    } finally {
+      cluster.shutdown();
+    }
+
+
+  }
+
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionApiMapping.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionApiMapping.java
index 5d0f022..fe647d0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionApiMapping.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionApiMapping.java
@@ -243,7 +243,12 @@ public class CollectionApiMapping {
     GET_NODES(CLUSTER_NODES, GET, null),
     FORCE_LEADER(PER_COLLECTION_PER_SHARD_COMMANDS, POST, CollectionAction.FORCELEADER, "force-leader", null),
     BALANCE_SHARD_UNIQUE(PER_COLLECTION, POST, BALANCESHARDUNIQUE,"balance-shard-unique" , null),
-    POST_BLOB(EndPoint.CLUSTER_BLOB, POST, null)
+    POST_BLOB(EndPoint.CLUSTER_BLOB, POST, null){
+      @Override
+      public boolean isRaw() {
+        return true;
+      }
+    }
     ;
 
     public final String commandName;
@@ -474,6 +479,12 @@ public class CollectionApiMapping {
   public interface CommandMeta {
     String getName();
 
+    /** If true, the request payload is not parsed into commands; the command implementation reads and handles it entirely.
+     */
+    default boolean isRaw(){
+      return false;
+    }
+
     /**
      * the http method supported by this command
      */
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/V2Request.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/V2Request.java
index 6357266..da08842 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/V2Request.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/V2Request.java
@@ -18,11 +18,14 @@
 package org.apache.solr.client.solrj.request;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -43,6 +46,7 @@ public class V2Request extends SolrRequest<V2Response> implements MapWriter {
   private SolrParams solrParams;
   public final boolean useBinary;
   private String collection;
+  private String mimeType;
   private boolean forceV2 = false;
   private boolean isPerCollectionRequest = false;
 
@@ -76,6 +80,15 @@ public class V2Request extends SolrRequest<V2Response> implements MapWriter {
     return new RequestWriter.ContentWriter() {
       @Override
       public void write(OutputStream os) throws IOException {
+        if(payload instanceof ByteBuffer){
+          ByteBuffer b = (ByteBuffer) payload;
+          os.write(b.array(), b.arrayOffset(), b.limit());
+          return;
+        }
+        if(payload instanceof InputStream){
+          IOUtils.copy((InputStream) payload, os);
+          return;
+        }
         if (useBinary) {
           new JavaBinCodec().marshal(payload, os);
         } else {
@@ -85,6 +98,7 @@ public class V2Request extends SolrRequest<V2Response> implements MapWriter {
 
       @Override
       public String getContentType() {
+        if(mimeType != null) return mimeType;
         return useBinary ? JAVABIN_MIME : JSON_MIME;
       }
     };
@@ -121,6 +135,7 @@ public class V2Request extends SolrRequest<V2Response> implements MapWriter {
 
     private boolean forceV2EndPoint = false;
     private ResponseParser parser;
+    private String mimeType;
 
     /**
      * Create a Builder object based on the provided resource.
@@ -180,11 +195,18 @@ public class V2Request extends SolrRequest<V2Response> implements MapWriter {
       return this;
     }
 
+    public Builder withMimeType(String mimeType){
+      this.mimeType = mimeType;
+      return this;
+
+    }
+
     public V2Request build() {
       V2Request v2Request = new V2Request(method, resource, useBinary);
       v2Request.solrParams = params;
       v2Request.payload = payload;
       v2Request.forceV2 = forceV2EndPoint;
+      v2Request.mimeType = mimeType;
       return v2Request;
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
index 09a04c4..f1bc60d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Utils.java
@@ -27,6 +27,7 @@ import java.io.Writer;
 import java.lang.invoke.MethodHandles;
 import java.net.URL;
 import java.net.URLDecoder;
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.AbstractMap;
@@ -720,26 +721,63 @@ public class Utils {
     return def;
   }
 
-  public interface InputStreamConsumer {
+  public interface InputStreamConsumer<T> {
 
-    void accept(InputStream is) throws IOException;
+    T accept(InputStream is) throws IOException;
 
   }
+  public static final InputStreamConsumer<?> JAVABINCONSUMER = is -> new JavaBinCodec().unmarshal(is);
+  public static final InputStreamConsumer<?> JSONCONSUMER = is -> Utils.fromJSON(is);
+  public static InputStreamConsumer<ByteBuffer> newBytesConsumer(int maxSize){
+    return is -> {
+      try (BinaryRequestWriter.BAOS bos = new BinaryRequestWriter.BAOS()) {
+        long sz = 0;
+        int next = is.read();
+        while (next > -1) {
+          if (++sz > maxSize) throw new BufferOverflowException();
+          bos.write(next);
+          next = is.read();
+        }
+        bos.flush();
+        return ByteBuffer.wrap( bos.getbuf(), 0, bos.size());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    };
+
+  }
+
 
 
-  public static int executeGET(HttpClient client, String url, InputStreamConsumer consumer) throws IOException {
+
+  public static <T> T executeGET(HttpClient client, String url, InputStreamConsumer<T> consumer) throws SolrException {
+    T result = null;
     HttpGet httpGet = new HttpGet(url);
-    HttpResponse rsp = client.execute(httpGet);
+    HttpResponse rsp = null;
+    try {
+      rsp = client.execute(httpGet);
+    } catch (IOException e) {
+      log.error("Error in request to url : "+ url, e);
+      throw new SolrException(SolrException.ErrorCode.UNKNOWN, "error sending request");
+    }
     int statusCode = rsp.getStatusLine().getStatusCode();
-    if(statusCode != 200) return statusCode;
+    if(statusCode != 200) {
+      log.error("Failed a request to : "+ url);
+      throw new SolrException(SolrException.ErrorCode.getErrorCode(statusCode), "Unknown error");
+    }
     HttpEntity entity = rsp.getEntity();
     try{
       InputStream is = entity.getContent();
-      if(consumer != null) consumer.accept(is);
+      if(consumer != null) {
+
+        result = consumer.accept(is);
+      }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.UNKNOWN, e);
     } finally {
       Utils.consumeFully(entity);
     }
-    return statusCode;
+    return result;
   }
 
 }