You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ho...@apache.org on 2019/12/10 04:14:05 UTC

[lucene-solr] branch branch_8x updated: SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)

This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 53345cb  SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)
53345cb is described below

commit 53345cb1cd1eb54e33af65046825f624ba3cadd1
Author: Houston Putman <ho...@apache.org>
AuthorDate: Mon Dec 9 17:11:58 2019 -0500

    SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984)
---
 solr/CHANGES.txt                                   |  2 +
 .../org/apache/solr/handler/StreamHandler.java     | 22 ++++++
 solr/solr-ref-guide/src/distributed-requests.adoc  |  2 +-
 solr/solr-ref-guide/src/streaming-expressions.adoc |  9 +++
 solr/solr-ref-guide/src/using-solrj.adoc           |  8 +++
 .../client/solrj/impl/BaseCloudSolrClient.java     | 82 +++++++++++++---------
 .../client/solrj/io/stream/CloudSolrStream.java    |  3 +-
 .../client/solrj/io/stream/DeepRandomStream.java   |  6 +-
 .../solr/client/solrj/io/stream/StreamContext.java | 20 ++++++
 .../solr/client/solrj/io/stream/TupleStream.java   | 50 +++++++++----
 .../routing/NodePreferenceRulesComparator.java     |  2 +-
 .../RequestReplicaListTransformerGenerator.java    | 40 ++++++++---
 .../java/org/apache/solr/common/cloud/Replica.java |  2 +-
 .../solrj/impl/CloudHttp2SolrClientTest.java       | 69 ++++++++++++++++++
 .../client/solrj/impl/CloudSolrClientTest.java     | 71 ++++++++++++++++++-
 .../solr/client/solrj/io/stream/StreamingTest.java | 58 ++++++++++++++-
 .../org/apache/solr/cloud/SolrCloudTestCase.java   | 17 +++++
 17 files changed, 396 insertions(+), 67 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3ae0a79..7ef1cc3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -62,6 +62,8 @@ New Features
 
 * SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N)
 
+* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe)
+
 Improvements
 ---------------------
 
diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
index ccbbb3a..1502190 100644
--- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java
@@ -42,7 +42,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -162,9 +165,28 @@ public class StreamHandler extends RequestHandlerBase implements SolrCoreAware,
       return;
     }
 
+
+    final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests)
+    ZkController zkController = core == null ? null : core.getCoreContainer().getZkController();
+    RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
+    if (zkController != null) {
+      requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(
+          zkController.getZkStateReader().getClusterProperties()
+              .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "")
+              .toString(),
+          zkController.getNodeName(),
+          zkController.getBaseUrl(),
+          zkController.getSysPropsCacher()
+      );
+    } else {
+      requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator();
+    }
+
     int worker = params.getInt("workerID", 0);
     int numWorkers = params.getInt("numWorkers", 1);
     StreamContext context = new StreamContext();
+    context.setRequestParams(params);
+    context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator);
     context.put("shards", getCollectionShards(params));
     context.workerID = worker;
     context.numWorkers = numWorkers;
diff --git a/solr/solr-ref-guide/src/distributed-requests.adoc b/solr/solr-ref-guide/src/distributed-requests.adoc
index dde0fea..4d06728 100644
--- a/solr/solr-ref-guide/src/distributed-requests.adoc
+++ b/solr/solr-ref-guide/src/distributed-requests.adoc
@@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t
 
 The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values are significant: the first one is the primary sort, the second is secondary, etc.
 
-IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported.
+IMPORTANT: `shards.preference` is supported for single shard scenarios only when using the SolrJ clients.
 
 The properties that can be specified are as follows:
 
diff --git a/solr/solr-ref-guide/src/streaming-expressions.adoc b/solr/solr-ref-guide/src/streaming-expressions.adoc
index d666d76..57ca17c 100644
--- a/solr/solr-ref-guide/src/streaming-expressions.adoc
+++ b/solr/solr-ref-guide/src/streaming-expressions.adoc
@@ -114,6 +114,15 @@ unless the jvm has been started with `-DStreamingExpressionMacros=true` (usually
 
 Because streaming expressions relies on the `/export` handler, many of the field and field type requirements to use `/export` are also requirements for `/stream`, particularly for `sort` and `fl` parameters. Please see the section <<exporting-result-sets.adoc#exporting-result-sets,Exporting Result Sets>> for details.
 
+=== Request Routing
+
+Streaming Expressions respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>> for any call to Solr.
+
+The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used.
+- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`)
+- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`)
+- Set as a default in the Cluster properties.
+
 === Adding Custom Expressions
 
 Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface.   To add a custom expression to the
