Posted to commits@lucene.apache.org by ho...@apache.org on 2016/06/06 21:24:42 UTC

lucene-solr:branch_6x: Revert "SOLR-9140: Replace some zk state polling with CollectionStateWatchers"

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 87d46225c -> 4e3884bec


Revert "SOLR-9140: Replace some zk state polling with CollectionStateWatchers"

Alan's comments (via Uwe) in the SOLR-9140 jira suggest that he thought he had already
reverted this on both branches, but that is not the case.  Reverting on his behalf due to the
likelihood that this is causing SOLR-9189.

Alan's comments regarding the equivalent revert on master...

"There's still some places where notifications can be missed, so I'm reverting
this until those are fixed."

This reverts commit 9f299bb6ad39960469e297b0b364f5972e485621.
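
The failure mode behind the revert is the classic register-then-wait race: a watcher that is
installed after the state has already changed, and that never re-checks the current state, can
wait until its timeout without ever being notified. The standalone Java sketch below is not Solr
code; the class, the enum and the single-slot listener "registry" are purely illustrative of that
general pattern, not of the actual CollectionStateWatcher internals.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class MissedNotificationSketch {

  enum State { ACTIVE, DOWN }

  static final AtomicReference<State> state = new AtomicReference<>(State.ACTIVE);
  // Single listener slot; in real code this would be a watcher registry.
  static final AtomicReference<Runnable> listener = new AtomicReference<>();

  static void publish(State newState) {
    state.set(newState);
    Runnable l = listener.get();
    if (l != null) l.run();          // only currently-registered listeners are notified
  }

  public static void main(String[] args) throws InterruptedException {
    // The state change happens first...
    publish(State.DOWN);

    // ...then the waiter registers its callback. It never re-checks the current
    // state, so the DOWN transition above is silently missed.
    CountDownLatch latch = new CountDownLatch(1);
    listener.set(() -> {
      if (state.get() == State.DOWN) latch.countDown();
    });

    boolean sawDown = latch.await(2, TimeUnit.SECONDS);
    System.out.println("saw DOWN within timeout: " + sawDown);  // prints false: notification missed
  }
}

The polling loop restored by this revert sidesteps that race by re-reading the cluster state on
every iteration, at the cost of the fixed sleep and timeout visible in the ZkController hunk below.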


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/4e3884be
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/4e3884be
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/4e3884be

Branch: refs/heads/branch_6x
Commit: 4e3884bec7c386fe718abc423b2381b68aaf1a97
Parents: 87d4622
Author: Chris Hostetter <ho...@apache.org>
Authored: Mon Jun 6 14:13:59 2016 -0700
Committer: Chris Hostetter <ho...@apache.org>
Committed: Mon Jun 6 14:13:59 2016 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 -
 .../org/apache/solr/cloud/ZkController.java     | 124 +++++++++++--------
 .../org/apache/solr/core/CoreContainer.java     |  16 +--
 .../solrj/request/CollectionAdminRequest.java   |   2 +-
 .../apache/solr/common/cloud/DocCollection.java |  23 ----
 .../cloud/TestCollectionStateWatchers.java      |   5 +-
 6 files changed, 78 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6f778ea..74331b0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -252,9 +252,6 @@ Optimizations
 
 * SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
 
-* SOLR-9140: Replace zk polling in ZkController with CollectionStateWatchers
-  (Alan Woodward)
-
 Other Changes
 ----------------------
 * SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 64fa54b..b36e766 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -27,7 +27,6 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.*;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -683,23 +682,35 @@ public final class ZkController {
       InterruptedException {
 
     publishNodeAsDown(getNodeName());
-
-    Set<String> collections = cc.getLocalCollections();
-    CountDownLatch latch = new CountDownLatch(collections.size());
-
-    for (String collection : collections) {
-      zkStateReader.registerCollectionStateWatcher(collection, (nodes, state) -> {
-        for (Replica replica : state.getReplicasOnNode(getNodeName())) {
-          if (replica.getState() != Replica.State.DOWN)
-            return false;
+    
+    // now wait till the updates are in our state
+    long now = System.nanoTime();
+    long timeout = now + TimeUnit.NANOSECONDS.convert(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    boolean foundStates = true;
+
+    while (System.nanoTime() < timeout) {
+      ClusterState clusterState = zkStateReader.getClusterState();
+      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
+      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+        DocCollection collection = entry.getValue();
+        Collection<Slice> slices = collection.getSlices();
+        for (Slice slice : slices) {
+          Collection<Replica> replicas = slice.getReplicas();
+          for (Replica replica : replicas) {
+            if (getNodeName().equals(replica.getNodeName()) && replica.getState() != Replica.State.DOWN) {
+              foundStates = false;
+            }
+          }
         }
-        latch.countDown();
-        return true;
-      });
-    }
+      }
 
-    if (latch.await(WAIT_DOWN_STATES_TIMEOUT_SECONDS, TimeUnit.SECONDS) == false) {
-      // TODO should we abort here?
+      if (foundStates) {
+        Thread.sleep(1000);
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    if (!foundStates) {
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
 
@@ -1355,7 +1366,7 @@ public final class ZkController {
     return zkStateReader;
   }
 
-  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) throws InterruptedException {
+  private void doGetShardIdAndNodeNameProcess(CoreDescriptor cd) {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
 
     if (coreNodeName != null) {
@@ -1367,45 +1378,58 @@ public final class ZkController {
     }
   }
 
-  private void waitForCoreNodeName(CoreDescriptor descriptor) throws InterruptedException {
-    log.info("Waiting for coreNodeName for core {} in collection {} to be assigned",
-        descriptor.getName(), descriptor.getCollectionName());
-    final String thisNode = getNodeName();
-    try {
-      zkStateReader.waitForState(descriptor.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
-        if (c == null)
-          return false;
-        for (Replica replica : c.getReplicasOnNode(thisNode)) {
-          if (descriptor.getName().equals(replica.getCoreName())) {
-            descriptor.getCloudDescriptor().setCoreNodeName(replica.getName());
-            return true;
+  private void waitForCoreNodeName(CoreDescriptor descriptor) {
+    int retryCount = 320;
+    log.info("look for our core node name");
+    while (retryCount-- > 0) {
+      Map<String, Slice> slicesMap = zkStateReader.getClusterState()
+          .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
+      if (slicesMap != null) {
+
+        for (Slice slice : slicesMap.values()) {
+          for (Replica replica : slice.getReplicas()) {
+            // TODO: for really large clusters, we could 'index' on this
+
+            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+            String msgNodeName = getNodeName();
+            String msgCore = descriptor.getName();
+
+            if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
+              descriptor.getCloudDescriptor()
+                  .setCoreNodeName(replica.getName());
+              return;
+            }
           }
         }
-        return false;
-      });
-    } catch (TimeoutException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting coreNodeName for " + descriptor.getName());
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
-  private void waitForShardId(CoreDescriptor cd) throws InterruptedException {
+  private void waitForShardId(CoreDescriptor cd) {
     log.info("waiting to find shard id in clusterstate for " + cd.getName());
-    final String thisNode = getNodeName();
-    try {
-      zkStateReader.waitForState(cd.getCollectionName(), 320, TimeUnit.SECONDS, (n, c) -> {
-        if (c == null)
-          return false;
-        String shardId = c.getShardId(thisNode, cd.getName());
-        if (shardId != null) {
-          cd.getCloudDescriptor().setShardId(shardId);
-          return true;
-        }
-        return false;
-      });
-    }
-    catch (TimeoutException e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out getting shard id for core: " + cd.getName());
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
+      if (shardId != null) {
+        cd.getCloudDescriptor().setShardId(shardId);
+        return;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
     }
+
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not get shard id for core: " + cd.getName());
   }
 
 
@@ -1419,7 +1443,7 @@ public final class ZkController {
     return coreNodeName;
   }
 
-  public void preRegister(CoreDescriptor cd) throws InterruptedException {
+  public void preRegister(CoreDescriptor cd) {
 
     String coreNodeName = getCoreNodeName(cd);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f291bae..b55cc55 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -26,12 +26,11 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -42,7 +41,6 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
@@ -822,7 +820,6 @@ public class CoreContainer {
 
       return core;
     } catch (Exception e) {
-      SolrZkClient.checkInterrupted(e);
       coreInitFailures.put(dcore.getName(), new CoreLoadFailure(dcore, e));
       log.error("Error creating core [{}]: {}", dcore.getName(), e.getMessage(), e);
       final SolrException solrException = new SolrException(ErrorCode.SERVER_ERROR, "Unable to create core [" + dcore.getName() + "]", e);
@@ -873,17 +870,6 @@ public class CoreContainer {
   }
 
   /**
-   * @return a Set containing the names of all collections with a core hosted in this container
-   */
-  public Set<String> getLocalCollections() {
-    Set<String> collections = getCoreDescriptors().stream()
-        .filter(cd -> cd.getCollectionName() != null)
-        .map(CoreDescriptor::getCollectionName)
-        .collect(Collectors.toSet());
-    return collections;
-  }
-
-  /**
    * Returns an immutable Map of Exceptions that occured when initializing 
    * SolrCores (either at startup, or do to runtime requests to create cores) 
    * keyed off of the name (String) of the SolrCore that had the Exception 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 2307463..452c7a1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -1104,7 +1104,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
           deleteAsyncId(requestId).process(client);
           return state;
         }
-        TimeUnit.MILLISECONDS.sleep(100);
+        TimeUnit.SECONDS.sleep(1);
       }
       return state;
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 9848e65..5504a8b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -260,26 +259,4 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
-
-  /**
-   * Get all the replicas on a particular node
-   */
-  public List<Replica> getReplicasOnNode(String nodeName) {
-    return getReplicas().stream()
-        .filter(replica -> replica.getNodeName().equals(nodeName))
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get the shardId of a core on a specific node
-   */
-  public String getShardId(String nodeName, String coreName) {
-    for (Slice slice : this) {
-      for (Replica replica : slice) {
-        if (Objects.equals(replica.getNodeName(), nodeName) && Objects.equals(replica.getCoreName(), coreName))
-          return slice.getName();
-      }
-    }
-    return null;
-  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/4e3884be/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 8716dbe..2b2e181 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -51,9 +51,8 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
   @BeforeClass
   public static void startCluster() throws Exception {
     configureCluster(CLUSTER_SIZE)
-        .addConfig("config", getFile("solrj/solr/configsets/streaming/conf").toPath())
+        .addConfig("config", getFile("solrj/solr/collection1/conf").toPath())
         .configure();
-    cluster.getSolrClient().connect();
   }
 
   @AfterClass
@@ -260,7 +259,7 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
 
     final CloudSolrClient client = cluster.getSolrClient();
 
-    Future<Boolean> future = waitInBackground("stateformat1", 30, TimeUnit.SECONDS,
+    Future<Boolean> future = waitInBackground("stateformat1", 10, TimeUnit.SECONDS,
         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
 
     CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)