You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by ma...@apache.org on 2010/02/12 18:32:24 UTC

svn commit: r909510 - in /lucene/solr/branches/cloud/src/java/org/apache/solr: cloud/ZkController.java cloud/ZkStateReader.java core/CoreContainer.java

Author: markrmiller
Date: Fri Feb 12 17:32:23 2010
New Revision: 909510

URL: http://svn.apache.org/viewvc?rev=909510&view=rev
Log:
factor out ZkStateReader

Added:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
Modified:
    lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
    lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java?rev=909510&r1=909509&r2=909510&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkController.java Fri Feb 12 17:32:23 2010
@@ -20,10 +20,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -55,7 +57,6 @@
 
   static final String NEWL = System.getProperty("line.separator");
 
-  private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
 
   private final static Pattern URL_POST = Pattern.compile("https?://(.*)");
   private final static Pattern URL_PREFIX = Pattern.compile("(https?://).*");
@@ -73,8 +74,8 @@
   public final static String CONFIGNAME_PROP="configName";
 
   private SolrZkClient zkClient;
-
-  private volatile CloudState cloudState;
+  
+  private ZkStateReader zkStateReader;
 
   private String zkServerAddress;
 
@@ -85,9 +86,7 @@
 
   private String hostName;
   
-  private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1);
 
-  private boolean cloudStateUpdateScheduled;
 
   private boolean readonly;  // temporary hack to enable reuse in SolrJ client
 
@@ -110,7 +109,7 @@
     this.localHostContext = localHostContext;
     this.localHost = localHost;
     this.readonly = localHostPort==null;
-    cloudState = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+
     zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
         // on reconnect, reload cloud info
         new OnReconnect() {
@@ -119,7 +118,7 @@
             try {
               // nocommit: recreate watches ????
               createEphemeralLiveNode();
-              updateCloudState(false);
+              zkStateReader.updateCloudState(false);
             } catch (KeeperException e) {
               log.error("", e);
               throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -138,7 +137,7 @@
 
           }
         });
-    
+    zkStateReader = new ZkStateReader(zkClient);
     init();
   }
 
