You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/08/24 00:38:29 UTC
svn commit: r1160915 - in /zookeeper/bookkeeper/trunk: CHANGES.txt
bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
Author: breed
Date: Tue Aug 23 22:38:29 2011
New Revision: 1160915
URL: http://svn.apache.org/viewvc?rev=1160915&view=rev
Log:
BOOKKEEPER-38: Bookie Server doesn't exit when its zookeeper session is expired. So the process is hang there.
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Aug 23 22:38:29 2011
@@ -30,6 +30,8 @@ BUGFIXES:
BOOKKEEPER-29: BookieRecoveryTest fails intermittently (ivank, fpj via fpj)
+ BOOKKEEPER-38: Bookie Server doesn't exit when its zookeeper session is expired. So the process is hang there. (Sijie Guo via breed)
+
hedwig-server/
BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Tue Aug 23 22:38:29 2011
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
@@ -60,13 +61,14 @@ public class Bookie extends Thread {
final File journalDirectory;
final File ledgerDirectories[];
-
+
// ZK registration path for this bookie
static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
// ZooKeeper client instance for the Bookie
ZooKeeper zk;
-
+ private volatile boolean isZkExpired = true;
+
// Running flag
private volatile boolean running = false;
@@ -134,6 +136,7 @@ public class Bookie extends Thread {
}
}
SyncThread syncThread = new SyncThread();
+
public Bookie(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
this.journalDirectory = journalDirectory;
this.ledgerDirectories = ledgerDirectories;
@@ -206,6 +209,10 @@ public class Bookie extends Thread {
LOG.debug("I'm starting a bookie with journal directory " + journalDirectory.getName());
start();
syncThread.start();
+ // set running here.
+ // since bookie server use running as a flag to tell bookie server whether it is alive
+ // if setting it in bookie thread, the watcher might run before bookie thread.
+ running = true;
}
/**
@@ -217,16 +224,9 @@ public class Bookie extends Thread {
zk = null;
return;
}
+ int zkTimeout = Integer.getInteger("zkTimeout", 10000);
// Create the ZooKeeper client instance
- zk = new ZooKeeper(zkServers, 10000, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- // TODO: handle session disconnects and expires
- if (LOG.isDebugEnabled()) {
- LOG.debug("Process: " + event.getType() + " " + event.getPath());
- }
- }
- });
+ zk = newZookeeper(zkServers, zkTimeout);
// Create the ZK ephemeral node for this Bookie.
try {
zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0],
@@ -239,6 +239,60 @@ public class Bookie extends Thread {
throw new IOException(e);
}
}
+
+ /**
+ * Creates the ZooKeeper client used by this bookie and installs a session watcher on it.
+ *
+ * <p>
+ * The watcher logs a warning on {@code Disconnected} and an info message on
+ * {@code SyncConnected}. On {@code Expired} it sets {@code isZkExpired} and then
+ * calls <b>shutdown</b> on this bookie.
+ * </p>
+ * <p>
+ * Rationale given with the original change: once the session has expired this bookie
+ * is no longer listed as an available bookie server, bookie clients are notified of
+ * the expiration and send it no further requests, so it is better to exit.
+ * </p>
+ *
+ * @param zkServers the quorum list of zk servers
+ * @param sessionTimeout session timeout of the zk connection, passed unchanged to the {@code ZooKeeper} constructor
+ *
+ * @return zk client instance
+ * @throws IOException if the {@code ZooKeeper} client cannot be constructed
+ */
+ private ZooKeeper newZookeeper(final String zkServers,
+ final int sessionTimeout) throws IOException {
+ ZooKeeper newZk = new ZooKeeper(zkServers, sessionTimeout,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // Connection-state notifications arrive with event type None; they are only logged.
+ if (event.getType()
+ .equals(Watcher.Event.EventType.None)) {
+ if (event.getState().equals(
+ Watcher.Event.KeeperState.Disconnected)) {
+ LOG.warn("ZK client has been disconnected to the ZK server!");
+ } else if (event.getState().equals(
+ Watcher.Event.KeeperState.SyncConnected)) {
+ LOG.info("ZK client has been reconnected to the ZK server!");
+ }
+ }
+ // Expiry is checked on the state alone, regardless of the event type.
+ if (event.getState().equals(
+ Watcher.Event.KeeperState.Expired)) {
+ LOG.error("ZK client connection to the ZK server has expired!");
+ isZkExpired = true; // volatile flag, set before shutdown() is invoked
+ try {
+ shutdown(); // runs on whichever thread delivers the watch event
+ } catch (InterruptedException ie) {
+ System.exit(-1); // NOTE(review): an interrupted shutdown terminates the whole JVM - confirm this is intended
+ }
+ }
+ }
+ });
+ isZkExpired = false; // NOTE(review): assigned after the watcher is installed; an Expired event delivered earlier would be overwritten here - confirm ordering
+ return newZk;
+ }
private static int fullRead(FileChannel fc, ByteBuffer bb) throws IOException {
int total = 0;
@@ -397,7 +451,7 @@ public class Bookie extends Thread {
long nextPrealloc = preAllocSize;
long lastFlushPosition = 0;
logFile.write(zeros, nextPrealloc);
- running = true;
+
// TODO: Currently, when we roll over the journal logs, the older
// ones are never garbage collected. We should remove a journal log
// once all of its entries have been synced with the entry logs.
@@ -418,7 +472,11 @@ public class Bookie extends Thread {
toFlush.clear();
}
}
- if (qe == null) {
+ if (isZkExpired) {
+ LOG.warn("Exiting... zk client has expired.");
+ break;
+ }
+ if (qe == null) { // no more queue entry
continue;
}
lenBuff.clear();
@@ -440,7 +498,6 @@ public class Bookie extends Thread {
} catch (Exception e) {
LOG.fatal("Bookie thread exiting", e);
}
- running = false;
}
private FileChannel openChannel(long logId) throws FileNotFoundException {
@@ -450,7 +507,10 @@ public class Bookie extends Thread {
return logFile;
}
- public void shutdown() throws InterruptedException {
+ public synchronized void shutdown() throws InterruptedException {
+ if (!running) { // avoid shutdown twice
+ return;
+ }
// Shutdown the ZK client
if(zk != null) zk.close();
this.interrupt();
@@ -462,6 +522,8 @@ public class Bookie extends Thread {
}
// Shutdown the EntryLogger which has the GarbageCollector Thread running
entryLogger.shutdown();
+ // setting running to false here, so watch thread in bookie server know it only after bookie shut down
+ running = false;
}
public void addEntry(ByteBuffer entry, WriteCallback cb, Object ctx, byte[] masterKey)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java?rev=1160915&r1=1160914&r2=1160915&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java Tue Aug 23 22:38:29 2011
@@ -40,6 +40,7 @@ public class BookieServer implements NIO
NIOServerFactory nioServerFactory;
private volatile boolean running = false;
Bookie bookie;
+ DeathWatcher deathWatcher;
static Logger LOG = Logger.getLogger(BookieServer.class);
public BookieServer(int port, String zkServers, File journalDirectory, File ledgerDirectories[]) throws IOException {
@@ -50,23 +51,71 @@ public class BookieServer implements NIO
// Creates the NIO server factory for this server's port, marks the server as running
// and starts the DeathWatcher thread.
public void start() throws IOException {
nioServerFactory = new NIOServerFactory(port, this);
running = true;
+ deathWatcher = new DeathWatcher();
+ deathWatcher.start(); // from here on the watcher polls isBookieRunning() / isNioServerRunning()
}
- public void shutdown() throws InterruptedException {
- running = false;
+ public synchronized void shutdown() throws InterruptedException {
+ if (!running) { // already shut down, or never started: nothing to do
+ return;
+ }
nioServerFactory.shutdown();
bookie.shutdown();
+ running = false; // cleared only after both the NIO server factory and the bookie have been shut down
}
// True only while the bookie, the NIO server factory and this server's own
// running flag all report running.
public boolean isRunning(){
return bookie.isRunning() && nioServerFactory.isRunning() && running;
}
+ /**
+ * Reports whether the wrapped bookie is running.
+ *
+ * <p>Delegates to {@code bookie.isRunning()}; neither the NIO server nor this
+ * server's own {@code running} flag is consulted.</p>
+ *
+ * @return {@code true} if the bookie reports that it is running, {@code false} otherwise
+ */
+ public boolean isBookieRunning() {
+ return bookie.isRunning();
+ }
+
+ /**
+ * Reports whether the NIO server factory is running.
+ *
+ * <p>Delegates to {@code nioServerFactory.isRunning()}; neither the bookie nor this
+ * server's own {@code running} flag is consulted.</p>
+ *
+ * @return {@code true} if the NIO server factory reports that it is running, {@code false} otherwise
+ */
+ public boolean isNioServerRunning() {
+ return nioServerFactory.isRunning();
+ }
+
// Waits on the NIO server factory only; the bookie and the DeathWatcher thread are not joined here.
public void join() throws InterruptedException {
nioServerFactory.join();
}
/**
+ * A thread that watches whether the bookie and the NIO server are still alive.
+ *
+ * <p>It sleeps for the interval read from the {@code bookie_death_watch_interval}
+ * system property (milliseconds, default 1000), then checks both components. As soon
+ * as either one reports that it is not running, it calls the enclosing server's
+ * {@code shutdown()} and the thread ends.</p>
+ */
+ class DeathWatcher extends Thread {
+ @Override
+ public void run() {
+ int watchInterval = Integer.getInteger("bookie_death_watch_interval", 1000);
+ while(true) {
+ try {
+ Thread.sleep(watchInterval);
+ } catch (InterruptedException ie) {
+ // Interruption is ignored, so the liveness check below just runs early.
+ // NOTE(review): the interrupt status is not restored and the loop keeps going - confirm this is intended.
+ }
+ if (!isBookieRunning() || !isNioServerRunning()) {
+ try {
+ shutdown(); // returns immediately if the server has already been shut down
+ } catch (InterruptedException ie) {
+ System.exit(-1); // NOTE(review): an interrupted shutdown terminates the whole JVM - confirm this is intended
+ }
+ break; // the watcher's job is done once shutdown has been attempted
+ }
+ }
+ }
+ }
+
+ /**
* @param args
* @throws IOException
* @throws InterruptedException