You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2018/04/11 23:25:04 UTC

lucene-solr:branch_7x: SOLR-11982: Add support for indicating preferred replica types for queries

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 6ce215d6f -> ba26bf7c6


SOLR-11982: Add support for indicating preferred replica types for queries


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ba26bf7c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ba26bf7c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ba26bf7c

Branch: refs/heads/branch_7x
Commit: ba26bf7c6f8dd28ef0438e719d9b0bb45a60dd58
Parents: 6ce215d
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Wed Apr 11 16:23:00 2018 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Wed Apr 11 16:24:53 2018 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../component/HttpShardHandlerFactory.java      | 165 +++++++++++++------
 .../component/TestHttpShardHandlerFactory.java  | 119 +++++++++++++
 .../src/distributed-requests.adoc               |  40 +++++
 .../shards-and-indexing-data-in-solrcloud.adoc  |   4 +
 .../apache/solr/common/params/CommonParams.java |   1 +
 .../apache/solr/common/params/ShardParams.java  |  12 ++
 .../client/solrj/impl/CloudSolrClientTest.java  | 143 ++++++++++++++--
 8 files changed, 425 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 3fdbb4d..0acd2f3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -65,6 +65,9 @@ New Features
 
 * SOLR-12181: Add index size autoscaling trigger, based on document count or size in bytes. (ab)
 
+* SOLR-11982: Add possibility to define replica order with the shards.preference parameter to e.g. prefer PULL replicas
+  for distributed queries. (Ere Maijala, Tomás Fernández Löbbe)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 8cff025..4e2a794 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -29,12 +29,12 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.URLUtil;
-import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.metrics.SolrMetricManager;
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -303,30 +304,61 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
   /**
    * A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
    * This means it is just as likely to choose current host as any of the other hosts.
-   * This function makes sure that the cores of current host are always put first in the URL list.
-   * If all nodes prefer local-cores then a bad/heavily-loaded node will receive less requests from healthy nodes.
-   * This will help prevent a distributed deadlock or timeouts in all the healthy nodes due to one bad node.
+   * This function makes sure that the cores are sorted according to the given list of preferences.
+   * E.g. if all nodes prefer local cores then a bad/heavily-loaded node will receive fewer requests from
+   * healthy nodes. This will help prevent a distributed deadlock or timeouts in all the healthy nodes due 
+   * to one bad node.
    */
