You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/08/24 00:38:29 UTC

svn commit: r1160915 - in /zookeeper/bookkeeper/trunk: CHANGES.txt bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java

Author: breed
Date: Tue Aug 23 22:38:29 2011
New Revision: 1160915

URL: http://svn.apache.org/viewvc?rev=1160915&view=rev
Log:
BOOKKEEPER-38: Bookie Server doesn't exit when its zookeeper session is expired. So the process is hang there.

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
    zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 23 22:38:29 2011
@@ -30,6 +30,8 @@ BUGFIXES:
 
   BOOKKEEPER-29: BookieRecoveryTest fails intermittently (ivank, fpj via fpj)
 
+  BOOKKEEPER-38: Bookie Server doesn't exit when its zookeeper session is expired. So the process is hang there. (Sijie Guo via breed)
+
  hedwig-server/
 
   BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Aug 23 22:38:29 2011
@@ -38,6 +38,7 @@ import java.util.LinkedList;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -60,13 +61,14 @@ public class Bookie extends Thread {
     final File journalDirectory;
 
     final File ledgerDirectories[];
-    
+
     // ZK registration path for this bookie
     static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
 
     // ZooKeeper client instance for the Bookie
     ZooKeeper zk;
-    
+    private volatile boolean isZkExpired = true;
+
     // Running flag
     private volatile boolean running = false;
 
@@ -134,6 +136,7 @@ public class Bookie extends Thread {
         }
     }
     SyncThread syncThread = new SyncThread();