diff --git a/solr/solr-ref-guide/src/using-solrj.adoc b/solr/solr-ref-guide/src/using-solrj.adoc
index e4e41f5..f60664d 100644
--- a/solr/solr-ref-guide/src/using-solrj.adoc
+++ b/solr/solr-ref-guide/src/using-solrj.adoc
@@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc
 
 When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on.
 
+=== Cloud Request Routing
+
+The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <<distributed-requests.adoc#shards-preference-parameter,shards.preference parameter>>.
+Therefore requests sent to single-sharded collections, using either of the above clients, will be routed the same way that distributed requests are routed to individual shards.
+If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly.
+
+For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first.
+
 == Querying in SolrJ
 `SolrClient` has a number of `query()` methods for fetching results from Solr.  Each of these methods takes in a `SolrParams`,an object encapsulating arbitrary query-parameters.  And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata.
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 0461e67..52038ad 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import org.apache.solr.client.solrj.ResponseParser;
 import org.apache.solr.client.solrj.SolrClient;
@@ -55,6 +56,8 @@ import org.apache.solr.client.solrj.request.IsUpdateRequest;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.client.solrj.util.ClientUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -69,7 +72,6 @@ import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
@@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
 
   private final boolean updatesToLeaders;
   private final boolean directUpdatesToLeadersOnly;
+  private final RequestReplicaListTransformerGenerator requestRLTGenerator;
   boolean parallelUpdates; //TODO final
   private ExecutorService threadPool = ExecutorUtil
       .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory(
@@ -221,6 +224,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     this.updatesToLeaders = updatesToLeaders;
     this.parallelUpdates = parallelUpdates;
     this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly;
+    this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
   }
 
   /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
@@ -467,6 +471,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       for(String param : NON_ROUTABLE_PARAMS) {
         routableParams.remove(param);
       }
+    } else {
+      params = new ModifiableSolrParams();
     }
 
     if (collection == null) {
@@ -492,10 +498,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       return null;
     }
 
+    ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params);
+
     //Create the URL map, which is keyed on slice name.
     //The value is a list of URLs for each replica in the slice.
     //The first value in the list is the leader for the slice.
-    final Map<String,List<String>> urlMap = buildUrlMap(col);
+    final Map<String,List<String>> urlMap = buildUrlMap(col, replicaListTransformer);
     final Map<String, ? extends LBSolrClient.Req> routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField);
     if (routes == null) {
       if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) {
@@ -616,12 +624,12 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField);
   }
 
-  private Map<String,List<String>> buildUrlMap(DocCollection col) {
+  private Map<String,List<String>> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) {
     Map<String, List<String>> urlMap = new HashMap<>();
     Slice[] slices = col.getActiveSlicesArr();
     for (Slice slice : slices) {
       String name = slice.getName();
-      List<String> urls = new ArrayList<>();
+      List<Replica> sortedReplicas = new ArrayList<>();
       Replica leader = slice.getLeader();
       if (directUpdatesToLeadersOnly && leader == null) {
         for (Replica replica : slice.getReplicas(
@@ -638,20 +646,22 @@ public abstract class BaseCloudSolrClient extends SolrClient {
         // take unoptimized general path - we cannot find a leader yet
         return null;
       }
-      ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader);
-      String url = zkProps.getCoreUrl();
-      urls.add(url);
+
       if (!directUpdatesToLeadersOnly) {
         for (Replica replica : slice.getReplicas()) {
-          if (!replica.getNodeName().equals(leader.getNodeName()) &&
-              !replica.getName().equals(leader.getName())) {
-            ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);
-            String url1 = zkProps1.getCoreUrl();
-            urls.add(url1);
+          if (!replica.equals(leader)) {
+            sortedReplicas.add(replica);
           }
         }
       }
-      urlMap.put(name, urls);
+
+      // Sort the non-leader replicas according to the request parameters
+      replicaListTransformer.transform(sortedReplicas);
+
+      // put the leaderUrl first.
+      sortedReplicas.add(0, leader);
+
+      urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList()));
     }
     return urlMap;
   }
