You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/01 16:40:47 UTC

svn commit: r905311 - in /lucene/solr/branches/cloud/src: java/org/apache/solr/cloud/CloudState.java java/org/apache/solr/cloud/ZkController.java test/org/apache/solr/cloud/CloudStateUpdateTest.java

Author: markrmiller
Date: Mon Feb  1 15:40:46 2010
New Revision: 905311

URL: http://svn.apache.org/viewvc?rev=905311&view=rev
Log:
allow for only updating live nodes, add watch for live node changes, add simple test for live node additions and removals

Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/CloudState.java Mon Feb  1 15:40:46 2010
@@ -56,6 +56,10 @@
     return Collections.unmodifiableSet(collectionStates.keySet());
   }
   
+  public Map<String,Map<String,Slice>> getCollectionStates() {
+    return Collections.unmodifiableMap(collectionStates);
+  }
+  
   public Set<String> getLiveNodes() {
     return Collections.unmodifiableSet(liveNodes);
   }
@@ -64,28 +68,35 @@
     return liveNodes.contains(name);
   }
   
-  public static CloudState buildCloudState(SolrZkClient zkClient) throws KeeperException, InterruptedException, IOException {
+  public static CloudState buildCloudState(SolrZkClient zkClient, CloudState oldCloudState, boolean onlyLiveNodes) throws KeeperException, InterruptedException, IOException {
+    Map<String,Map<String,Slice>> collectionStates;
+    if (!onlyLiveNodes) {
+      List<String> collections = zkClient.getChildren(
+          ZkController.COLLECTIONS_ZKNODE, null);
+
+      collectionStates = new HashMap<String,Map<String,Slice>>();
+      for (String collection : collections) {
+        String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/"
+            + collection + ZkController.SHARDS_ZKNODE;
+        List<String> shardIdNames;
+        try {
+          shardIdNames = zkClient.getChildren(shardIdPaths, null);
+        } catch (KeeperException.NoNodeException e) {
+          // node is not valid currently
+          continue;
+        }
+        Map<String,Slice> slices = new HashMap<String,Slice>();
+        for (String shardIdZkPath : shardIdNames) {
+          Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths
+              + "/" + shardIdZkPath);
+          Slice slice = new Slice(shardIdZkPath, shardsMap);
+          slices.put(shardIdZkPath, slice);
+        }
+        collectionStates.put(collection, slices);
 
-    List<String> collections = zkClient.getChildren(ZkController.COLLECTIONS_ZKNODE, null);
-    
-    Map<String,Map<String,Slice>> collectionStates = new HashMap<String,Map<String,Slice>>();
-    for (String collection : collections) {
-      String shardIdPaths = ZkController.COLLECTIONS_ZKNODE + "/" + collection + ZkController.SHARDS_ZKNODE;
-      List<String> shardIdNames;
-      try {
-        shardIdNames = zkClient.getChildren(shardIdPaths, null);
-      } catch(KeeperException.NoNodeException e) {
-        // node is not valid currently
-        continue;
       }
-      Map<String,Slice> slices = new HashMap<String,Slice>();
-      for(String shardIdZkPath : shardIdNames) {
-        Map<String,ZkNodeProps> shardsMap = readShards(zkClient, shardIdPaths + "/" + shardIdZkPath);
-        Slice slice = new Slice(shardIdZkPath, shardsMap);
-        slices.put(shardIdZkPath, slice);
-      }
-      collectionStates.put(collection, slices);
-      
+    } else {
+      collectionStates = oldCloudState.getCollectionStates();
     }
     
     CloudState cloudInfo = new CloudState(getLiveNodes(zkClient), collectionStates);

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Mon Feb  1 15:40:46 2010
@@ -128,7 +128,7 @@
           public void command() {
             try {
 
-              createEphemeralNode();
+              createEphemeralLiveNode();
               // register cores in case any new cores came online will zk was down
               
               // coreContainer may currently be null in tests, so don't re-register
@@ -361,7 +361,7 @@
               "", e);
         }
       }
-      createEphemeralNode();
+      createEphemeralLiveNode();
       
       setUpCollectionsNode();
       
@@ -383,11 +383,41 @@
 
   }
 
-  private void createEphemeralNode() throws KeeperException,
+  private void createEphemeralLiveNode() throws KeeperException,
       InterruptedException {
     String nodeName = getNodeName();
     String nodePath = NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
+    Watcher liveNodeWatcher = new Watcher() {
+
+      public void process(WatchedEvent event) {
+        try {
+          log.info("Updating live nodes:" + zkClient);
+          try {
+            updateLiveNodes();
+          } finally {
+            // remake watch
+            zkClient.getChildren(event.getPath(), this);
+          }
+        } catch (KeeperException e) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (InterruptedException e) {
+          // Restore the interrupted status
+          Thread.currentThread().interrupt();
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        } catch (IOException e) {
+          log.error("", e);
+          throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+              "", e);
+        }
+        
+      }
+      
+    };
     try {
       zkClient.makePath(nodePath, CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
@@ -396,6 +426,7 @@
         throw e;
       }
     }
+    zkClient.getChildren(NODES_ZKNODE, liveNodeWatcher);
   }
   
   public String getNodeName() {
@@ -403,7 +434,19 @@
   }
 
   // load and publish a new CollectionInfo