+
     public Bookie(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
         this.journalDirectory = journalDirectory;
         this.ledgerDirectories = ledgerDirectories;
@@ -206,6 +209,10 @@ public class Bookie extends Thread {
         LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
         start();
         syncThread.start();
+        // Set running here rather than in the bookie thread: the bookie server uses
+        // running as the flag that tells it whether the bookie is alive, and if it were
+        // set in the bookie thread, the watcher might run before the bookie thread does.
+        running = true;
     }
 
     /**
@@ -217,16 +224,9 @@ public class Bookie extends Thread {
             zk = null;
             return;
         }
+        int zkTimeout = Integer.getInteger("zkTimeout", 10000);
         // Create the ZooKeeper client instance
-        zk = new ZooKeeper(zkServers, 10000, new Watcher() {
-            @Override
-            public void process(WatchedEvent event) {
-                // TODO: handle session disconnects and expires
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Process: " + event.getType() + " " + event.getPath());
-                }
-            }
-        });
+        zk = newZookeeper(zkServers, zkTimeout);
         // Create the ZK ephemeral node for this Bookie.
         try {
             zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
@@ -239,6 +239,60 @@ public class Bookie extends Thread {
             throw new IOException(e);
         }
     }
+    
+    /** 
+     * Create a new zookeeper client to zk cluster.
+     * 
+     * <p>
+     * The Bookie Server only uses its zk client when syncing ledgers for garbage
+     * collection. When the zk session expires, this bookie server is no longer listed
+     * in the available bookie server list. Bookie clients are notified of the
+     * expiration and send no more requests to this server, so it is better to exit
+     * when the zk session has expired.
+     * </p>
+     * <p>
+     * Since many bk operations may still be cached in the queue, we wait until all of
+     * them have been processed and then quit. This is done by calling <b>shutdown</b>.
+     * </p>
+     *
+     * @param zkServers the quorum list of zk servers
+     * @param sessionTimeout session timeout of zk connection
+     *
+     * @return zk client instance
+     */
+    private ZooKeeper newZookeeper(final String zkServers,
+            final int sessionTimeout) throws IOException {
+        ZooKeeper newZk = new ZooKeeper(zkServers, sessionTimeout,
+                new Watcher() {
+                    @Override
+                    public void process(WatchedEvent event) {
+                        // handle session disconnects and expires
+                        if (event.getType()
+                                .equals(Watcher.Event.EventType.None)) {
+                            if (event.getState().equals(
+                                    Watcher.Event.KeeperState.Disconnected)) {
+                                LOG.warn("ZK client has been disconnected to the ZK server!");
+                            } else if (event.getState().equals(
+                                    Watcher.Event.KeeperState.SyncConnected)) {
+                                LOG.info("ZK client has been reconnected to the ZK server!");
+                            }
+                        }
+                        // Check for expired connection.
+                        if (event.getState().equals(
+                                Watcher.Event.KeeperState.Expired)) {
+                            LOG.error("ZK client connection to the ZK server has expired!");
+                            isZkExpired = true;
+                            try {
+                                shutdown();
+                            } catch (InterruptedException ie) {
+                                System.exit(-1);
+                            }
+                        }
+                    }
+                });
+        isZkExpired = false;
+        return newZk;
+    }
 
     private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
         int total = 0;
@@ -397,7 +451,7 @@ public class Bookie extends Thread {
             long nextPrealloc = preAllocSize;
             long lastFlushPosition = 0;
             logFile.write(zeros, nextPrealloc);
-            running = true;
+
             // TODO: Currently, when we roll over the journal logs, the older
             // ones are never garbage collected. We should remove a journal log
             // once all of its entries have been synced with the entry logs.
@@ -418,7 +472,11 @@ public class Bookie extends Thread {
                         toFlush.clear();
                     }
                 }
-                if (qe == null) {
+                if (isZkExpired) {
+                    LOG.warn("Exiting... zk client has expired.");
+                    break;
+                }
+                if (qe == null) { // no more queue entry
                     continue;
                 }
                 lenBuff.clear();
@@ -440,7 +498,6 @@ public class Bookie extends Thread {
         } catch (Exception e) {
             LOG.fatal("Bookie thread exiting", e);
         }
-        running = false;
     }
 
     private FileChannel openChannel(long logId) throws FileNotFoundException {
@@ -450,7 +507,10 @@ public class Bookie extends Thread {
         return logFile;
     }
 
-    public void shutdown() throws InterruptedException {
+    public synchronized void shutdown() throws InterruptedException {
+        if (!running) { // avoid shutdown twice
+          return;
+        }
         // Shutdown the ZK client
         if(zk != null) zk.close();
         this.interrupt();
@@ -462,6 +522,8 @@ public class Bookie extends Thread {
         }
         // Shutdown the EntryLogger which has the GarbageCollector Thread running
         entryLogger.shutdown();
+        // Set running to false here, so the watcher thread in the bookie server sees it only after the bookie has shut down.
+        running = false;
     }
     
     public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Tue Aug 23 22:38:29 2011
@@ -40,6 +40,7 @@ public class BookieServer implements NIO
     NIOServerFactory nioServerFactory;
     private volatile boolean running = false;
     Bookie bookie;
+    DeathWatcher deathWatcher;
     static Logger LOG = Logger.getLogger(BookieServer.class);
 
     public BookieServer(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
@@ -50,23 +51,71 @@ public class BookieServer implements NIO
     public void start() throws IOException {
         nioServerFactory = new NIOServerFactory(port, this);
         running = true;
+        deathWatcher = new DeathWatcher();
+        deathWatcher.start();
     }
 
-    public void shutdown() throws InterruptedException {
-        running = false;
+    public synchronized void shutdown() throws InterruptedException {
+        if (!running) {
+            return;
+        }
         nioServerFactory.shutdown();
         bookie.shutdown();
+        running = false;
     }
 
     public boolean isRunning(){
         return bookie.isRunning() && nioServerFactory.isRunning() && running;
     }
 
+    /**
+     * Whether bookie is running?
+     *
+     * @return true if bookie is running, otherwise return false
+     */
+    public boolean isBookieRunning() {
+        return bookie.isRunning();
+    }
+
+    /**
+     * Whether nio server is running?
+     *
+     * @return true if nio server is running, otherwise return false
+     */
+    public boolean isNioServerRunning() {
+        return nioServerFactory.isRunning();
+    }
+
     public void join() throws InterruptedException {
         nioServerFactory.join();
     }
 
     /**
+     * A thread to watch whether the bookie and the nio server are still alive
+     */
+    class DeathWatcher extends Thread {
+        @Override
+        public void run() {
+            int watchInterval = Integer.getInteger("bookie_death_watch_interval", 1000);
+            while(true) {
+                try {
+                    Thread.sleep(watchInterval);
+                } catch (InterruptedException ie) {
+                    // do nothing
+                }
+                if (!isBookieRunning() || !isNioServerRunning()) {
+                    try {
+                        shutdown();
+                    } catch (InterruptedException ie) {
+                        System.exit(-1);
+                    }
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
      * @param args
      * @throws IOException
      * @throws InterruptedException