@@ -1046,6 +1056,8 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       reqParams = new ModifiableSolrParams();
     }
 
+    ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams);
+
     final Set<String> liveNodes = getClusterStateProvider().getLiveNodes();
 
     final List<String> theUrlList = new ArrayList<>(); // we populate this as follows...
@@ -1087,34 +1099,38 @@ public abstract class BaseCloudSolrClient extends SolrClient {
       }
 
       // Gather URLs, grouped by leader or replica
-      // TODO: allow filtering by group, role, etc
-      Set<String> seenNodes = new HashSet<>();
-      List<String> replicas = new ArrayList<>();
-      String joinedInputCollections = StrUtils.join(inputCollections, ',');
+      List<Replica> sortedReplicas = new ArrayList<>();
+      List<Replica> replicas = new ArrayList<>();
       for (Slice slice : slices.values()) {
-        for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
-          ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
-          String node = coreNodeProps.getNodeName();
+        Replica leader = slice.getLeader();
+        for (Replica replica : slice.getReplicas()) {
+          String node = replica.getNodeName();
           if (!liveNodes.contains(node) // Must be a live node to continue
-              || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
+              || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue
             continue;
-          if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node...
-            String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections);
-            if (sendToLeaders && coreNodeProps.isLeader()) {
-              theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode)
-            } else {
-              replicas.add(url); // replicas here
-            }
+          if (sendToLeaders && replica.equals(leader)) {
+            sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode)
+          } else {
+            replicas.add(replica); // replicas here
           }
         }
       }
 
-      // Shuffle the leaders, if any    (none if !sendToLeaders)
-      Collections.shuffle(theUrlList, rand);
+      // Sort the leader replicas, if any, according to the request preferences    (none if !sendToLeaders)
+      replicaListTransformer.transform(sortedReplicas);
+
+      // Sort the replicas, if any, according to the request preferences and append to our list
+      replicaListTransformer.transform(replicas);
 
