You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ro...@apache.org on 2016/06/03 14:53:47 UTC

lucene-solr:branch_6x: SOLR-9140: Replace some zk state polling with CollectionStateWatchers

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 0cfe43164 -> 9f299bb6a


SOLR-9140: Replace some zk state polling with CollectionStateWatchers


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9f299bb6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9f299bb6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9f299bb6

Branch: refs/heads/branch_6x
Commit: 9f299bb6ad39960469e297b0b364f5972e485621
Parents: 0cfe431
Author: Alan Woodward <ro...@apache.org>
Authored: Fri Jun 3 15:05:20 2016 +0100
Committer: Alan Woodward <ro...@apache.org>
Committed: Fri Jun 3 15:52:41 2016 +0100

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/cloud/ZkController.java     | 124 ++++++++-----------
 .../org/apache/solr/core/CoreContainer.java     |  16 ++-
 .../solrj/request/CollectionAdminRequest.java   |   2 +-
 .../apache/solr/common/cloud/DocCollection.java |  23 ++++
 .../cloud/TestCollectionStateWatchers.java      |   5 +-
 6 files changed, 95 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7299c7c..d96ecd9 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -246,6 +246,9 @@ Optimizations
 
 * SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
 
+* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers
+  (Alan Woodward)
+
 Other Changes
 ----------------------
 * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index b36e766..64fa54b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -682,35 +683,23 @@ public final class ZkController {
       InterruptedException {
 
     publishNodeAsDown(getNodeName());
-    
-    // now wait till the updates are in our state
-    long now = System.nanoTime();
-    long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-    boolean foundStates = true;
-
-    while (System.nanoTime() < timeout) {
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-        DocCollection collection = entry.getValue();
-        Collection<Slice> slices = collection.getSlices();
-        for (Slice slice : slices) {
-          Collection<Replica> replicas = slice.getReplicas();
-          for (Replica replica : replicas) {
-            if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
-              foundStates = false;
-            }
-          }
-        }
-      }
 
-      if (foundStates) {
-        Thread.sleep(1000);
-        break;
-      }
-      Thread.sleep(1000);
+    Set<String> collections = cc.getLocalCollections();
+    CountDownLatch latch = new CountDownLatch(collections.size());
+
+    for (String collection : collections) {
+      zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
+        for (Replica replica : state.getReplicasOnNode(getNodeName())) {
+          if (replica.getState() != Replica.State.DOWN)
+            return false;
+        }
+        latch.countDown();
+        return true;
+      });
     }
-    if (!foundStates) {
+
+    if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) {
+      // TODO should we abort here?
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
 
@@ -1366,7 +1355,7 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
+  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
 
     if (coreNodeName != null) {
@@ -1378,58 +1367,45 @@ public final class ZkController {
     }
   }
 
-  private void waitForCoreNodeName(CoreDescriptor descriptor) {
-    int retryCount = 320;
-    log.info("look for our core node name");
-    while (retryCount-- > 0) {
-      Map<String, Slice> slicesMap = zkStateReader.getClusterState()
-          .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
-      if (slicesMap != null) {
-
-        for (Slice slice : slicesMap.values()) {
-          for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
-
-            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
-            String msgNodeName = getNodeName();
-            String msgCore = descriptor.getName();
-
-            if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
-              descriptor.getCloudDescriptor()
-                  .setCoreNodeName(replica.getName());
-              return;
-            }
+  private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException {
+    log.info("Waiting for coreNodeName for core {} in collection {} to be assigned",
+        descriptor.getName(), descriptor.getCollectionName());
+    final String thisNode = getNodeName();
+    try {
+      zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        for (Replica replica : c.getReplicasOnNode(thisNode)) {
+          if (descriptor.getName().equals(replica.getCoreName())) {
+            descriptor.getCloudDescriptor().setCoreNodeName(replica.getName());
+            return true;
           }
         }
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
+        return false;
+      });
+    } catch (TimeoutException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName());
     }
   }
 
-  private void waitForShardId(CoreDescriptor cd) {
+  private void waitForShardId(CoreDescriptor cd) throws InterruptedException {
     log.info("waiting to find shard id in clusterstate for " + cd.getName());
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
-      if (shardId != null) {
-        cd.getCloudDescriptor().setShardId(shardId);
-        return;
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
+    final String thisNode = getNodeName();
+    try {
+      zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
+        if (c == null)
+          return false;
+        String shardId = c.getShardId(thisNode, cd.getName());
+        if (shardId != null) {
+          cd.getCloudDescriptor().setShardId(shardId);
+          return true;
+        }
+        return false;
+      });
+    }
+    catch (TimeoutException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName());
     }
-
-    throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not get shard id for core: " + cd.getName());
   }
 
 
@@ -1443,7 +1419,7 @@ public final class ZkController {
     return coreNodeName;
   }
 
-  public void preRegister(CoreDescriptor cd) {
+  public void preRegister(CoreDescriptor cd) throws InterruptedException {
 
     String coreNodeName = getCoreNodeName(cd);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b55cc55..f291bae 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -26,11 +26,12 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.Callable;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -41,6 +42,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
@@ -820,6 +822,7 @@ public class CoreContainer {
 
       return core;
     } catch (Exception e) {
+      SolrZkClient.checkInterrupted(e);
       coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
       log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
       final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
@@ -870,6 +873,17 @@ public class CoreContainer {
   }
 
   /**
+   * @return a Set containing the names of all collections with a core hosted in this container
+   */
+  public Set<String> getLocalCollections() {
+    Set<String> collections = getCoreDescriptors().stream()
+        .filter(cd -> cd.getCollectionName() != null)
+        .map(CoreDescriptor::getCollectionName)
+        .collect(Collectors.toSet());
+    return collections;
+  }
+
+  /**
    * Returns an immutable Map of Exceptions that occured when initializing 
    * SolrCores (either at startup, or do to runtime requests to create cores) 
    * keyed off of the name (String) of the SolrCore that had the Exception 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 452c7a1..2307463 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
           deleteAsyncId(requestId).process(client);
           return state;
         }
-        TimeUnit.SECONDS.sleep(1);
+        TimeUnit.MILLISECONDS.sleep(100);
       }
       return state;
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 5504a8b..9848e65 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -259,4 +260,26 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
+
+  /**
+   * Get all the replicas on a particular node
+   */
+  public List<Replica> getReplicasOnNode(String nodeName) {
+    return getReplicas().stream()
+        .filter(replica -> replica.getNodeName().equals(nodeName))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the shardId of a core on a specific node
+   */
+  public String getShardId(String nodeName, String coreName) {
+    for (Slice slice : this) {
+      for (Replica replica : slice) {
+        if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
+          return slice.getName();
+      }
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f299bb6/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 2b2e181..8716dbe 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -51,8 +51,9 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @BeforeClass
   public static void startCluster() throws Exception {
     configureCluster(CLUSTER_SIZE)
-        .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
+        .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
         .configure();
+    cluster.getSolrClient().connect();
   }
 
   @AfterClass
@@ -259,7 +260,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     final CloudSolrClient client = cluster.getSolrClient();
 
-    Future<Boolean> future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS,
+    Future<Boolean> future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)