@@ -209,7 +208,7 @@
    * @return information about the cluster from ZooKeeper
    */
   public CloudState getCloudState() {
-    return cloudState;
+    return zkStateReader.getCloudState();
   }
 
   /**
@@ -323,7 +322,7 @@
         try {
           log.info("Updating live nodes:" + zkClient);
           try {
-            updateLiveNodes();
+            zkStateReader.updateLiveNodes();
           } finally {
             // remake watch
             zkClient.getChildren(event.getPath(), this);
@@ -382,76 +381,7 @@
     return hostName + ":" + localHostPort + "_"+ localHostContext;
   }
 
-  // load and publish a new CollectionInfo
-  public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
-      IOException {
-    updateCloudState(immediate, false);
-  }
-  
-  // load and publish a new CollectionInfo
-  private void updateLiveNodes() throws KeeperException, InterruptedException,
-      IOException {
-    updateCloudState(true, true);
-  }
-  
-  // load and publish a new CollectionInfo
-  private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
-      IOException {
 
-    // TODO: - incremental update rather than reread everything
-    
-    // build immutable CloudInfo
-    
-    if(immediate) {
-      if(!onlyLiveNodes) {
-        log.info("Updating cloud state from ZooKeeper... ");
-      } else {
-        log.info("Updating live nodes from ZooKeeper... ");
-      }
-      CloudState cloudState;
-      cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
-      // update volatile
-      this.cloudState = cloudState;
-    } else {
-      if(cloudStateUpdateScheduled) {
-        log.info("Cloud state update for ZooKeeper already scheduled");
-        return;
-      }
-      log.info("Scheduling cloud state update from ZooKeeper...");
-      cloudStateUpdateScheduled = true;
-      updateCloudExecutor.schedule(new Runnable() {
-        
-        public void run() {
-          log.info("Updating cloud state from ZooKeeper...");
-          synchronized (ZkController.this) {
-            cloudStateUpdateScheduled = false;
-            CloudState cloudState;
-            try {
-              cloudState = CloudState.buildCloudState(zkClient,
-                  ZkController.this.cloudState, onlyLiveNodes);
-            } catch (KeeperException e) {
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (InterruptedException e) {
-              // Restore the interrupted status
-              Thread.currentThread().interrupt();
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            } catch (IOException e) {
-              log.error("", e);
-              throw new ZooKeeperException(
-                  SolrException.ErrorCode.SERVER_ERROR, "", e);
-            }
-            // update volatile
-            ZkController.this.cloudState = cloudState;
-          }
-        }
-      }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
-    }
-
-  }
 
   /**
    * @param path
@@ -634,9 +564,9 @@
       public void process(WatchedEvent event) {
           try {
             log.info("Detected a new or removed collection");
-            synchronized (ZkController.this) {
+            synchronized (zkStateReader.getUpdateLock()) {
               addShardZkNodeWatches();
-              updateCloudState(false);
+              zkStateReader.updateCloudState(false);
             }
             // re-watch
             zkClient.getChildren(event.getPath(), this);
@@ -666,9 +596,9 @@
         }
         log.info("Notified of CloudState change");
         try {
-          synchronized (ZkController.this) {
+          synchronized (zkStateReader.getUpdateLock()) {
             addShardZkNodeWatches();
-            updateCloudState(false);
+            zkStateReader.updateCloudState(false);
           }
           zkClient.exists(COLLECTIONS_ZKNODE, this);
         } catch (KeeperException e) {
@@ -704,7 +634,7 @@
             log.info("Detected changed ShardId in collection:" + collection);
             try {
               addShardsWatches(collection);
-              updateCloudState(false);
+              zkStateReader.updateCloudState(false);
             } catch (KeeperException e) {
               log.error("", e);
               throw new ZooKeeperException(
@@ -768,7 +698,7 @@
             public void process(WatchedEvent event) {
               log.info("Detected a shard change under ShardId:" + shardId + " in collection:" + collection);
               try {
-                updateCloudState(false);
+                zkStateReader.updateCloudState(false);
               } catch (KeeperException e) {
                 log.error("", e);
                 throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
@@ -889,5 +819,9 @@
     }
     
   }
+  
+  public ZkStateReader getZkStateReader() {
+    return zkStateReader;
+  }
 
 }

Added: lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java?rev=909510&view=auto
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java (added)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/cloud/ZkStateReader.java Fri Feb 12 17:32:23 2010
@@ -0,0 +1,162 @@
+package org.apache.solr.cloud;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.cloud.SolrZkClient.OnReconnect;
+import org.apache.solr.common.SolrException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkStateReader {
+  private static Logger log = LoggerFactory.getLogger(ZkStateReader.class);
+  
+  private volatile CloudState cloudState  = new CloudState(new HashSet<String>(0), new HashMap<String,Map<String,Slice>>(0));
+  
+  private static final long CLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("CLOUD_UPDATE_DELAY", "5000"));
+
+  private ScheduledExecutorService updateCloudExecutor = Executors.newScheduledThreadPool(1);
+
+  private boolean cloudStateUpdateScheduled;
+
+  private SolrZkClient zkClient;
+  
+  public ZkStateReader(SolrZkClient zkClient) {
+    this.zkClient = zkClient;
+  }
+  
+  public ZkStateReader(String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout) throws InterruptedException, TimeoutException, IOException {
+    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout, zkClientConnectTimeout,
+        // on reconnect, reload cloud info
+        new OnReconnect() {
+
+          public void command() {
+            try {
+              // nocommit: recreate watches ????
+              updateCloudState(false);
+            } catch (KeeperException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+                  "", e);
+            }
+
+          }
+        });
+  }
+  
+  // load and publish a new CloudState (full state, not just live nodes)
+  public void updateCloudState(boolean immediate) throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(immediate, false);
+  }
+  
+  // immediately reload only the live nodes and publish the updated CloudState
+  public void updateLiveNodes() throws KeeperException, InterruptedException,
+      IOException {
+    updateCloudState(true, true);
+  }
+  
+  // load and publish a new CloudState, either immediately or after CLOUD_UPDATE_DELAY ms
+  private synchronized void updateCloudState(boolean immediate, final boolean onlyLiveNodes) throws KeeperException, InterruptedException,
+      IOException {
+
+    // TODO: - incremental update rather than reread everything
+    
+    // build immutable CloudState
+    
+    if(immediate) {
+      if(!onlyLiveNodes) {
+        log.info("Updating cloud state from ZooKeeper... ");
+      } else {
+        log.info("Updating live nodes from ZooKeeper... ");
+      }
+      CloudState cloudState;
+      cloudState = CloudState.buildCloudState(zkClient, this.cloudState, onlyLiveNodes);
+      // update volatile
+      this.cloudState = cloudState;
+    } else {
+      if(cloudStateUpdateScheduled) {
+        log.info("Cloud state update for ZooKeeper already scheduled");
+        return;
+      }
+      log.info("Scheduling cloud state update from ZooKeeper...");
+      cloudStateUpdateScheduled = true;
+      updateCloudExecutor.schedule(new Runnable() {
+        
+        public void run() {
+          log.info("Updating cloud state from ZooKeeper...");
+          synchronized (getUpdateLock()) {
+            cloudStateUpdateScheduled = false;
+            CloudState cloudState;
+            try {
+              cloudState = CloudState.buildCloudState(zkClient,
+                  ZkStateReader.this.cloudState, onlyLiveNodes);
+            } catch (KeeperException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (InterruptedException e) {
+              // Restore the interrupted status
+              Thread.currentThread().interrupt();
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (IOException e) {
+              log.error("", e);
+              throw new ZooKeeperException(
+                  SolrException.ErrorCode.SERVER_ERROR, "", e);
+            }
+            // update volatile
+            ZkStateReader.this.cloudState = cloudState;
+          }
+        }
+      }, CLOUD_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+    }
+
+  }
+  
+  /**
+   * @return information about the cluster from ZooKeeper
+   */
+  public CloudState getCloudState() {
+    return cloudState;
+  }
+  
+  public Object getUpdateLock() {
+    return this;
+  }
+}

Modified: lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java
URL: http://svn.apache.org/viewvc/lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java?rev=909510&r1=909509&r2=909510&view=diff
==============================================================================
--- lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java (original)
+++ lucene/solr/branches/cloud/src/java/org/apache/solr/core/CoreContainer.java Fri Feb 12 17:32:23 2010
@@ -429,9 +429,9 @@
     
     if(zkController != null) {
       try {
-        synchronized (zkController) {
+        synchronized (zkController.getZkStateReader().getUpdateLock()) {
           zkController.addShardZkNodeWatches();
-          zkController.updateCloudState(true);
+          zkController.getZkStateReader().updateCloudState(true);
         }
       } catch (InterruptedException e) {
         // Restore the interrupted status