-      // Shuffle the replicas, if any, and append to our list
-      Collections.shuffle(replicas, rand);
-      theUrlList.addAll(replicas);
+      sortedReplicas.addAll(replicas);
+
+      String joinedInputCollections = StrUtils.join(inputCollections, ',');
+      Set<String> seenNodes = new HashSet<>();
+      sortedReplicas.forEach( replica -> {
+        if (seenNodes.add(replica.getNodeName())) {
+          theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections));
+        }
+      });
 
       if (theUrlList.isEmpty()) {
         collectionStateCache.keySet().removeAll(collectionNames);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
index 8464cf3..2220989 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java
@@ -371,12 +371,13 @@ public class CloudSolrStream extends TupleStream implements Expressible {
   protected void constructStreams() throws IOException {
     try {
 
-      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
 
       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.
 
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
+
       for(String shardUrl : shardUrls) {
         SolrStream solrStream = new SolrStream(shardUrl, mParams);
         if(streamContext != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
index d2ef18c..3881a64 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java
@@ -309,12 +309,12 @@ public class DeepRandomStream extends TupleStream implements Expressible {
 
   protected void constructStreams() throws IOException {
     try {
-
-      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext);
-
       ModifiableSolrParams mParams = new ModifiableSolrParams(params);
       mParams = adjustParams(mParams);
       mParams.set(DISTRIB, "false"); // We are the aggregator.
+
+      List<String> shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams);
+
       String rows = mParams.get(ROWS);
       int r = Integer.parseInt(rows);
       int newRows = r/shardUrls.size();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
index 778aace..8243b2a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.solr.client.solrj.io.ModelCache;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
+import org.apache.solr.common.params.SolrParams;
 
 /**
  *  The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method.
@@ -45,6 +47,8 @@ public class StreamContext implements Serializable{
   private SolrClientCache clientCache;
   private ModelCache modelCache;
   private StreamFactory streamFactory;
+  private SolrParams requestParams;
+  private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator;
 
   public ConcurrentMap getObjectCache() {
     return this.objectCache;
@@ -101,4 +105,20 @@ public class StreamContext implements Serializable{
   public StreamFactory getStreamFactory() {
     return this.streamFactory;
   }
+
+  public void setRequestParams(SolrParams requestParams) {
+    this.requestParams = requestParams;
+  }
+
+  public SolrParams getRequestParams() {
+    return requestParams;
+  }
+
+  public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) {
+    this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator;
+  }
+
+  public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() {
+    return requestReplicaListTransformerGenerator;
+  }
 }
\ No newline at end of file
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
index 94dd920..c2957bc 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java
@@ -21,26 +21,28 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Random;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.Map;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.io.Tuple;
 import org.apache.solr.client.solrj.io.comp.StreamComparator;
 import org.apache.solr.client.solrj.io.stream.expr.Explanation;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.common.IteratorWriter;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
 
 
 /**
@@ -118,6 +120,14 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
                                        String collection,
                                        StreamContext streamContext)
       throws IOException {
+    return getShards(zkHost, collection, streamContext, new ModifiableSolrParams());
+  }
+
+  public static List<String> getShards(String zkHost,
+                                       String collection,
+                                       StreamContext streamContext,
+                                       SolrParams requestParams)
+      throws IOException {
     Map<String, List<String>> shardsMap = null;
     List<String> shards = new ArrayList();
 
@@ -130,24 +140,34 @@ public abstract class TupleStream implements Closeable, Serializable, MapWriter
       shards = shardsMap.get(collection);
     } else {
       //SolrCloud Sharding
-      CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost);
+      CloudSolrClient cloudSolrClient =
+          Optional.ofNullable(streamContext.getSolrClientCache()).orElseGet(SolrClientCache::new).getCloudSolrClient(zkHost);
       ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader();
       ClusterState clusterState = zkStateReader.getClusterState();
       Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true);
       Set<String> liveNodes = clusterState.getLiveNodes();
+
+
+      ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams());
+      solrParams.add(requestParams);
+
+      RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator =
+          Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new);
+
+      ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams);
+
       for(Slice slice : slices) {
-        Collection<Replica> replicas = slice.getReplicas();
-        List<Replica> shuffler = new ArrayList<>();
-        for(Replica replica : replicas) {
-          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName()))
-            shuffler.add(replica);
+        List<Replica> sortedReplicas = new ArrayList<>();
+        for(Replica replica : slice.getReplicas()) {
+          if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) {
+            sortedReplicas.add(replica);
+          }
         }
 
-        Collections.shuffle(shuffler, new Random());
-        Replica rep = shuffler.get(0);
-        ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep);
-        String url = zkProps.getCoreUrl();
-        shards.add(url);
+        replicaListTransformer.transform(sortedReplicas);
+        if (sortedReplicas.size() > 0) {
+          shards.add(sortedReplicas.get(0).getCoreUrl());
+        }
       }
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
index 4fdab0f..bb8cecb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java
@@ -166,7 +166,7 @@ public class NodePreferenceRulesComparator implements Comparator<Object> {
       return false;
     }
     final String s = ((Replica)o).getType().toString();
-    return s.equals(preferred);
+    return s.equalsIgnoreCase(preferred);
   }
 
   public List<PreferenceRule> getSortRules() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
index 58c8b2e..12ce4cf 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java
@@ -20,6 +20,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 
 import org.apache.solr.common.SolrException;
@@ -41,9 +42,13 @@ public class RequestReplicaListTransformerGenerator {
       (String configSpec, SolrParams requestParams, ReplicaListTransformerFactory fallback) -> shufflingReplicaListTransformer;
   private final ReplicaListTransformerFactory stableRltFactory;
   private final ReplicaListTransformerFactory defaultRltFactory;
+  private final String defaultShardPreferences;
+  private final String nodeName;
+  private final String localHostAddress;
+  private final NodesSysPropsCacher sysPropsCacher;
 
   public RequestReplicaListTransformerGenerator() {
-    this(RANDOM_RLTF);
+    this(null);
   }
 
   public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory) {
@@ -51,16 +56,24 @@ public class RequestReplicaListTransformerGenerator {
   }
 
   public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory) {
-    this.defaultRltFactory = defaultRltFactory;
-    if (stableRltFactory == null) {
-      this.stableRltFactory = new AffinityReplicaListTransformerFactory();
-    } else {
-      this.stableRltFactory = stableRltFactory;
-    }
+    this(defaultRltFactory, stableRltFactory, null, null, null, null);
+  }
+
+  public RequestReplicaListTransformerGenerator(String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+    this(null, null, defaultShardPreferences, nodeName, localHostAddress, sysPropsCacher);
+  }
+
+  public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
+    this.defaultRltFactory = Optional.ofNullable(defaultRltFactory).orElse(RANDOM_RLTF);
+    this.stableRltFactory = Optional.ofNullable(stableRltFactory).orElseGet(AffinityReplicaListTransformerFactory::new);
+    this.defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse("");
+    this.nodeName = nodeName;
+    this.localHostAddress = localHostAddress;
+    this.sysPropsCacher = sysPropsCacher;
   }
 
   public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams) {
-    return getReplicaListTransformer(requestParams, "");
+    return getReplicaListTransformer(requestParams, null);
   }
 
   public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences) {
@@ -70,6 +83,7 @@ public class RequestReplicaListTransformerGenerator {
   public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) {
     @SuppressWarnings("deprecation")
     final boolean preferLocalShards = requestParams.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
+    defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(this.defaultShardPreferences);
     final String shardsPreferenceSpec = requestParams.get(ShardParams.SHARDS_PREFERENCE, defaultShardPreferences);
 
     if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
@@ -84,7 +98,15 @@ public class RequestReplicaListTransformerGenerator {
         preferenceRules.add(new PreferenceRule(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION, ShardParams.REPLICA_LOCAL));
       }
 
-      NodePreferenceRulesComparator replicaComp = new NodePreferenceRulesComparator(preferenceRules, requestParams, nodeName, localHostAddress, sysPropsCacher, defaultRltFactory, stableRltFactory);
+      NodePreferenceRulesComparator replicaComp =
+          new NodePreferenceRulesComparator(
+              preferenceRules,
+              requestParams,
+              Optional.ofNullable(nodeName).orElse(this.nodeName),
+              Optional.ofNullable(localHostAddress).orElse(this.localHostAddress),
+              Optional.ofNullable(sysPropsCacher).orElse(this.sysPropsCacher),
+              defaultRltFactory,
+              stableRltFactory);
       ReplicaListTransformer baseReplicaListTransformer = replicaComp.getBaseReplicaListTransformer();
       if (replicaComp.getSortRules() == null) {
         // only applying base transformation
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index bc57176..5ff10c2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -102,7 +102,7 @@ public class Replica extends ZkNodeProps {
     PULL;
 
     public static Type get(String name){
-      return name == null ? Type.NRT : Type.valueOf(name);
+      return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT)); // a null name yields NRT; other names are upper-cased first so the lookup is case-insensitive
     }
   }
 
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 793ee5f..74b02cb 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -488,6 +489,74 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
         shardAddresses.size() > 1 && ports.size()==1);
   }
 
+
+
+  /**
+   * Tests if the 'shards.preference' parameter works with single-sharded collections, querying once per replica type.
+   */
+  @Test
+  public void singleShardedPreferenceRules() throws Exception {
+    String collectionName = "singleShardPreferenceTestColl";
+
+    int liveNodes = cluster.getJettySolrRunners().size();
+    int replicasPerType = liveNodes / 3; // integer division: the total created may be smaller than liveNodes
+    // For testing replica.type, we want to have all replica types available for the collection
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, replicasPerType, replicasPerType, replicasPerType)
+        .setMaxShardsPerNode(liveNodes)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    cluster.waitForActiveCollection(collectionName, 1, 3 * replicasPerType); // wait for the replica count actually requested above
+
+    // Add some new documents
+    new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .add(id, "3", "a_t", "hello2")
+        .commit(getRandomClient(), collectionName);
+
+    // Run the actual test for 'queryReplicaType'
+    queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
+    queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
+    queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
+  }
+
+  /** Queries with a replica-type shards preference and asserts that the one responding replica has that type. */
+  private void queryReplicaType(CloudHttp2SolrClient cloudClient,
+                                Replica.Type typeToQuery,
+                                String collectionName)
+      throws Exception
+  {
+    SolrQuery qRequest = new SolrQuery("*:*");
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+    qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
+    qParams.add(ShardParams.SHARDS_INFO, "true");
+    qRequest.add(qParams);
+
+    Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));
+
+    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+    // Iterate over shards-info and check what cores responded
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+    List<String> shardAddresses = new ArrayList<>();
+    while (itr.hasNext()) {
+      Map.Entry<String, ?> e = itr.next();
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map<?, ?>)e.getValue()).get("shardAddress");
+      // Null-check before the first dereference so a missing address fails with this message, not an NPE
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+      if (shardAddress.endsWith("/")) {
+        shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
+      }
+      shardAddresses.add(shardAddress);
+    }
+    assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
+    assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.getOrDefault(shardAddresses.get(0), "").toUpperCase(Locale.ROOT));
+  }
+
   private Long getNumRequests(String baseUrl, String collectionName) throws
       SolrServerException, IOException {
     return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 0025ace..57050ce 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -427,8 +428,8 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
 
   @SuppressWarnings("deprecation")
   private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
-                                          boolean useShardsPreference,
-                                          String collectionName)
+                                              boolean useShardsPreference,
+                                              String collectionName)
       throws Exception
   {
     SolrQuery qRequest = new SolrQuery("*:*");
@@ -476,6 +477,72 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
         shardAddresses.size() > 1 && ports.size()==1);
   }
 
