You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2012/08/04 00:34:08 UTC

svn commit: r1369236 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Fri Aug  3 22:34:08 2012
New Revision: 1369236

URL: http://svn.apache.org/viewvc?rev=1369236&view=rev
Log:
LUCENE-3985: add close methods to Overseer and ConnectionManager and use them

Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Aug  3 22:34:08 2012
@@ -250,16 +250,13 @@ final class ShardLeaderElectionContext e
 final class OverseerElectionContext extends ElectionContext {
   
   private final SolrZkClient zkClient;
-  private final ZkStateReader stateReader;
-  private ShardHandler shardHandler;
-  private String adminPath;
+  private Overseer overseer;
 
-  public OverseerElectionContext(ShardHandler shardHandler, String adminPath, final String zkNodeName, ZkStateReader stateReader) {
-    super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, stateReader.getZkClient());
-    this.stateReader = stateReader;
-    this.shardHandler = shardHandler;
-    this.adminPath = adminPath;
-    this.zkClient = stateReader.getZkClient();
+
+  public OverseerElectionContext(SolrZkClient zkClient, Overseer overseer, final String zkNodeName) {
+    super(zkNodeName, "/overseer_elect", "/overseer_elect/leader", null, zkClient);
+    this.overseer = overseer;
+    this.zkClient = zkClient;
   }
 
   @Override
@@ -281,7 +278,7 @@ final class OverseerElectionContext exte
           CreateMode.EPHEMERAL, true);
     }
   