-  public synchronized void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+  public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(immediate, false);
+  }
+  
+  // load and publish a new CollectionInfo
+  public void updateLiveNodes() throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(true, true);
+  }
+  
+  // load and publish a new CollectionInfo
+  private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
       IOException {
 
     // TODO: - incremental update rather than reread everything
@@ -411,9 +454,13 @@
     // build immutable CloudInfo
     
     if(immediate) {
-      log.info("Updating cloud state from ZooKeeper... :" + zkClient.keeper);
+      if(!onlyLiveNodes) {
+        log.info("Updating cloud state from ZooKeeper... ");
+      } else {
+        log.info("Updating live nodes from ZooKeeper... ");
+      }
       CloudState cloudState;
-      cloudState = CloudState.buildCloudState(zkClient);
+      cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
       // update volatile
       this.cloudState = cloudState;
     } else {
@@ -431,7 +478,8 @@
             cloudStateUpdateScheduled = false;
             CloudState cloudState;
             try {
-              cloudState = CloudState.buildCloudState(zkClient);
+              cloudState = CloudState.buildCloudState(zkClient,
+                  ZkController.this.cloudState, onlyLiveNodes);
             } catch (KeeperException e) {
               log.error("", e);
               throw new ZooKeeperException(

Modified: lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java?rev=905311&r1=905310&r2=905311&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java (original)
+++ lucene/solr/branches/cloud/src/test/org/apache/solr/cloud/CloudStateUpdateTest.java Mon Feb  1 15:40:46 2010
@@ -29,6 +29,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ *
+ */
 public class CloudStateUpdateTest extends TestCase {
   protected static Logger log = LoggerFactory
       .getLogger(AbstractZkTestCase.class);
@@ -44,14 +47,12 @@
   protected String zkDir;
 
   private CoreContainer container1;
-
   private CoreContainer container2;
-  
   private CoreContainer container3;
 
   private File dataDir1;
-
   private File dataDir2;
+  private File dataDir3;
 
   public void setUp() throws Exception {
     try {
@@ -69,6 +70,9 @@
 
       dataDir2 = new File(tmpDir + File.separator + "data2");
       dataDir2.mkdirs();
+      
+      dataDir3 = new File(tmpDir + File.separator + "data3");
+      dataDir3.mkdirs();
 
       // set some system properties for use by tests
       System.setProperty("solr.test.sys.prop1", "propone");
@@ -94,7 +98,7 @@
       
       CoreContainer.Initializer init3 = new CoreContainer.Initializer() {
         {
-          this.dataDir = CloudStateUpdateTest.this.dataDir2.getAbsolutePath();
+          this.dataDir = CloudStateUpdateTest.this.dataDir3.getAbsolutePath();
           this.zkPortOverride = "8985";
         }
       };
@@ -149,8 +153,10 @@
     
     liveNodes = zkController2.getCloudState().getLiveNodes();
     
-    // nocommit - fix update cloud state when nodes removed
-    //assertEquals(2, liveNodes.size());
+    // slight pause for watch to trigger
+    Thread.sleep(500);
+    
+    assertEquals(2, liveNodes.size());
   }
 
   public void tearDown() throws Exception {