Posted to commits@lucene.apache.org by ma...@apache.org on 2013/02/16 17:59:54 UTC

svn commit: r1446914 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/cloud/ZkController.java core/src/java/org/apache/solr/core/CoreContainer.java test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java

Author: markrmiller
Date: Sat Feb 16 16:59:53 2013
New Revision: 1446914

URL: http://svn.apache.org/r1446914
Log:
SOLR-4421,SOLR-4165: On CoreContainer shutdown, all SolrCores should publish their state as DOWN.
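
The gist of the fix: CoreContainer.shutdown() now asks ZkController to publish every replica hosted on this node as DOWN and to wait until the cluster state reflects that, before the container flips its shutdown flag and starts closing cores. A condensed sketch of the new ordering follows (it only restates the CoreContainer/ZkController members touched in the patch below and is not a drop-in replacement):

    public void shutdown() {
      log.info("Shutting down CoreContainer instance="
          + System.identityHashCode(this));

      // 1. While the ZooKeeper session is still usable, publish every local
      //    replica as DOWN and block (with a timeout) until those states
      //    show up in the cluster state.
      if (isZooKeeperAware()) {
        try {
          zkController.publishAndWaitForDownStates();
        } catch (KeeperException e) {
          log.error("", e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.warn("", e);
        }
      }

      // 2. Only then mark the container as shut down and cancel recoveries.
      isShutDown = true;
      if (isZooKeeperAware()) {
        cancelCoreRecoveries();
      }

      // 3. ... close the SolrCores as before ...
    }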

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1446914&r1=1446913&r2=1446914&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Sat Feb 16 16:59:53 2013
@@ -129,6 +129,9 @@ Bug Fixes
   kick in when using NRTCachingDirectory or the rate limiting feature.
   (Mark Miller)
 
+* SOLR-4421,SOLR-4165: On CoreContainer shutdown, all SolrCores should publish their 
+  state as DOWN. (Mark Miller, Markus Jelsma)
+
 Optimizations
 ----------------------
 
@@ -161,9 +164,6 @@ Other Changes
 
 * SOLR-4384: Make post.jar report timing information (Upayavira via janhoy)
 
-* SOLR-4421: On CoreContainer shutdown, all SolrCores should publish their 
-  state as DOWN. (Mark Miller)
-
 ==================  4.1.0 ==================
 
 Versions of Major Components

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1446914&r1=1446913&r2=1446914&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sat Feb 16 16:59:53 2013
@@ -480,9 +480,14 @@ public final class ZkController {
   }
 
   private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
-    boolean alreadyCreatedZkReader = false;
+
     try {
-      alreadyCreatedZkReader = publishAndWaitForDownStates(alreadyCreatedZkReader);
+      boolean createdWatchesAndUpdated = false;
+      if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
+        zkStateReader.createClusterStateWatchersAndUpdate();
+        createdWatchesAndUpdated = true;
+        publishAndWaitForDownStates();
+      }
       
       // makes nodes zkNode
       cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
@@ -501,7 +506,7 @@ public final class ZkController {
       overseerElector.setup(context);
       overseerElector.joinElection(context, false);
       
-      if (!alreadyCreatedZkReader) {
+      if (!createdWatchesAndUpdated) {
         zkStateReader.createClusterStateWatchersAndUpdate();
       }
       
@@ -523,93 +528,92 @@ public final class ZkController {
 
   }
 
-  private boolean publishAndWaitForDownStates(boolean alreadyCreatedZkReader)
-      throws KeeperException, InterruptedException {
-    if (zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, true)) {
-      alreadyCreatedZkReader = true;
-      // try and publish anyone from our node as down
-      zkStateReader.createClusterStateWatchersAndUpdate();
-      ClusterState clusterState = zkStateReader.getClusterState();
-      Set<String> collections = clusterState.getCollections();
-      List<String> updatedNodes = new ArrayList<String>();
+  public void publishAndWaitForDownStates() throws KeeperException,
+      InterruptedException {
+    
+    ClusterState clusterState = zkStateReader.getClusterState();
+    Set<String> collections = clusterState.getCollections();
+    List<String> updatedNodes = new ArrayList<String>();
+    for (String collectionName : collections) {
+      DocCollection collection = clusterState.getCollection(collectionName);
+      Collection<Slice> slices = collection.getSlices();
+      for (Slice slice : slices) {
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          if (replica.getNodeName().equals(getNodeName())
+              && !(replica.getStr(ZkStateReader.STATE_PROP)
+                  .equals(ZkStateReader.DOWN))) {
+            ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+                ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
+                ZkStateReader.BASE_URL_PROP, getBaseUrl(),
+                ZkStateReader.CORE_NAME_PROP,
+                replica.getStr(ZkStateReader.CORE_NAME_PROP),
+                ZkStateReader.ROLES_PROP,
+                replica.getStr(ZkStateReader.ROLES_PROP),
+                ZkStateReader.NODE_NAME_PROP, getNodeName(),
+                ZkStateReader.SHARD_ID_PROP,
+                replica.getStr(ZkStateReader.SHARD_ID_PROP),
+                ZkStateReader.COLLECTION_PROP,
+                replica.getStr(ZkStateReader.COLLECTION_PROP));
+            updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
+            overseerJobQueue.offer(ZkStateReader.toJSON(m));
+          }
+        }
+      }
+    }
+    
+    // now wait till the updates are in our state
+    long now = System.currentTimeMillis();
+    long timeout = now + 1000 * 300;
+    boolean foundStates = false;
+    while (System.currentTimeMillis() < timeout) {
+      clusterState = zkStateReader.getClusterState();
+      collections = clusterState.getCollections();
       for (String collectionName : collections) {
         DocCollection collection = clusterState.getCollection(collectionName);
         Collection<Slice> slices = collection.getSlices();
         for (Slice slice : slices) {
           Collection<Replica> replicas = slice.getReplicas();
           for (Replica replica : replicas) {
-            if (replica.getNodeName().equals(getNodeName())
-                && !(replica.getStr(ZkStateReader.STATE_PROP)
-                    .equals(ZkStateReader.DOWN))) {
-              ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-                  "state", ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
-                  ZkStateReader.BASE_URL_PROP, getBaseUrl(),
-                  ZkStateReader.CORE_NAME_PROP, replica.getStr(ZkStateReader.CORE_NAME_PROP),
-                  ZkStateReader.ROLES_PROP,
-                  replica.getStr(ZkStateReader.ROLES_PROP),
-                  ZkStateReader.NODE_NAME_PROP, getNodeName(),
-                  ZkStateReader.SHARD_ID_PROP,
-                  replica.getStr(ZkStateReader.SHARD_ID_PROP),
-                  ZkStateReader.COLLECTION_PROP,
-                  replica.getStr(ZkStateReader.COLLECTION_PROP));
-              updatedNodes.add(replica.getStr(ZkStateReader.CORE_NAME_PROP));
-              overseerJobQueue.offer(ZkStateReader.toJSON(m));
+            if (replica.getStr(ZkStateReader.STATE_PROP).equals(
+                ZkStateReader.DOWN)) {
+              updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
+              
             }
           }
         }
       }
       
-      // now wait till the updates are in our state
-      long now = System.currentTimeMillis();
-      long timeout = now + 1000 * 300;
-      boolean foundStates = false;
-      while (System.currentTimeMillis() < timeout) {
-        clusterState = zkStateReader.getClusterState();
-        collections = clusterState.getCollections();
-        for (String collectionName : collections) {
-          DocCollection collection = clusterState
-              .getCollection(collectionName);
-          Collection<Slice> slices = collection.getSlices();
-          for (Slice slice : slices) {
-            Collection<Replica> replicas = slice.getReplicas();
-            for (Replica replica : replicas) {
-              if (replica.getStr(ZkStateReader.STATE_PROP).equals(
-                  ZkStateReader.DOWN)) {
-                updatedNodes.remove(replica
-                    .getStr(ZkStateReader.CORE_NAME_PROP));
-                
-              }
-            }
-          }
-        }
-        
-        if (updatedNodes.size() == 0) {
-          foundStates = true;
-          break;
-        }
-      }
-      if (!foundStates) {
-        log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
+      if (updatedNodes.size() == 0) {
+        foundStates = true;
+        break;
       }
     }
-    return alreadyCreatedZkReader;
+    if (!foundStates) {
+      log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
+    }
+    
   }
-
+  
   /**
-   * Validates if the chroot exists in zk (or if it is successfully created). Optionally, if create is set to true this method will create the path
-   * in case it doesn't exist
-   * @return true if the path exists or is created
-   * false if the path doesn't exist and 'create' = false
+   * Validates if the chroot exists in zk (or if it is successfully created).
+   * Optionally, if create is set to true this method will create the path in
+   * case it doesn't exist
+   * 
+   * @return true if the path exists or is created false if the path doesn't
+   *         exist and 'create' = false
    */
-  public static boolean checkChrootPath(String zkHost, boolean create) throws KeeperException, InterruptedException {
-    if(!containsChroot(zkHost)) {
+  public static boolean checkChrootPath(String zkHost, boolean create)
+      throws KeeperException, InterruptedException {
+    if (!containsChroot(zkHost)) {
       return true;
     }
     log.info("zkHost includes chroot");
     String chrootPath = zkHost.substring(zkHost.indexOf("/"), zkHost.length());
-    SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0, zkHost.indexOf("/")), 60*1000);
+    SolrZkClient tmpClient = new SolrZkClient(zkHost.substring(0,
+        zkHost.indexOf("/")), 60 * 1000);
     boolean exists = tmpClient.exists(chrootPath, true);
-    if(!exists && create) {
+    if (!exists && create) {
       tmpClient.makePath(chrootPath, false, true);
       exists = true;
     }
@@ -617,7 +621,6 @@ public final class ZkController {
     return exists;
   }
 
-
   /**
    * Validates if zkHost contains a chroot. See http://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#ch_zkSessions
    */
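
The new publishAndWaitForDownStates() above follows a publish-then-poll pattern: for each replica on this node that is not already DOWN it offers a state=DOWN message to the Overseer queue, then re-reads the cluster state for up to 300 seconds until every one of those cores is reported DOWN, logging a warning on timeout. A minimal, generic sketch of that wait loop is below; the names (waitForDown, isDown) are placeholders for illustration, not Solr APIs, and unlike the patch it pauses between polls:

    static void waitForDown(java.util.Set<String> cores, long timeoutMs,
                            java.util.function.Predicate<String> isDown)
        throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      java.util.Set<String> pending = new java.util.HashSet<String>(cores);
      while (System.currentTimeMillis() < deadline) {
        pending.removeIf(isDown);      // drop cores already reported DOWN
        if (pending.isEmpty()) {
          return;                      // every published DOWN state is visible
        }
        Thread.sleep(1000);            // pause before re-checking cluster state
      }
      // mirrors the patch: warn, but do not fail, on timeout
      System.err.println("Timed out waiting for DOWN states: " + pending);
    }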

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java?rev=1446914&r1=1446913&r2=1446914&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/core/CoreContainer.java Sat Feb 16 16:59:53 2013
@@ -740,12 +740,24 @@ public class CoreContainer 
   public void shutdown() {
     log.info("Shutting down CoreContainer instance="
         + System.identityHashCode(this));
+    
+    if (isZooKeeperAware()) {
+      try {
+        zkController.publishAndWaitForDownStates();
+      } catch (KeeperException e) {
+        log.error("", e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+    
     isShutDown = true;
     
     if (isZooKeeperAware()) {
-      publishCoresAsDown();
       cancelCoreRecoveries();
     }
+    
     try {
       synchronized (cores) {
 
@@ -784,20 +796,6 @@ public class CoreContainer 
     }
   }
 
-  private void publishCoresAsDown() {
-    synchronized (cores) {
-      for (SolrCore core : cores.values()) {
-        try {
-          zkController.publish(core.getCoreDescriptor(), ZkStateReader.DOWN);
-        } catch (KeeperException e) {
-          log.error("", e);
-        } catch (InterruptedException e) {
-          log.error("", e);
-        }
-      }
-    }
-  }
-
   public void cancelCoreRecoveries() {
     ArrayList<SolrCoreState> coreStates = new ArrayList<SolrCoreState>();
     synchronized (cores) {

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java?rev=1446914&r1=1446913&r2=1446914&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractDistribZkTestBase.java Sat Feb 16 16:59:53 2013
@@ -207,7 +207,6 @@ public abstract class AbstractDistribZkT
     if (DEBUG) {
       printLayout();
     }
-    zkServer.shutdown();
     System.clearProperty("zkHost");
     System.clearProperty("collection");
     System.clearProperty("enable.update.log");
@@ -217,6 +216,7 @@ public abstract class AbstractDistribZkT
     System.clearProperty("solr.test.sys.prop2");
     resetExceptionIgnores();
     super.tearDown();
+    zkServer.shutdown();
   }
   
   protected void printLayout() throws Exception {