-    new Overseer(shardHandler, adminPath, stateReader, id);
+    overseer.start(id);
   }
   
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Aug  3 22:34:08 2012
@@ -47,7 +47,7 @@ public class Overseer {
 
   private static Logger log = LoggerFactory.getLogger(Overseer.class);
   
-  private static class CloudStateUpdater implements Runnable {
+  private class CloudStateUpdater implements Runnable {
     
     private static final String DELETECORE = "deletecore";
     private final ZkStateReader reader;
@@ -70,7 +70,7 @@ public class Overseer {
     @Override
     public void run() {
         
-      if(amILeader()) {
+      if(amILeader() && !Overseer.this.isClosed) {
         // see if there's something left from the previous Overseer and re
         // process all events that were not persisted into cloud state
           synchronized (reader.getUpdateLock()) { //XXX this only protects against edits inside single node
@@ -110,7 +110,7 @@ public class Overseer {
         }
       
       log.info("Starting to work on the main queue");
-      while (amILeader()) {
+      while (amILeader() && !isClosed) {
         synchronized (reader.getUpdateLock()) {
           try {
             byte[] head = stateUpdateQueue.peek();
@@ -401,21 +401,45 @@ public class Overseer {
      }
     
   }
+
+  private Thread ccThread;
+
+  private Thread updaterThread;
+
+  private volatile boolean isClosed;
+
+  private ZkStateReader reader;
+
+  private ShardHandler shardHandler;
+
+  private String adminPath;
+  
+  public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader) throws KeeperException, InterruptedException {
+    this.reader = reader;
+    this.shardHandler = shardHandler;
+    this.adminPath = adminPath;
+  }
   
-  public Overseer(ShardHandler shardHandler, String adminPath, final ZkStateReader reader, final String id) throws KeeperException, InterruptedException {
+  public void start(String id) {
     log.info("Overseer (id=" + id + ") starting");
     createOverseerNode(reader.getZkClient());
     //launch cluster state updater thread
     ThreadGroup tg = new ThreadGroup("Overseer state updater.");
-    Thread updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
+    updaterThread = new Thread(tg, new CloudStateUpdater(reader, id));
     updaterThread.setDaemon(true);
-    updaterThread.start();
-    
+
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
-    Thread ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath));
+    ccThread = new Thread(ccTg, new OverseerCollectionProcessor(reader, id, shardHandler, adminPath), 
+        "Overseer-" + id);
     ccThread.setDaemon(true);
+    
+    updaterThread.start();
     ccThread.start();
   }
+  
+  public void close() {
+    isClosed = true;
+  }
 
   /**
    * Get queue that can be used to send messages to Overseer.

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Aug  3 22:34:08 2012
@@ -64,6 +64,8 @@ public class OverseerCollectionProcessor
   private String adminPath;
 
   private ZkStateReader zkStateReader;
+
+  private boolean isClosed;
   
   public OverseerCollectionProcessor(ZkStateReader zkStateReader, String myId, ShardHandler shardHandler, String adminPath) {
     this.zkStateReader = zkStateReader;
@@ -76,7 +78,7 @@ public class OverseerCollectionProcessor
   @Override
   public void run() {
     log.info("Process current queue of collection creations");
-    while (amILeader()) {
+    while (amILeader() && !isClosed) {
       try {
         byte[] head = workQueue.peek(true);
         
@@ -108,6 +110,10 @@ public class OverseerCollectionProcessor
     }
   }
   
+  public void close() {
+    isClosed = true;
+  }
+  
   private boolean amILeader() {
     try {
       ZkNodeProps props = ZkNodeProps.load(zkStateReader.getZkClient().getData(

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Aug  3 22:34:08 2012
@@ -121,6 +121,8 @@ public final class ZkController {
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
   private CoreContainer cc;
 
+  protected volatile Overseer overseer;
+
   /**
    * @param cc if null, recovery will not be enabled
    * @param zkServerAddress
@@ -170,10 +172,8 @@ public final class ZkController {
                 shardHandler = cc.getShardHandlerFactory().getShardHandler();
                 adminPath = cc.getAdminPath();
               }
-              
-              ElectionContext context = new OverseerElectionContext(
-                  shardHandler, adminPath,
-                  getNodeName(), zkStateReader);
+              ZkController.this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
+              ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
               overseerElector.joinElection(context);
               zkStateReader.createClusterStateWatchersAndUpdate();
               
@@ -243,6 +243,11 @@ public final class ZkController {
    */
   public void close() {
     try {
+      overseer.close();
+    } catch(Throwable t) {
+      log.error("Error closing overseer", t);
+    }
+    try {
       zkClient.close();
     } catch (InterruptedException e) {
       // Restore the interrupted status
@@ -366,8 +371,8 @@ public final class ZkController {
       }
       
       overseerElector = new LeaderElector(zkClient);
-      ElectionContext context = new OverseerElectionContext(shardHandler,
-          adminPath, getNodeName(), zkStateReader);
+      this.overseer = new Overseer(shardHandler, adminPath, zkStateReader);
+      ElectionContext context = new OverseerElectionContext(zkClient, overseer, getNodeName());
       overseerElector.setup(context);
       overseerElector.joinElection(context);
       zkStateReader.createClusterStateWatchersAndUpdate();

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java Fri Aug  3 22:34:08 2012
@@ -47,6 +47,8 @@ class ConnectionManager implements Watch
 
   private OnReconnect onReconnect;
 
+  private volatile boolean isClosed = false;
+
   public ConnectionManager(String name, SolrZkClient client, String zkServerAddress, int zkClientTimeout, ZkClientConnectionStrategy strat, OnReconnect onConnect) {
     this.name = name;
     this.client = client;
@@ -68,6 +70,8 @@ class ConnectionManager implements Watch
       log.info("Watcher " + this + " name:" + name + " got event " + event
           + " path:" + event.getPath() + " type:" + event.getType());
     }
+    
+    checkClosed();
 
     state = event.getState();
     if (state == KeeperState.SyncConnected) {
@@ -81,11 +85,18 @@ class ConnectionManager implements Watch
         connectionStrategy.reconnect(zkServerAddress, zkClientTimeout, this,
             new ZkClientConnectionStrategy.ZkUpdate() {
               @Override
-              public void update(SolrZooKeeper keeper)
-                  throws InterruptedException, TimeoutException {
+              public void update(SolrZooKeeper keeper) throws TimeoutException {
                 synchronized (connectionStrategy) {
-                  waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
-                  client.updateKeeper(keeper);
+                  checkClosed();
+                  try {
+                    waitForConnected(SolrZkClient.DEFAULT_CLIENT_CONNECT_TIMEOUT);
+                    checkClosed();
+                    client.updateKeeper(keeper);
+                  } catch (InterruptedException e) {
+                    // we must have been asked to stop
+                    throw new RuntimeException("Giving up on connecting - we were interrupted");
+                  }
+                  checkClosed();
                   if (onReconnect != null) {
                     onReconnect.command();
                   }
@@ -95,6 +106,7 @@ class ConnectionManager implements Watch
                 }
                 
               }
+
             });
       } catch (Exception e) {
         SolrException.log(log, "", e);
@@ -109,7 +121,13 @@ class ConnectionManager implements Watch
   }
 
   public synchronized boolean isConnected() {
-    return connected;
+    return !isClosed && connected;
+  }
+  
+  // we use a volatile rather than sync
+  // to avoid deadlock on shutdown
+  public void close() {
+    this.isClosed = true;
   }
 
   public synchronized KeeperState state() {
@@ -122,12 +140,20 @@ class ConnectionManager implements Watch
     long left = waitForConnection;
     while (!connected && left > 0) {
       wait(left);
+      checkClosed();
       left = expire - System.currentTimeMillis();
     }
     if (!connected) {
       throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
     }
   }
+  
+  private synchronized void checkClosed() {
+    if (isClosed) {
+      log.info("Not acting because I am closed");
+      return;
+    }
+  }
 
   public synchronized void waitForDisconnected(long timeout)
       throws InterruptedException, TimeoutException {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1369236&r1=1369235&r2=1369236&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Fri Aug  3 22:34:08 2012
@@ -654,7 +654,11 @@ public class SolrZkClient {
   public void close() throws InterruptedException {
     if (isClosed) return; // it's okay if we over close - same as solrcore
     isClosed = true;
-    keeper.close();
+    try {
+      keeper.close();
+    } finally {
+      connManager.close();
+    }
     numCloses.incrementAndGet();
   }