-  private static class IsOnPreferredHostComparator implements Comparator<Object> {
-    final private String preferredHostAddress;
-    public IsOnPreferredHostComparator(String preferredHostAddress) {
-      this.preferredHostAddress = preferredHostAddress;
+  static class NodePreferenceRulesComparator implements Comparator<Object> {
+    private static class PreferenceRule {
+      public final String name;
+      public final String value;
+
+      public PreferenceRule(String name, String value) {
+        this.name = name;
+        this.value = value;
+      }
+    }
+
+    private final SolrQueryRequest request;
+    private List<PreferenceRule> preferenceRules;
+    private String localHostAddress = null;
+
+    public NodePreferenceRulesComparator(final List<String> sortRules, final SolrQueryRequest request) {
+      this.request = request;
+      this.preferenceRules = new ArrayList<PreferenceRule>(sortRules.size());
+      sortRules.forEach(rule -> {
+        String[] parts = rule.split(":", 2);
+        if (parts.length != 2) {
+          throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " rule: " + rule);
+        }
+        this.preferenceRules.add(new PreferenceRule(parts[0], parts[1])); 
+      });
     }
     @Override
     public int compare(Object left, Object right) {
-      final boolean lhs = hasPrefix(objectToString(left));
-      final boolean rhs = hasPrefix(objectToString(right));
-      if (lhs != rhs) {
-        if (lhs) {
-          return -1;
-        } else {
-          return +1;
+      for (PreferenceRule preferenceRule: this.preferenceRules) {
+        final boolean lhs;
+        final boolean rhs;
+        switch (preferenceRule.name) {
+          case ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE:
+            lhs = hasReplicaType(left, preferenceRule.value);
+            rhs = hasReplicaType(right, preferenceRule.value);
+            break;
+          case ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION:
+            lhs = hasCoreUrlPrefix(left, preferenceRule.value);
+            rhs = hasCoreUrlPrefix(right, preferenceRule.value);
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid " + ShardParams.SHARDS_PREFERENCE + " type: " + preferenceRule.name);
+        }
+        if (lhs != rhs) {
+          return lhs ? -1 : +1;
         }
-      } else {
-        return 0;
       }
+      return 0;
     }
-    private String objectToString(Object o) {
+    private boolean hasCoreUrlPrefix(Object o, String prefix) {
       final String s;
       if (o instanceof String) {
         s = (String)o;
@@ -334,44 +366,80 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
       else if (o instanceof Replica) {
         s = ((Replica)o).getCoreUrl();
       } else {
-        s = null;
+        return false;
       }
-      return s;
+      if (prefix.equals(ShardParams.REPLICA_LOCAL)) {
+        if (null == localHostAddress) {
+          final ZkController zkController = this.request.getCore().getCoreContainer().getZkController();
+          localHostAddress = zkController != null ? zkController.getBaseUrl() : "";
+          if (localHostAddress.isEmpty()) {
+            log.warn("Couldn't determine current host address for sorting of local replicas");
+          }
+        }
+        if (!localHostAddress.isEmpty()) {
+          if (s.startsWith(localHostAddress)) {
+            return true;
+          }
+        }
+      } else {
+        if (s.startsWith(prefix)) {
+          return true;
+        }
+      }
+      return false;
     }
-    private boolean hasPrefix(String s) {
-      return s != null && s.startsWith(preferredHostAddress);
+    private static boolean hasReplicaType(Object o, String preferred) {
+      if (!(o instanceof Replica)) {
+        return false;
+      }
+      final String s = ((Replica)o).getType().toString();
+      return s.equals(preferred);
     }
   }
-  protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req)
-  {
+
+  protected ReplicaListTransformer getReplicaListTransformer(final SolrQueryRequest req) {
     final SolrParams params = req.getParams();
+    @SuppressWarnings("deprecation")
+    final boolean preferLocalShards = params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false);
+    final String shardsPreferenceSpec = params.get(ShardParams.SHARDS_PREFERENCE, "");
+
+    if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) {
+      if (preferLocalShards && !shardsPreferenceSpec.isEmpty()) {
+        throw new SolrException(
+          SolrException.ErrorCode.BAD_REQUEST,
+          "preferLocalShards is deprecated and must not be used with shards.preference" 
+        );
+      }
+      List<String> preferenceRules = StrUtils.splitSmart(shardsPreferenceSpec, ',');
+      if (preferLocalShards) {
+        preferenceRules.add(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
+      }
 
-    if (params.getBool(CommonParams.PREFER_LOCAL_SHARDS, false)) {
-      final CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor();
-      final ZkController zkController = req.getCore().getCoreContainer().getZkController();
-      final String preferredHostAddress = (zkController != null) ? zkController.getBaseUrl() : null;
-      if (preferredHostAddress == null) {
-        log.warn("Couldn't determine current host address to prefer local shards");
-      } else {
-        return new ShufflingReplicaListTransformer(r) {
-          @Override
-          public void transform(List<?> choices)
-          {
-            if (choices.size() > 1) {
-              super.transform(choices);
-              if (log.isDebugEnabled()) {
-                log.debug("Trying to prefer local shard on {} among the choices: {}",
-                    preferredHostAddress, Arrays.toString(choices.toArray()));
-              }
-              choices.sort(new IsOnPreferredHostComparator(preferredHostAddress));
-              if (log.isDebugEnabled()) {
-                log.debug("Applied local shard preference for choices: {}",
-                    Arrays.toString(choices.toArray()));
-              }
+      return new ShufflingReplicaListTransformer(r) {
+        @Override
+        public void transform(List<?> choices)
+        {
+          if (choices.size() > 1) {
+            super.transform(choices);
+            if (log.isDebugEnabled()) {
+              log.debug("Applying the following sorting preferences to replicas: {}",
+                  Arrays.toString(preferenceRules.toArray()));
+            }
+            try {
+              choices.sort(new NodePreferenceRulesComparator(preferenceRules, req));
+            } catch (IllegalArgumentException iae) {
+              throw new SolrException(
+                SolrException.ErrorCode.BAD_REQUEST,
+                iae.getMessage()
+              );
+            }
+            if (log.isDebugEnabled()) {
+              log.debug("Applied sorting preferences to replica list: {}",
+                  Arrays.toString(choices.toArray()));
             }
           }
-        };
-      }
+        }
+      };
     }
 
     return shufflingReplicaListTransformer;
@@ -409,4 +477,5 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
         manager.registry(registry),
         SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool"));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
index 3ffa015..523e31d 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
@@ -24,6 +24,10 @@ import java.util.List;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.component.HttpShardHandlerFactory;
 import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -99,4 +103,119 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  public void testNodePreferenceRulesComparator() throws Exception {
+    List<Replica> replicas = new ArrayList<Replica>();
+    replicas.add(
+      new Replica(
+        "node1",
+        map(
+          ZkStateReader.BASE_URL_PROP, "http://host1:8983/solr",
+          ZkStateReader.NODE_NAME_PROP, "node1",
+          ZkStateReader.CORE_NAME_PROP, "collection1",
+          ZkStateReader.REPLICA_TYPE, "NRT"
+        )
+      )
+    );
+    replicas.add(
+      new Replica(
+        "node2",
+        map(
+          ZkStateReader.BASE_URL_PROP, "http://host2:8983/solr",
+          ZkStateReader.NODE_NAME_PROP, "node2",
+          ZkStateReader.CORE_NAME_PROP, "collection1",
+          ZkStateReader.REPLICA_TYPE, "TLOG"
+        )
+      )
+    );
+    replicas.add(
+      new Replica(
+        "node3",
+        map(
+          ZkStateReader.BASE_URL_PROP, "http://host2_2:8983/solr",
+          ZkStateReader.NODE_NAME_PROP, "node3",
+          ZkStateReader.CORE_NAME_PROP, "collection1",
+          ZkStateReader.REPLICA_TYPE, "PULL"
+        )
+      )
+    );
+
+    // Simple replica type rule
+    List<String> rules = StrUtils.splitSmart(
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT," + 
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", 
+      ','
+    );
+    HttpShardHandlerFactory.NodePreferenceRulesComparator comparator = 
+      new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+    replicas.sort(comparator);
+    assertEquals("node1", replicas.get(0).getNodeName());
+    assertEquals("node2", replicas.get(1).getNodeName());
+
+    // Another simple replica type rule
+    rules = StrUtils.splitSmart(
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG," + 
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT", 
+      ','
+    );
+    comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+    replicas.sort(comparator);
+    assertEquals("node2", replicas.get(0).getNodeName());
+    assertEquals("node1", replicas.get(1).getNodeName());
+
+    // replicaLocation rule
+    rules = StrUtils.splitSmart(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":http://host2:8983", ',');
+    comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+    replicas.sort(comparator);
+    assertEquals("node2", replicas.get(0).getNodeName());
+    assertEquals("node1", replicas.get(1).getNodeName());
+
+    // Add a replica so that sorting by replicaType:TLOG can cause a tie
+    replicas.add(
+      new Replica(
+        "node4",
+        map(
+          ZkStateReader.BASE_URL_PROP, "http://host2_2:8983/solr",
+          ZkStateReader.NODE_NAME_PROP, "node4",
+          ZkStateReader.CORE_NAME_PROP, "collection1",
+          ZkStateReader.REPLICA_TYPE, "TLOG"
+        )
+      )
+    );
+
+    // replicaType and replicaLocation combined rule
+    rules = StrUtils.splitSmart(
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":NRT," + 
+      ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG," + 
+      ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":http://host2_2", 
+      ','
+    );
+    comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+    replicas.sort(comparator);
+    assertEquals("node1", replicas.get(0).getNodeName());
+    assertEquals("node4", replicas.get(1).getNodeName());
+    assertEquals("node2", replicas.get(2).getNodeName());
+    assertEquals("node3", replicas.get(3).getNodeName());
+
+    // Bad rule
+    rules = StrUtils.splitSmart(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE, ',');
+    try {
+      comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+      replicas.sort(comparator);
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals("Invalid shards.preference rule: " + ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE, e.getMessage());
+    }
+
+    // Unknown rule
+    rules = StrUtils.splitSmart("badRule:test", ',');
+    try {
+      comparator = new HttpShardHandlerFactory.NodePreferenceRulesComparator(rules, null);
+      replicas.sort(comparator);
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals("Invalid shards.preference type: badRule", e.getMessage());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/solr-ref-guide/src/distributed-requests.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/distributed-requests.adoc b/solr/solr-ref-guide/src/distributed-requests.adoc
index 096f632..b199c7e 100644
--- a/solr/solr-ref-guide/src/distributed-requests.adoc
+++ b/solr/solr-ref-guide/src/distributed-requests.adoc
@@ -138,6 +138,46 @@ For example, a deadlock might occur in the case of two shards, each with just a
 
 == preferLocalShards Parameter
 
+Deprecated, use `shards.preference=replica.location:local` instead (see below).
+
 Solr allows you to pass an optional boolean parameter named `preferLocalShards` to indicate that a distributed query should prefer local replicas of a shard when available. In other words, if a query includes `preferLocalShards=true`, then the query controller will look for local replicas to service the query instead of selecting replicas at random from across the cluster. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
 
 Lastly, it follows that the value of this feature diminishes as the number of shards in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
+
+== shards.preference Parameter
+
+Solr allows you to pass an optional string parameter named `shards.preference` to indicate that a distributed query should sort the available replicas in the given order of precedence within each shard. The syntax is: `shards.preference=property:value`, and several `property:value` rules may be given separated by commas. The order of the properties and the values is significant, meaning that the first one is the primary sort, the second one is secondary, etc.
+
+IMPORTANT: `shards.preference` only works for distributed queries, i.e. queries targeting multiple shards. It is not yet implemented for single-shard scenarios.
+
+The properties that can be specified are as follows:
+
+`replica.type`::
+One or more replica types that are preferred. Any combination of PULL, TLOG and NRT is allowed.
+
+`replica.location`::
+One or more replica locations that are preferred. A location starts with `http://hostname:port`. Matching is done for the given string as a prefix, so it's possible to e.g. leave out the port. `local` may be used as a special value to denote any local replica running on the same Solr instance as the one handling the query. This is useful when a query requests many fields or large fields to be returned per document because it avoids moving large amounts of data over the network when it is available locally. In addition, this feature can be useful for minimizing the impact of a problematic replica with degraded performance, as it reduces the likelihood that the degraded replica will be hit by other healthy replicas.
+
+The value of `replica.location:local` diminishes as the number of shards (that have no locally-available replicas) in a collection increases because the query controller will have to direct the query to non-local replicas for most of the shards. In other words, this feature is mostly useful for optimizing queries directed towards collections with a small number of shards and many replicas. Also, this option should only be used if you are load balancing requests across all nodes that host replicas for the collection you are querying, as Solr's CloudSolrClient will do. If not load-balancing, this feature can introduce a hotspot in the cluster since queries won't be evenly distributed across the cluster.
+
+Examples:
+
+ * Prefer PULL replicas:
+   `shards.preference=replica.type:PULL`
+
+ * Prefer PULL replicas, or TLOG replicas if PULL replicas are not available:
+   `shards.preference=replica.type:PULL,replica.type:TLOG`   
+
+ * Prefer any local replicas:
+   `shards.preference=replica.location:local`
+
+ * Prefer any replicas on a host called "server1" with "server2" as the secondary option:
+   `shards.preference=replica.location:http://server1,replica.location:http://server2`
+
+ * Prefer PULL replicas if available, otherwise TLOG replicas, and local ones among those:
+   `shards.preference=replica.type:PULL,replica.type:TLOG,replica.location:local`
+
+ * Prefer local replicas, and among them PULL replicas when available, TLOG otherwise:
+   `shards.preference=replica.location:local,replica.type:PULL,replica.type:TLOG`
+
+Note that if you provide the settings in a query string, they need to be properly URL-encoded.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/solr-ref-guide/src/shards-and-indexing-data-in-solrcloud.adoc
----------------------------------------------------------------------
diff --git a/solr/solr-ref-guide/src/shards-and-indexing-data-in-solrcloud.adoc b/solr/solr-ref-guide/src/shards-and-indexing-data-in-solrcloud.adoc
index 81c6f86..b899c5f 100644
--- a/solr/solr-ref-guide/src/shards-and-indexing-data-in-solrcloud.adoc
+++ b/solr/solr-ref-guide/src/shards-and-indexing-data-in-solrcloud.adoc
@@ -86,6 +86,10 @@ If the PULL replica cannot connect to ZooKeeper, it would be removed from the cl
 
 If the PULL replica dies or is unreachable for any other reason, it won't be query-able. When it rejoins the cluster, it would replicate from the leader and when that is complete, it would be ready to serve queries again.
 
+=== Queries with Preferred Replica Types
+
+By default all replicas serve queries. See the section <<distributed-requests.adoc#shards-preference-parameter,shards.preference Parameter>> for details on how to indicate preferred replica types for queries.
+
 == Document Routing
 
 Solr offers the ability to specify the router implementation used by a collection by specifying the `router.name` parameter when <<collections-api.adoc#create,creating your collection>>.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
index b44e761..6fe2950 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
@@ -262,6 +262,7 @@ public interface CommonParams {
 
   /**
    * When querying a node, prefer local node's cores for distributed queries.
+   * @deprecated Use {@code ShardParams.SHARDS_PREFERENCE}
    */
   String PREFER_LOCAL_SHARDS = "preferLocalShards";
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java b/solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java
index cbc33f4..567e963 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java
@@ -52,6 +52,18 @@ public interface ShardParams {
   /** query purpose for shard requests */
   String SHARDS_PURPOSE = "shards.purpose";
 
+  /** Shards sorting rules */
+  String SHARDS_PREFERENCE = "shards.preference";
+
+  /** Replica type sort rule */
+  String SHARDS_PREFERENCE_REPLICA_TYPE = "replica.type";
+
+  /** Replica location sort rule */
+  String SHARDS_PREFERENCE_REPLICA_LOCATION = "replica.location";
+
+  /** Value denoting local replicas */
+  String REPLICA_LOCAL = "local";
+
   String _ROUTE_ = "_route_";
 
   /** Force a single-pass distributed query? (true/false) */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ba26bf7c/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index e54f9ad..bc4bd8c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -416,18 +416,24 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
         .commit(getRandomClient(), collectionName);
 
     // Run the actual test for 'preferLocalShards'
-    queryWithPreferLocalShards(getRandomClient(), true, collectionName);
+    queryWithShardsPreferenceRules(getRandomClient(), false, collectionName);
+    queryWithShardsPreferenceRules(getRandomClient(), true, collectionName);
   }
 
-  private void queryWithPreferLocalShards(CloudSolrClient cloudClient,
-                                          boolean preferLocalShards,
+  @SuppressWarnings("deprecation")
+  private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient,
+                                          boolean useShardsPreference,
                                           String collectionName)
       throws Exception
   {
     SolrQuery qRequest = new SolrQuery("*:*");
 
     ModifiableSolrParams qParams = new ModifiableSolrParams();
-    qParams.add(CommonParams.PREFER_LOCAL_SHARDS, Boolean.toString(preferLocalShards));
+    if (useShardsPreference) {
+      qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION + ":" + ShardParams.REPLICA_LOCAL);
+    } else {
+      qParams.add(CommonParams.PREFER_LOCAL_SHARDS, "true");
+    }
     qParams.add(ShardParams.SHARDS_INFO, "true");
     qRequest.add(qParams);
 
@@ -454,17 +460,15 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
 
     // Make sure the distributed queries were directed to a single node only
-    if (preferLocalShards) {
-      Set<Integer> ports = new HashSet<Integer>();
-      for (String shardAddr: shardAddresses) {
-        URL url = new URL (shardAddr);
-        ports.add(url.getPort());
-      }
-
-      // This assertion would hold true as long as every shard has a core on each node
-      assertTrue ("Response was not received from shards on a single node",
-          shardAddresses.size() > 1 && ports.size()==1);
+    Set<Integer> ports = new HashSet<Integer>();
+    for (String shardAddr: shardAddresses) {
+      URL url = new URL (shardAddr);
+      ports.add(url.getPort());
     }
+
+    // This assertion would hold true as long as every shard has a core on each node
+    assertTrue ("Response was not received from shards on a single node",
+        shardAddresses.size() > 1 && ports.size()==1);
   }
 
   private Long getNumRequests(String baseUrl, String collectionName) throws
@@ -844,4 +848,115 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     }
   }
 
+  /**
+   * Tests if the specification of 'shards.preference=replica.type:*' in the query-params
+   * directs the distributed query to replicas of the preferred type
+   */
+  @Test
+  public void preferReplicaTypesTest() throws Exception {
+
+    String collectionName = "replicaTypesTestColl";
+
+    int liveNodes = cluster.getJettySolrRunners().size();
+
+    // For these tests we need to have multiple replica types.
+    // Hence the below configuration for our collection
+    CollectionAdminRequest.createCollection(collectionName, "conf", liveNodes, 1, 1, Math.max(1, liveNodes - 2))
+        .setMaxShardsPerNode(liveNodes)
+        .processAndWait(cluster.getSolrClient(), TIMEOUT);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(collectionName, cluster.getSolrClient().getZkStateReader(), false, true, TIMEOUT);
+
+    // Add some new documents
+    new UpdateRequest()
+        .add(id, "0", "a_t", "hello1")
+        .add(id, "2", "a_t", "hello2")
+        .add(id, "3", "a_t", "hello2")
+        .commit(getRandomClient(), collectionName);
+
+    // Run the actual tests for 'shards.preference=replica.type:*'
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", false, collectionName);
+    // Test to verify that preferLocalShards=true doesn't break this
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "PULL|TLOG", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "TLOG|PULL", true, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT", false, collectionName);
+    queryWithPreferReplicaTypes(getRandomClient(), "NRT|PULL", true, collectionName);
+  }
+
+  private void queryWithPreferReplicaTypes(CloudSolrClient cloudClient,
+                                           String preferReplicaTypes,
+                                           boolean preferLocalShards,
+                                           String collectionName)
+      throws Exception
+  {
+    SolrQuery qRequest = new SolrQuery("*:*");
+    ModifiableSolrParams qParams = new ModifiableSolrParams();
+
+    final List<String> preferredTypes = Arrays.asList(preferReplicaTypes.split("\\|"));
+    StringBuilder rule = new StringBuilder();
+    preferredTypes.forEach(type -> {
+      if (rule.length() != 0) {
+        rule.append(',');
+      }
+      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE);
+      rule.append(':');
+      rule.append(type);
+    });
+    if (preferLocalShards) {
+      if (rule.length() != 0) {
+        rule.append(',');
+      }
+      rule.append(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION);
+      rule.append(":local");
+    }
+    qParams.add(ShardParams.SHARDS_PREFERENCE, rule.toString());  
+    qParams.add(ShardParams.SHARDS_INFO, "true");
+    qRequest.add(qParams);
+
+    // CloudSolrClient sends the request to some node.
+    // And since all the nodes are hosting cores from all shards, the
+    // distributed query formed by this node will select cores from the
+    // local shards only
+    QueryResponse qResponse = cloudClient.query(collectionName, qRequest);
+
+    Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO);
+    assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo);
+
+    Map<String, String> replicaTypeMap = new HashMap<String, String>();
+    DocCollection collection = getCollectionState(collectionName);
+    for (Slice slice : collection.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        String coreUrl = replica.getCoreUrl();
+        // It seems replica reports its core URL with a trailing slash while shard
+        // info returned from the query doesn't. Oh well.
+        if (coreUrl.endsWith("/")) {
+          coreUrl = coreUrl.substring(0, coreUrl.length() - 1);
+        }
+        replicaTypeMap.put(coreUrl, replica.getType().toString());
+      }
+    }
+
+    // Iterate over shards-info and check that replicas of correct type responded
+    SimpleOrderedMap<?> shardsInfoMap = (SimpleOrderedMap<?>)shardsInfo;
+    Iterator<Map.Entry<String, ?>> itr = shardsInfoMap.asMap(100).entrySet().iterator();
+    List<String> shardAddresses = new ArrayList<String>();
+    while (itr.hasNext()) {
+      Map.Entry<String, ?> e = itr.next();
+      assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map);
+      String shardAddress = (String)((Map)e.getValue()).get("shardAddress");
+      assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress);
+      assertTrue(replicaTypeMap.containsKey(shardAddress));
+      assertTrue(preferredTypes.indexOf(replicaTypeMap.get(shardAddress)) == 0);
+      shardAddresses.add(shardAddress);
+    }
+    assertTrue("No responses", shardAddresses.size() > 0);
+    log.info("Shards giving the response: " + Arrays.toString(shardAddresses.toArray()));
+  }
+
 }