You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by kr...@apache.org on 2016/11/15 15:41:03 UTC

[6/9] lucene-solr:jira/solr-8593: SOLR-9736: Solr resolves the collection name against the first available leader or first replica of the first slice

SOLR-9736: Solr resolves the collection name against the first available leader or first replica of the first slice


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0d290ae1
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0d290ae1
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0d290ae1

Branch: refs/heads/jira/solr-8593
Commit: 0d290ae136b246918eb8e7257a2197cee9910199
Parents: b57a5e4
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Tue Nov 15 14:18:43 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Tue Nov 15 14:18:43 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   4 +
 .../org/apache/solr/servlet/HttpSolrCall.java   |  87 ++++++----
 .../solr/servlet/HttpSolrCallGetCoreTest.java   | 167 +++++++++++++++++++
 .../apache/solr/common/cloud/DocCollection.java |  42 ++++-
 4 files changed, 268 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d290ae1/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e7f7b6e..c2f218a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -152,6 +152,10 @@ Bug Fixes
   
 * SOLR-9751: PreAnalyzedField can cause managed schema corruption. (Steve Rowe)
 
+* SOLR-9736: Solr resolved the collection name against the first available leader or first replica
+  of the first slice, which put undue pressure on leader cores and likely on the wrong ones. It now
+  randomly picks a leader on updates or a replica core otherwise. (Cao Manh Dat via shalin)
+
 
 Other Changes
 ----------------------

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d290ae1/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index c41595e..1f98da9 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -191,7 +191,7 @@ public class HttpSolrCall {
     return queryParams;
   }
   