+  /**
+   * Tests if the 'shards.preference' parameter works with single-sharded collections, querying once per replica type.
+   */
+  @Test
+  public void singleShardedPreferenceRules() throws Exception {
+    String collectionName = "singleShardPreferenceTestColl";
+
+    int liveNodes = cluster.getJettySolrRunners().size();
+    int replicasPerType = liveNodes / 3; // integer division: the total created may be smaller than liveNodes
+    // For testing replica.type, we want to have all replica types available for the collection
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, replicasPerType, replicasPerType, replicasPerType)
+        .setMaxShardsPerNode(liveNodes)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    cluster.waitForActiveCollection(collectionName, 1, 3 * replicasPerType); // wait for the replica count actually requested above
+
+    // Add some new documents
+    new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .add(id, "3", "a_t", "hello2")
+        .commit(getRandomClient(), collectionName);
+
+    // Run the actual test for 'queryReplicaType'
+    queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName);
+    queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName);
+    queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName);
+  }
+
+  /** Queries with a replica-type shards preference and asserts that the one responding replica has that type. */
+  private void queryReplicaType(CloudSolrClient cloudClient,
+                                Replica.Type typeToQuery,
+                                String collectionName)
+      throws Exception
+  {
+    SolrQuery qRequest = new SolrQuery("*:*");
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+    qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString());
+    qParams.add(ShardParams.SHARDS_INFO, "true");
+    qRequest.add(qParams);
+
+    Map<String, String> replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName));
+
+    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+    // Iterate over shards-info and check what cores responded
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+    List<String> shardAddresses = new ArrayList<>();
+    while (itr.hasNext()) {
+      Map.Entry<String, ?> e = itr.next();
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map<?, ?>)e.getValue()).get("shardAddress");
+      // Null-check before the first dereference so a missing address fails with this message, not an NPE
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+      if (shardAddress.endsWith("/")) {
+        shardAddress = shardAddress.substring(0, shardAddress.length() - 1);
+      }
+      shardAddresses.add(shardAddress);
+    }
+    assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size());
+    assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.getOrDefault(shardAddresses.get(0), "").toUpperCase(Locale.ROOT));
+  }
+
   private Long getNumRequests(String baseUrl, String collectionName) throws
       SolrServerException, IOException {
     return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index 2ecdf24..475b74d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -48,8 +48,11 @@ import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.junit.Assume;
 import org.junit.Before;
@@ -68,6 +71,7 @@ import org.junit.Test;
 public class StreamingTest extends SolrCloudTestCase {
 
 public static final String COLLECTIONORALIAS = "streams";
+public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica";
 
 private static final StreamFactory streamFactory = new StreamFactory()
     .withFunctionName("search", CloudSolrStream.class)
@@ -100,7 +104,8 @@ public static void configureCluster() throws Exception {
   } else {
     collection = COLLECTIONORALIAS;
   }
-  CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient());
+  CollectionAdminRequest.createCollection(collection, "conf", numShards, 1)
+      .process(cluster.getSolrClient());
   cluster.waitForActiveCollection(collection, numShards, numShards);
   if (useAlias) {
     CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
@@ -108,6 +113,20 @@ public static void configureCluster() throws Exception {
 
   zkHost = cluster.getZkServer().getZkAddress();
   streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost);
+
+  // Set up multi-replica collection
+  if (useAlias) {
+    collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection";
+  } else {
+    collection = MULTI_REPLICA_COLLECTIONORALIAS;
+  }
+  CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1)
+      .setMaxShardsPerNode(numShards * 3)
+      .process(cluster.getSolrClient());
+  cluster.waitForActiveCollection(collection, numShards, numShards * 3);
+  if (useAlias) {
+    CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient());
+  }
 }
 
 private static final String id = "id";
