You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/12/18 12:00:52 UTC
svn commit: r1423409 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ ...
Author: ivank
Date: Tue Dec 18 11:00:49 2012
New Revision: 1423409
URL: http://svn.apache.org/viewvc?rev=1423409&view=rev
Log:
BOOKKEEPER-496: Ensure that the auditor and replication worker will shutdown if they lose their ZK session (ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Tue Dec 18 11:00:49 2012
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
BOOKKEEPER-509: TestBookKeeperPersistenceManager failed on latest trunk (sijie via ivank)
+ BOOKKEEPER-496: Ensure that the auditor and replication worker will shutdown if they lose their ZK session (ivank)
+
hedwig-protocol:
BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Tue Dec 18 11:00:49 2012
@@ -191,7 +191,7 @@ public class ZkLedgerUnderreplicationMan
}
}
- private String getParentZnodePath(String base, long ledgerId) {
+ public static String getParentZnodePath(String base, long ledgerId) {
String subdir1 = String.format("%04x", ledgerId >> 48 & 0xffff);
String subdir2 = String.format("%04x", ledgerId >> 32 & 0xffff);
String subdir3 = String.format("%04x", ledgerId >> 16 & 0xffff);
@@ -201,8 +201,12 @@ public class ZkLedgerUnderreplicationMan
base, subdir1, subdir2, subdir3, subdir4);
}
+ public static String getUrLedgerZnode(String base, long ledgerId) {
+ return String.format("%s/urL%010d", getParentZnodePath(base, ledgerId), ledgerId);
+ }
+
private String getUrLedgerZnode(long ledgerId) {
- return String.format("%s/urL%010d", getParentZnodePath(urLedgerPath, ledgerId), ledgerId);
+ return getUrLedgerZnode(urLedgerPath, ledgerId);
}
@@ -376,7 +380,9 @@ public class ZkLedgerUnderreplicationMan
public void process(WatchedEvent e) {
if (e.getType() == Watcher.Event.EventType.NodeChildrenChanged
|| e.getType() == Watcher.Event.EventType.NodeDeleted
- || e.getType() == Watcher.Event.EventType.NodeCreated) {
+ || e.getType() == Watcher.Event.EventType.NodeCreated
+ || e.getState() == Watcher.Event.KeeperState.Expired
+ || e.getState() == Watcher.Event.KeeperState.Disconnected) {
changedLatch.countDown();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Tue Dec 18 11:00:49 2012
@@ -23,9 +23,13 @@ package org.apache.bookkeeper.replicatio
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -39,6 +43,7 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,22 +54,30 @@ import org.slf4j.LoggerFactory;
* re-replication activities by keeping all the corresponding ledgers of the
* failed bookie as underreplicated znode in zk.
*/
-public class Auditor extends Thread implements Watcher {
+public class Auditor implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
- private final LinkedBlockingQueue<EventType> bookieNotifications = new LinkedBlockingQueue<EventType>();
+
private final AbstractConfiguration conf;
private final ZooKeeper zkc;
private BookieLedgerIndexer bookieLedgerIndexer;
private LedgerUnderreplicationManager ledgerUnderreplicationManager;
- private volatile boolean running = true;
+ private final ExecutorService executor;
+ private List<String> knownBookies = new ArrayList<String>();
- public Auditor(String bookieIdentifier, AbstractConfiguration conf,
+ public Auditor(final String bookieIdentifier, AbstractConfiguration conf,
ZooKeeper zkc) throws UnavailableException {
- setName("AuditorBookie-" + bookieIdentifier);
- setDaemon(true);
this.conf = conf;
this.zkc = zkc;
initialize(conf, zkc);
+
+ executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "AuditorBookie-" + bookieIdentifier);
+ t.setDaemon(true);
+ return t;
+ }
+ });
}
private void initialize(AbstractConfiguration conf, ZooKeeper zkc)
@@ -94,53 +107,91 @@ public class Auditor extends Thread impl
}
}
- @Override
- public void run() {
- LOG.info("I'm starting as Auditor Bookie");
- try {
- // on startup watching available bookie and based on the
- // available bookies determining the bookie failures.
- List<String> knownBookies = getAvailableBookies();
- auditingBookies(knownBookies);
-
- while (true) {
- // wait for bookie join/failure notifications
- bookieNotifications.take();
-
- // check whether ledger replication is enabled
- waitIfLedgerReplicationDisabled();
-
- List<String> availableBookies = getAvailableBookies();
-
- // casting to String, as knownBookies and availableBookies
- // contains only String values
- // find new bookies(if any) and update the known bookie list
- Collection<String> newBookies = CollectionUtils.subtract(
- availableBookies, knownBookies);
- knownBookies.addAll(newBookies);
-
- // find lost bookies(if any)
- Collection<String> lostBookies = CollectionUtils.subtract(
- knownBookies, availableBookies);
-
- if (lostBookies.size() > 0) {
- knownBookies.removeAll(lostBookies);
- Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
- handleLostBookies(lostBookies, ledgerDetails);
- }
+ private void submitShutdownTask() {
+ synchronized (this) {
+ if (executor.isShutdown()) {
+ return;
}
- } catch (KeeperException ke) {
- LOG.error("Exception while watching available bookies", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupted while watching available bookies ", ie);
- } catch (BKAuditException bke) {
- LOG.error("Exception while watching available bookies", bke);
- } catch (UnavailableException ue) {
- LOG.error("Exception while watching available bookies", ue);
+ executor.submit(new Runnable() {
+ public void run() {
+ synchronized (Auditor.this) {
+ executor.shutdown();
+ }
+ }
+ });
+ }
+ }
+
+ private synchronized void submitAuditTask() {
+ synchronized (this) {
+ if (executor.isShutdown()) {
+ return;
+ }
+ executor.submit(new Runnable() {
+ public void run() {
+ try {
+ waitIfLedgerReplicationDisabled();
+
+ List<String> availableBookies = getAvailableBookies();
+
+ // casting to String, as knownBookies and availableBookies
+ // contains only String values
+ // find new bookies(if any) and update the known bookie list
+ Collection<String> newBookies = CollectionUtils.subtract(
+ availableBookies, knownBookies);
+ knownBookies.addAll(newBookies);
+
+ // find lost bookies(if any)
+ Collection<String> lostBookies = CollectionUtils.subtract(
+ knownBookies, availableBookies);
+
+ if (lostBookies.size() > 0) {
+ knownBookies.removeAll(lostBookies);
+ Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
+ handleLostBookies(lostBookies, ledgerDetails);
+ }
+ } catch (KeeperException ke) {
+ LOG.error("Exception while watching available bookies", ke);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while watching available bookies ", ie);
+ } catch (BKAuditException bke) {
+ LOG.error("Exception while watching available bookies", bke);
+ } catch (UnavailableException ue) {
+ LOG.error("Exception while watching available bookies", ue);
+ }
+ }
+ });
}
+ }
- shutdown();
+ public void start() {
+ LOG.info("I'm starting as Auditor Bookie");
+ // On startup, watch the available bookies and use that list
+ // to determine which bookies have failed.
+ synchronized (this) {
+ if (executor.isShutdown()) {
+ return;
+ }
+ executor.submit(new Runnable() {
+ public void run() {
+ try {
+ knownBookies = getAvailableBookies();
+ auditingBookies(knownBookies);
+ } catch (KeeperException ke) {
+ LOG.error("Exception while watching available bookies", ke);
+ submitShutdownTask();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while watching available bookies ", ie);
+ submitShutdownTask();
+ } catch (BKAuditException bke) {
+ LOG.error("Exception while watching available bookies", bke);
+ submitShutdownTask();
+ }
+ }
+ });
+ }
}
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
@@ -214,9 +265,11 @@ public class Auditor extends Thread impl
@Override
public void process(WatchedEvent event) {
// listen children changed event from ZooKeeper
- if (event.getType() == EventType.NodeChildrenChanged) {
- if (running)
- bookieNotifications.add(event.getType());
+ if (event.getState() == KeeperState.Disconnected
+ || event.getState() == KeeperState.Expired) {
+ submitShutdownTask();
+ } else if (event.getType() == EventType.NodeChildrenChanged) {
+ submitAuditTask();
}
}
@@ -224,14 +277,14 @@ public class Auditor extends Thread impl
* Shutdown the auditor
*/
public void shutdown() {
- if (!running) {
- return;
- }
- running = false;
- LOG.info("Shutting down " + getName());
- this.interrupt();
+ LOG.info("Shutting down auditor");
+ submitShutdownTask();
+
try {
- this.join();
+ while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ LOG.warn("Executor not shutting down, interrupting");
+ executor.shutdownNow();
+ }
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while shutting down auditor bookie", ie);
@@ -244,6 +297,6 @@ public class Auditor extends Thread impl
* @return auditor status
*/
public boolean isRunning() {
- return running;
+ return !executor.isShutdown();
}
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java Tue Dec 18 11:00:49 2012
@@ -26,6 +26,12 @@ import java.util.List;
import java.io.Serializable;
import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -36,6 +42,7 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import com.google.protobuf.TextFormat;
import static com.google.common.base.Charsets.UTF_8;
@@ -69,10 +76,12 @@ public class AuditorElector {
private final String bookieId;
private final AbstractConfiguration conf;
private final ZooKeeper zkc;
+ private final ExecutorService executor;
private String myVote;
Auditor auditor;
- private volatile boolean running = true;
+ private AtomicBoolean running = new AtomicBoolean(false);
+
/**
* AuditorElector for performing the auditor election
@@ -86,8 +95,8 @@ public class AuditorElector {
* @throws UnavailableException
* throws unavailable exception while initializing the elector
*/
- public AuditorElector(String bookieId, AbstractConfiguration conf,
- ZooKeeper zkc) throws UnavailableException {
+ public AuditorElector(final String bookieId, AbstractConfiguration conf,
+ ZooKeeper zkc) throws UnavailableException {
this.bookieId = bookieId;
this.conf = conf;
this.zkc = zkc;
@@ -95,65 +104,15 @@ public class AuditorElector {
+ BookKeeperConstants.UNDER_REPLICATION_NODE;
electionPath = basePath + "/auditorelection";
createElectorPath();
+ executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "AuditorElector-"+bookieId);
+ }
+ });
}
- /**
- * Performing the auditor election using the ZooKeeper ephemeral sequential
- * znode. The bookie which has created the least sequential will be elect as
- * Auditor.
- *
- * @throws UnavailableException
- * when performing auditor election
- *
- */
- public void doElection() throws UnavailableException {
- try {
- // creating my vote in zk. Vote format is 'V_numeric'
- createMyVote();
- List<String> children = zkc.getChildren(getVotePath(""), false);
-
- if (0 >= children.size()) {
- throw new IllegalArgumentException(
- "Atleast one bookie server should present to elect the Auditor!");
- }
- // sorting in ascending order of sequential number
- Collections.sort(children, new ElectionComparator());
- String voteNode = StringUtils.substringAfterLast(myVote,
- PATH_SEPARATOR);
-
- // starting Auditing service
- if (children.get(AUDITOR_INDEX).equals(voteNode)) {
- // update the auditor bookie id in the election path. This is
- // done for debugging purpose
- AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
- .setBookieId(bookieId);
-
- zkc.setData(getVotePath(""), TextFormat.printToString(builder.build()).getBytes(UTF_8), -1);
- auditor = new Auditor(bookieId, conf, zkc);
- auditor.start();
- } else {
- // If not an auditor, will be watching to my predecessor and
- // looking the previous node deletion.
- Watcher electionWatcher = new ElectionWatcher();
- int myIndex = children.indexOf(voteNode);
- int prevNodeIndex = myIndex - 1;
- if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
- + children.get(prevNodeIndex), electionWatcher)) {
- // While adding, the previous znode doesn't exists.
- // Again going to election.
- doElection();
- }
- }
- } catch (KeeperException e) {
- throw new UnavailableException(
- "Exception while performing auditor election", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new UnavailableException(
- "Interrupted while performing auditor election", e);
- }
- }
private void createMyVote() throws KeeperException, InterruptedException {
if (null == myVote || null == zkc.exists(myVote, false)) {
@@ -202,38 +161,124 @@ public class AuditorElector {
* deletion or expiration.
*/
private class ElectionWatcher implements Watcher {
-
@Override
public void process(WatchedEvent event) {
- if (event.getType() == EventType.NodeDeleted) {
- try {
- doElection();
- } catch (UnavailableException e) {
- LOG.error("Exception when performing Auditor re-election",
- e);
- shutdown();
- }
+ if (event.getState() == KeeperState.Disconnected
+ || event.getState() == KeeperState.Expired) {
+ LOG.error("Lost ZK connection, shutting down");
+ submitShutdownTask();
+ } else if (event.getType() == EventType.NodeDeleted) {
+ submitElectionTask();
}
}
}
+ public void start() {
+ running.set(true);
+ submitElectionTask();
+ }
+
+ /**
+ * Run cleanup operations for the auditor elector.
+ */
+ private void submitShutdownTask() {
+ executor.submit(new Runnable() {
+ public void run() {
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
+ LOG.info("Shutting down AuditorElector");
+ if (myVote != null) {
+ try {
+ zkc.delete(myVote, -1);
+ } catch (InterruptedException ie) {
+ LOG.warn("InterruptedException while deleting myVote: " + myVote,
+ ie);
+ } catch (KeeperException ke) {
+ LOG.error("Exception while deleting myVote:" + myVote, ke);
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Performs the auditor election using ZooKeeper ephemeral sequential
+ * znodes. The bookie whose vote znode has the lowest sequence number is
+ * elected as Auditor.
+ */
+ @VisibleForTesting
+ void submitElectionTask() {
+
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ // creating my vote in zk. Vote format is 'V_numeric'
+ createMyVote();
+ List<String> children = zkc.getChildren(getVotePath(""), false);
+
+ if (0 >= children.size()) {
+ throw new IllegalArgumentException(
+ "Atleast one bookie server should present to elect the Auditor!");
+ }
+
+ // sorting in ascending order of sequential number
+ Collections.sort(children, new ElectionComparator());
+ String voteNode = StringUtils.substringAfterLast(myVote,
+ PATH_SEPARATOR);
+
+ // starting Auditing service
+ if (children.get(AUDITOR_INDEX).equals(voteNode)) {
+ // update the auditor bookie id in the election path. This is
+ // done for debugging purpose
+ AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
+ .setBookieId(bookieId);
+
+ zkc.setData(getVotePath(""),
+ TextFormat.printToString(builder.build()).getBytes(UTF_8), -1);
+ auditor = new Auditor(bookieId, conf, zkc);
+ auditor.start();
+ } else {
+ // If not the auditor, watch my predecessor's znode and
+ // wait for its deletion.
+ Watcher electionWatcher = new ElectionWatcher();
+ int myIndex = children.indexOf(voteNode);
+ int prevNodeIndex = myIndex - 1;
+ if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
+ + children.get(prevNodeIndex), electionWatcher)) {
+ // The previous znode no longer existed when the watch was set,
+ // so run the election again.
+ submitElectionTask();
+ }
+ }
+ } catch (KeeperException e) {
+ LOG.error("Exception while performing auditor election", e);
+ submitShutdownTask();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while performing auditor election", e);
+ Thread.currentThread().interrupt();
+ submitShutdownTask();
+ } catch (UnavailableException e) {
+ LOG.error("Ledger underreplication manager unavailable during election", e);
+ submitShutdownTask();
+ }
+ }
+ };
+ executor.submit(r);
+ }
+
/**
* Shutting down AuditorElector
*/
- public void shutdown() {
- if (!running) {
- return;
- }
- running = false;
- LOG.info("Shutting down AuditorElector");
- try {
- zkc.delete(myVote, -1);
- } catch (InterruptedException ie) {
- LOG.warn("InterruptedException while deleting myVote: " + myVote,
- ie);
- } catch (KeeperException ke) {
- LOG.warn("Exception while deleting myVote:" + myVote, ke);
+ public void shutdown() throws InterruptedException {
+ synchronized (this) {
+ if (executor.isShutdown()) {
+ return;
+ }
+ submitShutdownTask();
+ executor.shutdown();
}
+
if (auditor != null) {
auditor.shutdown();
auditor = null;
@@ -250,7 +295,7 @@ public class AuditorElector {
if (auditor != null) {
return auditor.isRunning();
}
- return running;
+ return running.get();
}
/**
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Tue Dec 18 11:00:49 2012
@@ -89,7 +89,7 @@ public class AutoRecoveryMain {
* Start daemons
*/
public void start() throws UnavailableException {
- auditorElector.doElection();
+ auditorElector.start();
replicationWorker.start();
deathWatcher.start();
}
@@ -117,15 +117,19 @@ public class AutoRecoveryMain {
try {
deathWatcher.interrupt();
deathWatcher.join();
+
+ auditorElector.shutdown();
} catch (InterruptedException e) {
- // Ignore
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted shutting down auto recovery", e);
}
- auditorElector.shutdown();
+
replicationWorker.shutdown();
try {
zk.close();
} catch (InterruptedException e) {
- // Ignore
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted shutting down auto recovery", e);
}
}
@@ -157,7 +161,7 @@ public class AutoRecoveryMain {
}
// If any one service not running, then shutdown peer.
if (!autoRecoveryMain.auditorElector.isRunning()
- || !autoRecoveryMain.replicationWorker.isRunning()) {
+ || !autoRecoveryMain.replicationWorker.isRunning()) {
autoRecoveryMain.shutdown();
break;
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java Tue Dec 18 11:00:49 2012
@@ -283,10 +283,12 @@ public class ReplicationWorker implement
* Stop the replication worker service
*/
public void shutdown() {
- if (!workerRunning) {
- return;
+ synchronized (this) {
+ if (!workerRunning) {
+ return;
+ }
+ workerRunning = false;
}
- workerRunning = false;
try {
underreplicationManager.close();
} catch (UnavailableException e) {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java Tue Dec 18 11:00:49 2012
@@ -28,10 +28,13 @@ import junit.framework.Assert;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
-import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +51,7 @@ public class AuditorBookieTest extends B
.getLogger(AuditorBookieTest.class);
private String electionPath;
private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
+ private List<ZooKeeper> zkClients = new LinkedList<ZooKeeper>();
public AuditorBookieTest() {
super(6);
@@ -64,6 +68,10 @@ public class AuditorBookieTest extends B
@Override
public void tearDown() throws Exception {
stopAuditorElectors();
+ for (ZooKeeper zk : zkClients) {
+ zk.close();
+ }
+ zkClients.clear();
super.tearDown();
}
@@ -84,7 +92,7 @@ public class AuditorBookieTest extends B
} else {
bkIndexDownBookie = indexOf - 1;
}
- shudownBookie(bs.get(bkIndexDownBookie));
+ shutdownBookie(bs.get(bkIndexDownBookie));
startNewBookie();
startNewBookie();
@@ -102,12 +110,12 @@ public class AuditorBookieTest extends B
@Test
public void testSuccessiveAuditorCrashes() throws Exception {
BookieServer auditor = verifyAuditor();
- shudownBookie(auditor);
+ shutdownBookie(auditor);
BookieServer newAuditor1 = waitForNewAuditor(auditor);
bs.remove(auditor);
- shudownBookie(newAuditor1);
+ shutdownBookie(newAuditor1);
BookieServer newAuditor2 = waitForNewAuditor(newAuditor1);
Assert.assertNotSame(
"Auditor re-election is not happened for auditor failure!",
@@ -143,7 +151,7 @@ public class AuditorBookieTest extends B
@Test
public void testShutdown() throws Exception {
BookieServer auditor = verifyAuditor();
- shudownBookie(auditor);
+ shutdownBookie(auditor);
// waiting for new auditor
BookieServer newAuditor = waitForNewAuditor(auditor);
@@ -172,7 +180,8 @@ public class AuditorBookieTest extends B
public void testRestartAuditorBookieAfterCrashing() throws Exception {
BookieServer auditor = verifyAuditor();
- shudownBookie(auditor);
+ shutdownBookie(auditor);
+ String addr = StringUtils.addrToString(auditor.getLocalAddress());
// restarting Bookie with same configurations.
int indexOfDownBookie = bs.indexOf(auditor);
@@ -181,11 +190,12 @@ public class AuditorBookieTest extends B
bs.remove(indexOfDownBookie);
bsConfs.remove(indexOfDownBookie);
tmpDirs.remove(indexOfDownBookie);
+ auditorElectors.remove(addr);
startBookie(serverConfiguration);
// starting corresponding auditor elector
- String addr = StringUtils.addrToString(auditor.getLocalAddress());
+
LOG.debug("Performing Auditor Election:" + addr);
- auditorElectors.get(addr).doElection();
+ startAuditorElector(addr);
// waiting for new auditor to come
BookieServer newAuditor = waitForNewAuditor(auditor);
@@ -197,18 +207,46 @@ public class AuditorBookieTest extends B
.getPort());
}
- private void startAuditorElectors() throws UnavailableException {
+ /**
+ * Test that, if an auditor loses its ZK connection/session,
+ * it will shut down.
+ */
+ @Test
+ public void testAuditorZKSessionLoss() throws Exception {
+ stopZKCluster();
+ for (AuditorElector e : auditorElectors.values()) {
+ for (int i = 0; i < 10; i++) { // give it 10 seconds to shutdown
+ if (!e.isRunning()) {
+ break;
+ }
+
+ Thread.sleep(1000);
+ }
+ assertFalse("AuditorElector should have shutdown", e.isRunning());
+ }
+ }
+
+ private void startAuditorElector(String addr) throws Exception {
+ ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
+ ZooKeeper zk = ZkUtils.createConnectedZookeeperClient(
+ zkUtil.getZooKeeperConnectString(), w);
+ zkClients.add(zk);
+
+ AuditorElector auditorElector = new AuditorElector(addr,
+ baseClientConf, zk);
+ auditorElectors.put(addr, auditorElector);
+ auditorElector.start();
+ LOG.debug("Starting Auditor Elector");
+ }
+
+ private void startAuditorElectors() throws Exception {
for (BookieServer bserver : bs) {
String addr = StringUtils.addrToString(bserver.getLocalAddress());
- AuditorElector auditorElector = new AuditorElector(addr,
- baseClientConf, zkc);
- auditorElectors.put(addr, auditorElector);
- auditorElector.doElection();
- LOG.debug("Starting Auditor Elector");
+ startAuditorElector(addr);
}
}
- private void stopAuditorElectors() {
+ private void stopAuditorElectors() throws Exception {
for (AuditorElector auditorElector : auditorElectors.values()) {
auditorElector.shutdown();
LOG.debug("Stopping Auditor Elector!");
@@ -237,7 +275,7 @@ public class AuditorBookieTest extends B
return auditors;
}
- private void shudownBookie(BookieServer bkServer) {
+ private void shutdownBookie(BookieServer bkServer) throws Exception {
String addr = StringUtils.addrToString(bkServer.getLocalAddress());
LOG.debug("Shutting down bookie:" + addr);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java Tue Dec 18 11:00:49 2012
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.replicatio
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -74,7 +75,7 @@ public class AuditorLedgerCheckerTest ex
private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
private LedgerUnderreplicationManager urLedgerMgr;
private Set<Long> urLedgerList;
- private Map<Long, String> urLedgerData;
+
private List<Long> ledgerList;
public AuditorLedgerCheckerTest(String ledgerManagerFactoryClass)
@@ -97,28 +98,27 @@ public class AuditorLedgerCheckerTest ex
startAuditorElectors();
rng = new Random(System.currentTimeMillis()); // Initialize the Random
urLedgerList = new HashSet<Long>();
- urLedgerData = new HashMap<Long, String>();
ledgerList = new ArrayList<Long>(2);
}
@Override
public void tearDown() throws Exception {
- super.tearDown();
stopAuditorElectors();
+ super.tearDown();
}
- private void startAuditorElectors() throws UnavailableException {
+ private void startAuditorElectors() throws Exception {
for (BookieServer bserver : bs) {
String addr = StringUtils.addrToString(bserver.getLocalAddress());
AuditorElector auditorElector = new AuditorElector(addr,
baseClientConf, zkc);
auditorElectors.put(addr, auditorElector);
- auditorElector.doElection();
+ auditorElector.start();
LOG.debug("Starting Auditor Elector");
}
}
- private void stopAuditorElectors() {
+ private void stopAuditorElectors() throws Exception {
for (AuditorElector auditorElector : auditorElectors.values()) {
auditorElector.shutdown();
LOG.debug("Stopping Auditor Elector!");
@@ -145,7 +145,7 @@ public class AuditorLedgerCheckerTest ex
// grace period for publishing the bk-ledger
LOG.debug("Waiting for ledgers to be marked as under replicated");
underReplicaLatch.await(5, TimeUnit.SECONDS);
-
+ Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
assertEquals("Missed identifying under replicated ledgers", 1,
urLedgerList.size());
@@ -191,6 +191,7 @@ public class AuditorLedgerCheckerTest ex
// grace period for publishing the bk-ledger
LOG.debug("Waiting for ledgers to be marked as under replicated");
underReplicaLatch.await(5, TimeUnit.SECONDS);
+ Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
assertEquals("Missed identifying under replicated ledgers", 2,
urLedgerList.size());
@@ -225,8 +226,7 @@ public class AuditorLedgerCheckerTest ex
// failing first bookie
shutdownBookie(bs.size() - 1);
// simulate re-replication
- doLedgerRereplication(lh1.getId());
- doLedgerRereplication(lh2.getId());
+ doLedgerRereplication(lh1.getId(), lh2.getId());
// failing another bookie
CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
@@ -236,6 +236,7 @@ public class AuditorLedgerCheckerTest ex
// grace period for publishing the bk-ledger
LOG.debug("Waiting for ledgers to be marked as under replicated");
underReplicaLatch.await(5, TimeUnit.SECONDS);
+ Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
assertEquals("Missed identifying under replicated ledgers", 2,
urLedgerList.size());
@@ -275,7 +276,7 @@ public class AuditorLedgerCheckerTest ex
// enabling ledger replication
urLedgerMgr.enableLedgerReplication();
- assertTrue("Ledger replication is not disabled!", urReplicaLatch.await(
+ assertTrue("Ledger replication is not enabled!", urReplicaLatch.await(
5, TimeUnit.SECONDS));
}
@@ -304,19 +305,21 @@ public class AuditorLedgerCheckerTest ex
final CountDownLatch underReplicaLatch = new CountDownLatch(count);
for (Long ledgerId : ledgerList) {
Watcher urLedgerWatcher = new ChildWatcher(underReplicaLatch);
- String znode = String.format("%s/urL%010d", getParentZnodePath(
- UNDERREPLICATED_PATH, ledgerId), ledgerId);
+ String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(UNDERREPLICATED_PATH,
+ ledgerId);
zkc.exists(znode, urLedgerWatcher);
}
return underReplicaLatch;
}
- private void doLedgerRereplication(long ledgerId)
+ private void doLedgerRereplication(Long... ledgerIds)
throws UnavailableException {
- urLedgerMgr.getLedgerToRereplicate();
- urLedgerMgr.markLedgerReplicated(ledgerId);
- urLedgerMgr.releaseUnderreplicatedLedger(ledgerId);
- urLedgerData.clear();
+ for (int i = 0; i < ledgerIds.length; i++) {
+ long lid = urLedgerMgr.getLedgerToRereplicate();
+ assertTrue("Received unexpected ledgerid", Arrays.asList(ledgerIds).contains(lid));
+ urLedgerMgr.markLedgerReplicated(lid);
+ urLedgerMgr.releaseUnderreplicatedLedger(lid);
+ }
}
private String shutdownBookie(int bkShutdownIndex) throws IOException,
@@ -350,14 +353,16 @@ public class AuditorLedgerCheckerTest ex
}
}
- private String getParentZnodePath(String base, long ledgerId) {
- String subdir1 = String.format("%04x", ledgerId >> 48 & 0xffff);
- String subdir2 = String.format("%04x", ledgerId >> 32 & 0xffff);
- String subdir3 = String.format("%04x", ledgerId >> 16 & 0xffff);
- String subdir4 = String.format("%04x", ledgerId & 0xffff);
-
- return String.format("%s/%s/%s/%s/%s", base, subdir1, subdir2, subdir3,
- subdir4);
+ private Map<Long, String> getUrLedgerData(Set<Long> urLedgerList)
+ throws KeeperException, InterruptedException {
+ Map<Long, String> urLedgerData = new HashMap<Long, String>();
+ for (Long ledgerId : urLedgerList) {
+ String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(UNDERREPLICATED_PATH,
+ ledgerId);
+ byte[] data = zkc.getData(znode, false, null);
+ urLedgerData.put(ledgerId, new String(data));
+ }
+ return urLedgerData;
}
private class ChildWatcher implements Watcher {
@@ -369,21 +374,11 @@ public class AuditorLedgerCheckerTest ex
@Override
public void process(WatchedEvent event) {
- LOG.debug("Recieved notification for the ledger path : "
+ LOG.info("Received notification for the ledger path : "
+ event.getPath());
for (Long ledgerId : ledgerList) {
if (event.getPath().contains(ledgerId + "")) {
urLedgerList.add(Long.valueOf(ledgerId));
- try {
- byte[] data = zkc.getData(event.getPath(), this, null);
- urLedgerData.put(ledgerId, new String(data));
- } catch (KeeperException e) {
- LOG.error("Exception while reading data from znode :"
- + event.getPath());
- } catch (InterruptedException e) {
- LOG.error("Exception while reading data from znode :"
- + event.getPath());
- }
}
}
LOG.debug("Count down and waiting for next notification");
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Tue Dec 18 11:00:49 2012
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
@@ -741,19 +742,8 @@ public class TestLedgerUnderreplicationM
}
- private String getParentZnodePath(String base, long ledgerId) {
- String subdir1 = String.format("%04x", ledgerId >> 48 & 0xffff);
- String subdir2 = String.format("%04x", ledgerId >> 32 & 0xffff);
- String subdir3 = String.format("%04x", ledgerId >> 16 & 0xffff);
- String subdir4 = String.format("%04x", ledgerId & 0xffff);
-
- return String.format("%s/%s/%s/%s/%s", base, subdir1, subdir2, subdir3,
- subdir4);
- }
-
private String getUrLedgerZnode(long ledgerId) {
- return String.format("%s/urL%010d", getParentZnodePath(urLedgerPath,
- ledgerId), ledgerId);
+ return ZkLedgerUnderreplicationManager.getUrLedgerZnode(urLedgerPath, ledgerId);
}
private void takeLedgerAndRelease(final LedgerUnderreplicationManager m,
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Tue Dec 18 11:00:49 2012
@@ -486,6 +486,39 @@ public class TestReplicationWorker exten
}
+ /**
+ * Test that the replication worker will shut down if it loses its ZooKeeper session
+ */
+ @Test(timeout=30000)
+ public void testRWZKSessionLost() throws Exception {
+ ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000);
+ ZooKeeper zk = ZkUtils.createConnectedZookeeperClient(
+ zkUtil.getZooKeeperConnectString(), w);
+
+ try {
+ ReplicationWorker rw = new ReplicationWorker(zk, baseConf, getBookie(0));
+ rw.start();
+ for (int i = 0; i < 10; i++) {
+ if (rw.isRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ assertTrue("Replication worker should be running", rw.isRunning());
+ stopZKCluster();
+
+ for (int i = 0; i < 10; i++) {
+ if (!rw.isRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ assertFalse("Replication worker should have shut down", rw.isRunning());
+ } finally {
+ zk.close();
+ }
+ }
+
private void killAllBookies(LedgerHandle lh, InetSocketAddress excludeBK)
throws InterruptedException {
// Killing all bookies except newly replicated bookie
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java?rev=1423409&r1=1423408&r2=1423409&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java Tue Dec 18 11:00:49 2012
@@ -152,7 +152,7 @@ public abstract class BookKeeperClusterT
for (BookieServer server : bs) {
server.shutdown();
}
-
+ bs.clear();
for (File f : tmpDirs) {
FileUtils.deleteDirectory(f);
}