-  private void init() throws Exception {
+  void init() throws Exception {
     //The states of client that is invalid in this request
     Aliases aliases = null;
     String corename = "";
@@ -271,7 +271,11 @@ public class HttpSolrCall {
 
     if (core == null && cores.isZooKeeperAware()) {
       // we couldn't find the core - lets make sure a collection was not specified instead
-      core = getCoreByCollection(corename);
+      boolean isPreferLeader = false;
+      if (path.endsWith("/update") || path.contains("/update/")) {
+        isPreferLeader = true;
+      }
+      core = getCoreByCollection(corename, isPreferLeader);
       if (core != null) {
         // we found a core, update the path
         path = path.substring(idx);
@@ -753,7 +757,7 @@ public class HttpSolrCall {
     return result;
   }
 
-  private SolrCore getCoreByCollection(String collectionName) {
+  private SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) {
     ZkStateReader zkStateReader = cores.getZkController().getZkStateReader();
 
     ClusterState clusterState = zkStateReader.getClusterState();
@@ -761,37 +765,27 @@ public class HttpSolrCall {
     if (collection == null) {
       return null;
     }
-    Map<String, Slice> slices = collection.getActiveSlicesMap();
-    if (slices == null) {
-      return null;
-    }
+
     Set<String> liveNodes = clusterState.getLiveNodes();
-    // look for a core on this node
-    Set<Map.Entry<String, Slice>> entries = slices.entrySet();
-    SolrCore core = null;
 
-    //Hitting the leaders is useful when it's an update request.
-    //For queries it doesn't matter and hence we don't distinguish here.
-    for (Map.Entry<String, Slice> entry : entries) {
-      // first see if we have the leader
-      Replica leaderProps = collection.getLeader(entry.getKey());
-      if (leaderProps != null && liveNodes.contains(leaderProps.getNodeName()) && leaderProps.getState() == Replica.State.ACTIVE) {
-        core = checkProps(leaderProps);
-        if (core != null) {
-          return core;
-        }
-      }
+    if (isPreferLeader) {
+      List<Replica> leaderReplicas = collection.getLeaderReplicas(cores.getZkController().getNodeName());
+      SolrCore core = randomlyGetSolrCore(liveNodes, leaderReplicas);
+      if (core != null) return core;
+    }
 
-      // check everyone then
-      Map<String, Replica> shards = entry.getValue().getReplicasMap();
-      Set<Map.Entry<String, Replica>> shardEntries = shards.entrySet();
-      for (Map.Entry<String, Replica> shardEntry : shardEntries) {
-        Replica zkProps = shardEntry.getValue();
-        if (liveNodes.contains(zkProps.getNodeName()) && zkProps.getState() == Replica.State.ACTIVE) {
-          core = checkProps(zkProps);
-          if (core != null) {
-            return core;
-          }
+    List<Replica> replicas = collection.getReplicas(cores.getZkController().getNodeName());
+    return randomlyGetSolrCore(liveNodes, replicas);
+  }
+
+  private SolrCore randomlyGetSolrCore(Set<String> liveNodes, List<Replica> replicas) {
+    if (replicas != null) {
+      RandomIterator<Replica> it = new RandomIterator<>(random, replicas);
+      while (it.hasNext()) {
+        Replica replica = it.next();
+        if (liveNodes.contains(replica.getNodeName()) && replica.getState() == Replica.State.ACTIVE) {
+          SolrCore core = checkProps(replica);
+          if (core != null) return core;
         }
       }
     }
@@ -1027,4 +1021,35 @@ public class HttpSolrCall {
   static final String CONNECTION_HEADER = "Connection";
   static final String TRANSFER_ENCODING_HEADER = "Transfer-Encoding";
   static final String CONTENT_LENGTH_HEADER = "Content-Length";
+
+  /**
+   * Iterates over a private copy of the given elements in random order, never returning
+   * the same position twice; nothing is shuffled up front, so consuming few items is cheap.
+   */
+  private static class RandomIterator<E> implements Iterator<E> {
+    private Random rand;
+    private ArrayList<E> elements;
+    private int size;
+
+    public RandomIterator(Random rand, Collection<E> elements) {
+      this.rand = rand;
+      this.elements = new ArrayList<>(elements);
+      this.size = elements.size();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return size > 0;
+    }
+
+    @Override
+    public E next() {
+      int idx = rand.nextInt(size);
+      E e1 = elements.get(idx);
+      E e2 = elements.get(size-1);
+      elements.set(idx,e2);
+      size--;
+      return e1;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d290ae1/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java b/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
new file mode 100644
index 0000000..bd851eb
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/servlet/HttpSolrCallGetCoreTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.servlet;
+
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.WriteListener;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Response;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class HttpSolrCallGetCoreTest extends SolrCloudTestCase {
+  private static final String COLLECTION = "collection1";
+  private static final int NUM_SHARD = 3;
+  private static final int REPLICA_FACTOR = 2;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+
+    CollectionAdminRequest
+        .createCollection(COLLECTION, "config", NUM_SHARD, REPLICA_FACTOR)
+        .setMaxShardsPerNode(NUM_SHARD * REPLICA_FACTOR)
+        .process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+  }
+
+  @Test
+  public void test() throws Exception {
+    assertCoreChosen(NUM_SHARD, new TestRequest("/collection1/update"));
+    assertCoreChosen(NUM_SHARD, new TestRequest("/collection1/update/json"));
+    assertCoreChosen(NUM_SHARD * REPLICA_FACTOR, new TestRequest("/collection1/select"));
+  }
+
+  private void assertCoreChosen(int numCores, TestRequest testRequest) {
+    JettySolrRunner jettySolrRunner = cluster.getJettySolrRunner(0);
+    Set<String> coreNames = new HashSet<>();
+    SolrDispatchFilter dispatchFilter = jettySolrRunner.getSolrDispatchFilter();
+    for (int i = 0; i < NUM_SHARD * REPLICA_FACTOR * 20; i++) {
+      if (coreNames.size() == numCores) return;
+      HttpSolrCall httpSolrCall = new HttpSolrCall(dispatchFilter, dispatchFilter.getCores(), testRequest, new TestResponse(), false);
+      try {
+        httpSolrCall.init();
+      } catch (Exception e) {
+      } finally {
+        coreNames.add(httpSolrCall.core.getName());
+        httpSolrCall.destroy();
+      }
+    }
+    assertEquals(numCores, coreNames.size());
+  }
+
+  private static class TestResponse extends Response {
+
+    public TestResponse() {
+      super(null, null);
+    }
+
+    @Override
+    public ServletOutputStream getOutputStream() throws IOException {
+      return new ServletOutputStream() {
+        @Override
+        public boolean isReady() {
+          return true;
+        }
+
+        @Override
+        public void setWriteListener(WriteListener writeListener) {
+
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+
+        }
+      };
+    }
+
+    @Override
+    public boolean isCommitted() {
+      return true;
+    }
+  }
+
+  private static class TestRequest extends Request {
+    private String path;
+
+    public TestRequest(String path) {
+      super(null, null);
+      this.path = path;
+    }
+
+    @Override
+    public String getQueryString() {
+      return "wt=json&version=2";
+    }
+
+    @Override
+    public String getContentType() {
+      return "application/json";
+    }
+
+    @Override
+    public String getServletPath() {
+      return path;
+    }
+
+    @Override
+    public String getRequestURI() {
+      return path;
+    }
+
+    @Override
+    public ServletInputStream getInputStream() throws IOException {
+      return new ServletInputStream() {
+        @Override
+        public boolean isFinished() {
+          return true;
+        }
+
+        @Override
+        public boolean isReady() {
+          return true;
+        }
+
+        @Override
+        public void setReadListener(ReadListener readListener) {
+
+        }
+
+        @Override
+        public int read() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0d290ae1/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 5207994..179b9d5 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -51,6 +51,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final String name;
   private final Map<String, Slice> slices;
   private final Map<String, Slice> activeSlices;
+  private final Map<String, List<Replica>> nodeNameReplicas;
+  private final Map<String, List<Replica>> nodeNameLeaderReplicas;
   private final DocRouter router;
   private final String znode;
 
@@ -76,6 +78,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
     this.slices = slices;
     this.activeSlices = new HashMap<>();
+    this.nodeNameLeaderReplicas = new HashMap<>();
+    this.nodeNameReplicas = new HashMap<>();
     this.replicationFactor = (Integer) verifyProp(props, REPLICATION_FACTOR);
     this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
     Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
@@ -86,14 +90,36 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
     while (iter.hasNext()) {
       Map.Entry<String, Slice> slice = iter.next();
-      if (slice.getValue().getState() == Slice.State.ACTIVE)
+      if (slice.getValue().getState() == Slice.State.ACTIVE) {
         this.activeSlices.put(slice.getKey(), slice.getValue());
+      }
+      for (Replica replica : slice.getValue()) {
+        addNodeNameReplica(replica);
+      }
     }
     this.router = router;
     this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
     assert name != null && slices != null;
   }
 
+  private void addNodeNameReplica(Replica replica) {
+    List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
+    if (replicas == null) {
+      replicas = new ArrayList<>();
+      nodeNameReplicas.put(replica.getNodeName(), replicas);
+    }
+    replicas.add(replica);
+
+    if (replica.getStr(Slice.LEADER) != null) {
+      List<Replica> leaderReplicas = nodeNameLeaderReplicas.get(replica.getNodeName());
+      if (leaderReplicas == null) {
+        leaderReplicas = new ArrayList<>();
+        nodeNameLeaderReplicas.put(replica.getNodeName(), leaderReplicas);
+      }
+      leaderReplicas.add(replica);
+    }
+  }
+
   public static Object verifyProp(Map<String, Object> props, String propName) {
     Object o = props.get(propName);
     if (o == null) return null;
@@ -160,6 +186,20 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return activeSlices;
   }
 
+  /**
+   * Get all replicas hosted on the given node, whatever their slice or replica state, or <code>null</code> if none.
+   */
+  public List<Replica> getReplicas(String nodeName) {
+    return nodeNameReplicas.get(nodeName);
+  }
+
+  /**
+   * Get the list of all leaders hosted on the given node or <code>null</code> if none.
+   */
+  public List<Replica> getLeaderReplicas(String nodeName) {
+    return nodeNameLeaderReplicas.get(nodeName);
+  }
+
   public int getZNodeVersion(){
     return znodeVersion;
   }