You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/06/07 14:11:45 UTC

[41/50] [abbrv] lucene-solr:feature/autoscaling: SOLR-10419: All collection APIs should use the new Policy framework for replica placement

SOLR-10419: All collection APIs should use the new Policy framework for replica placement


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b47572ee
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b47572ee
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b47572ee

Branch: refs/heads/feature/autoscaling
Commit: b47572ee879468ac552668dcc78095c6dfbec11b
Parents: 744d1ab
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Tue Jun 6 09:22:38 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Tue Jun 6 09:22:38 2017 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../src/java/org/apache/solr/cloud/Assign.java  |  7 +-
 .../apache/solr/cloud/CreateCollectionCmd.java  |  2 +-
 .../cloud/OverseerCollectionMessageHandler.java |  5 +-
 .../java/org/apache/solr/cloud/RestoreCmd.java  |  2 +-
 .../org/apache/solr/cloud/SplitShardCmd.java    |  2 +-
 .../solr/cloud/autoscaling/TestPolicyCloud.java | 69 ++++++++++++++++++++
 .../apache/solr/cloud/autoscaling/Policy.java   | 13 +++-
 .../solr/cloud/autoscaling/PolicyHelper.java    | 10 ++-
 .../solr/cloud/autoscaling/TestPolicy.java      |  4 +-
 10 files changed, 102 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 21d9ec1..d83b786 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -208,6 +208,8 @@ Other Changes
 
 * SOLR-10782: Improve error handling and tests for Snitch and subclasses and general cleanups. (Noble Paul, shalin)
 
+* SOLR-10419: All collection APIs should use the new Policy framework for replica placement. (Noble Paul, shalin)
+
 ==================  6.7.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index 4e1fd68..eeab761 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -199,7 +199,7 @@ public class Assign {
     Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
     if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
       positions= Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), numberOfNodes,