@@ -2435,6 +2454,43 @@ public void testParallelRankStream() throws Exception {
     }
 
   }
+
+  @Test
+  public void testTupleStreamGetShardsPreference() throws Exception {
+    StreamContext streamContext = new StreamContext();
+    streamContext.setSolrClientCache(new SolrClientCache());
+    streamContext.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null));
+
+    streamContext.setRequestParams(mapParams(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt"));
+    // Expected precedence, checked below: stream params, then StreamContext request params, then the generator default
+    try {
+      ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+      List<String> strings = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS);
+      String collName = strings.size() > 0 ? strings.get(0) : MULTI_REPLICA_COLLECTIONORALIAS;
+      Map<String, String> replicaTypeMap = mapReplicasToReplicaType(zkStateReader.getClusterState().getCollectionOrNull(collName));
+
+      // First: a preference passed directly in the stream's own params (pull) is expected to win
+      SolrParams sParams = mapParams("q", "*:*", ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull");
+      testTupleStreamSorting(streamContext, sParams, "PULL", replicaTypeMap);
+
+      // Next: with no preference in the stream params, the one set via streamContext.setRequestParams (nrt) is expected
+      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "NRT", replicaTypeMap);
+
+      // Last: with the request params emptied as well, the default given to the generator (TLOG) is expected
+      streamContext.setRequestParams(new ModifiableSolrParams());
+      testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "TLOG", replicaTypeMap);
+    } finally {
+      streamContext.getSolrClientCache().close();
+    }
+  }
+
+  public void testTupleStreamSorting(StreamContext streamContext, SolrParams solrParams, String replicaType, Map<String, String> replicaTypeMap) throws Exception {
+    List<String> shards = TupleStream.getShards(cluster.getZkClient().getZkServerAddress(), MULTI_REPLICA_COLLECTIONORALIAS, streamContext, solrParams);
+    for (String shard : shards) { // every returned shard URL must map to the expected replica type (compared case-insensitively)
+      assertEquals(shard, replicaType.toUpperCase(Locale.ROOT), replicaTypeMap.getOrDefault(shard, "").toUpperCase(Locale.ROOT)); // a URL absent from the map compares as ""
+    }
+  }
+
   protected List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
     tupleStream.open();
     List<Tuple> tuples = new ArrayList();
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index 4ce7a5e..b8e0798 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -490,4 +490,21 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
     cluster.waitForAllNodes(timeoutSeconds);
   }
 
+  /** Maps every replica core URL of the collection, with and without a trailing slash, to the replica's type name. */
+  public static Map<String, String> mapReplicasToReplicaType(DocCollection collection) {
+    Map<String, String> typeByCoreUrl = new HashMap<>();
+    for (Slice slice : collection.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        final String coreUrl = replica.getCoreUrl();
+        final String typeName = replica.getType().toString();
+        // The core URL and the shard address reported by a query may differ in the trailing slash,
+        // so register both spellings.
+        final String otherSpelling =
+            coreUrl.endsWith("/") ? coreUrl.substring(0, coreUrl.length() - 1) : coreUrl + "/";
+        typeByCoreUrl.put(coreUrl, typeName);
+        typeByCoreUrl.put(otherSpelling, typeName);
+      }
+    }
+    return typeByCoreUrl;
+  }
 }