-          policyName, cc.getZkController().getZkStateReader());
+          policyName, cc.getZkController().getZkStateReader(), createNodeList);
     }
 
     if(positions != null){
@@ -216,7 +216,8 @@ public class Assign {
 
   }
   public static Map<ReplicaAssigner.Position, String> getPositionsUsingPolicy(String collName, List<String> shardNames, int numReplicas,
-                                                                              String policyName, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+                                                                              String policyName, ZkStateReader zkStateReader,
+                                                                              List<String> nodesList) throws KeeperException, InterruptedException {
     try (CloudSolrClient csc = new CloudSolrClient.Builder()
         .withClusterStateProvider(new ZkClientClusterStateProvider(zkStateReader))
         .build()) {
@@ -224,7 +225,7 @@ public class Assign {
       Map<String, Object> autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
       Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(collName,
           autoScalingJson,
-          clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas);
+          clientDataProvider, singletonMap(collName, policyName), shardNames, numReplicas, nodesList);
       Map<ReplicaAssigner.Position, String> result = new HashMap<>();
       for (Map.Entry<String, List<String>> e : locations.entrySet()) {
         List<String> value = e.getValue();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 3d1a54e..e0d4cba 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -162,7 +162,7 @@ public class CreateCollectionCmd implements Cmd {
               + " shards to be created (higher than the allowed number)");
         }
 
-        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
       }
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index e5b3b9b..a055033 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -706,6 +706,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   Map<Position, String> identifyNodes(ClusterState clusterState,
                                       List<String> nodeList,
+                                      String collectionName,
                                       ZkNodeProps message,
                                       List<String> shardNames,
                                       int numNrtReplicas, 
@@ -741,8 +742,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
 
     if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
-      return Assign.getPositionsUsingPolicy(message.getStr(COLLECTION_PROP, message.getStr(NAME)),
-          shardNames, numNrtReplicas, policyName, zkStateReader);
+      return Assign.getPositionsUsingPolicy(collectionName,
+          shardNames, numNrtReplicas, policyName, zkStateReader, nodeList);
 
     } else {
       List<Rule> rules = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 76c12b8..6a18bff 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -214,7 +214,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
 
     Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
-        message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+        restoreCollectionName, message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
 
     //Create one replica per shard and copy backed up data to it
     for (Slice slice : restoreCollection.getSlices()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index fe95458..2e2e335 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -381,9 +381,9 @@ public class SplitShardCmd implements Cmd {
 
       // TODO: change this to handle sharding a slice into > 2 sub-shards.
 
-
       Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
+          collectionName,
           new ZkNodeProps(collection.getProperties()),
           subSlices, repFactor - 1, 0, 0);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index fa592f3..ddb9d11 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.lucene.util.LuceneTestCase;
@@ -31,7 +32,9 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.OverseerTaskProcessor;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -61,6 +64,72 @@ public class TestPolicyCloud extends SolrCloudTestCase {
         "{}".getBytes(StandardCharsets.UTF_8), true);
   }
 
+  public void testCreateCollectionAddReplica() throws Exception  {
+    JettySolrRunner jetty = cluster.getRandomJetty(random());
+    int port = jetty.getLocalPort();
+
+    String commands =  "{set-policy :{c1 : [{replica:2 , shard:'#EACH', port: '" + port + "'}]}}";
+    cluster.getSolrClient().request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+
+    String collectionName = "testCreateCollectionAddReplica";
+    CollectionAdminRequest.createCollection(collectionName, 1, 1)
+        .setPolicy("c1")
+        .process(cluster.getSolrClient());
+
+    getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(jetty.getNodeName(), replica.getNodeName()));
+
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1").process(cluster.getSolrClient());
+    waitForState("Timed out waiting to see 2 replicas for collection: " + collectionName,
+        collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
+
+    getCollectionState(collectionName).forEachReplica((s, replica) -> assertEquals(jetty.getNodeName(), replica.getNodeName()));
+  }
+
+  public void testCreateCollectionSplitShard() throws Exception  {
+    JettySolrRunner firstNode = cluster.getRandomJetty(random());
+    int firstNodePort = firstNode.getLocalPort();
+
+    JettySolrRunner secondNode = null;
+    while (true)  {
+      secondNode = cluster.getRandomJetty(random());
+      if (secondNode.getLocalPort() != firstNodePort)  break;
+    }
+    int secondNodePort = secondNode.getLocalPort();
+
+    String commands =  "{set-policy :{c1 : [{replica:1 , shard:'#EACH', port: '" + firstNodePort + "'}, {replica:1, shard:'#EACH', port:'" + secondNodePort + "'}]}}";
+    NamedList<Object> response = cluster.getSolrClient().request(AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, commands));
+    assertEquals("success", response.get("result"));
+
+    String collectionName = "testCreateCollectionSplitShard";
+    CollectionAdminRequest.createCollection(collectionName, 1, 2)
+        .setPolicy("c1")
+        .setMaxShardsPerNode(10)
+        .process(cluster.getSolrClient());
+
+    DocCollection docCollection = getCollectionState(collectionName);
+    List<Replica> list = docCollection.getReplicas(firstNode.getNodeName());
+    int replicasOnNode1 = list != null ? list.size() : 0;
+    list = docCollection.getReplicas(secondNode.getNodeName());
+    int replicasOnNode2 = list != null ? list.size() : 0;
+
+    assertEquals("Expected exactly one replica of collection on node with port: " + firstNodePort, 1, replicasOnNode1);
+    assertEquals("Expected exactly one replica of collection on node with port: " + secondNodePort, 1, replicasOnNode2);
+
+    CollectionAdminRequest.splitShard(collectionName).setShardName("shard1").process(cluster.getSolrClient());
+
+    waitForState("Timed out waiting to see 6 replicas for collection: " + collectionName,
+        collectionName, (liveNodes, collectionState) -> collectionState.getReplicas().size() == 6);
+
+    docCollection = getCollectionState(collectionName);
+    list = docCollection.getReplicas(firstNode.getNodeName());
+    replicasOnNode1 = list != null ? list.size() : 0;
+    list = docCollection.getReplicas(secondNode.getNodeName());
+    replicasOnNode2 = list != null ? list.size() : 0;
+
+    assertEquals("Expected exactly three replica of collection on node with port: " + firstNodePort, 3, replicasOnNode1);
+    assertEquals("Expected exactly three replica of collection on node with port: " + secondNodePort, 3, replicasOnNode2);
+  }
+
   public void testCreateCollectionAddShardUsingPolicy() throws Exception {
     JettySolrRunner jetty = cluster.getRandomJetty(random());
     int port = jetty.getLocalPort();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/Policy.java
index 72aeda9..dd9dfc5 100644
--- a/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/Policy.java
@@ -348,7 +348,11 @@ public class Policy implements MapWriter {
     }
 
     public Suggester hint(Hint hint, Object value) {
-      hints.put(hint, value);
+      if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
+        ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).add(value);
+      } else {
+        hints.put(hint, value);
+      }
       return this;
     }
 
@@ -461,7 +465,12 @@ public class Policy implements MapWriter {
 
     protected boolean isAllowed(Object v, Hint hint) {
       Object hintVal = hints.get(hint);
-      return hintVal == null || Objects.equals(v, hintVal);
+      if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
+        Set set = (Set) hintVal;
+        return set == null || set.contains(v);
+      } else {
+        return hintVal == null || Objects.equals(v, hintVal);
+      }
     }
 
     public enum Hint {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/PolicyHelper.java
index 168e94e..33d4b97 100644
--- a/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/cloud/autoscaling/PolicyHelper.java
@@ -25,10 +25,10 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.cloud.autoscaling.Policy.Suggester.Hint;
 
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 
@@ -37,7 +37,8 @@ public class PolicyHelper {
                                                               ClusterDataProvider cdp,
                                                               Map<String, String> optionalPolicyMapping,
                                                               List<String> shardNames,
-                                                              int repFactor) {
+                                                              int repFactor,
+                                                              List<String> nodesList) {
     Map<String, List<String>> positionMapping = new HashMap<>();
     for (String shardName : shardNames) positionMapping.put(shardName, new ArrayList<>(repFactor));
     if (optionalPolicyMapping != null) {
@@ -76,6 +77,11 @@ public class PolicyHelper {
         Policy.Suggester suggester = session.getSuggester(ADDREPLICA)
             .hint(Hint.COLL, collName)
             .hint(Hint.SHARD, shardName);
+        if (nodesList != null)  {
+          for (String nodeName : nodesList) {
+            suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
+          }
+        }
         SolrRequest op = suggester.getOperation();
         if (op == null) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No node can satisfy the rules "+ Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true)));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b47572ee/solr/solrj/src/test/org/apache/solr/cloud/autoscaling/TestPolicy.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/cloud/autoscaling/TestPolicy.java
index f992109..8c296b9 100644
--- a/solr/solrj/src/test/org/apache/solr/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/cloud/autoscaling/TestPolicy.java
@@ -586,7 +586,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     };
     Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
         "newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
-        dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1);
+        dataProvider, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, null);
     assertTrue(locations.get("shard1").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
     assertTrue(locations.get("shard2").containsAll(ImmutableList.of("127.0.0.1:50096_solr")));
   }
@@ -643,7 +643,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     };
     Map<String, List<String>> locations = PolicyHelper.getReplicaLocations(
         "newColl", (Map<String, Object>) Utils.fromJSONString(autoScaleJson),
-        dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3);
+        dataProvider, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, null);
     assertTrue(locations.get("shard1").containsAll(ImmutableList.of("node2", "node1", "node3")));
     assertTrue(locations.get("shard2").containsAll(ImmutableList.of("node2", "node1", "node3")));