You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2014/06/02 11:54:04 UTC
svn commit: r1599145 - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/
bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/ ...
Author: ivank
Date: Mon Jun 2 09:54:03 2014
New Revision: 1599145
URL: http://svn.apache.org/r1599145
Log:
BOOKKEEPER-745: Fix for false reports of ledger unreplication during rolling restarts. (ivank)
Added:
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Jun 2 09:54:03 2014
@@ -200,6 +200,8 @@ Trunk (unreleased changes)
BOOKKEEPER-751: Ensure all the bookkeeper callbacks not run under ledger handle lock (sijie via ivank)
+ BOOKKEEPER-745: Fix for false reports of ledger unreplication during rolling restarts. (ivank)
+
hedwig-server:
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java Mon Jun 2 09:54:03 2014
@@ -38,6 +38,7 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs.Ids;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.TextFormat;
import com.google.common.base.Joiner;
import static com.google.common.base.Charsets.UTF_8;
@@ -210,6 +211,15 @@ public class ZkLedgerUnderreplicationMan
return getUrLedgerZnode(urLedgerPath, ledgerId);
}
+    /**
+     * Read and parse the under-replication record stored for a ledger.
+     *
+     * @param ledgerId id of the ledger whose under-replication znode is read
+     * @return the record parsed from the znode's text-format payload
+     * @throws KeeperException if reading the znode fails
+     * @throws TextFormat.ParseException if the payload cannot be parsed
+     * @throws InterruptedException if interrupted while reading the znode
+     */
+    @VisibleForTesting
+    public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId)
+            throws KeeperException, TextFormat.ParseException, InterruptedException {
+        // fetch the raw payload first, then parse it into a fresh builder
+        byte[] payload = zkc.getData(getUrLedgerZnode(ledgerId), false, null);
+        UnderreplicatedLedgerFormat.Builder recordBuilder = UnderreplicatedLedgerFormat.newBuilder();
+        TextFormat.merge(new String(payload, UTF_8), recordBuilder);
+        return recordBuilder.build();
+    }
@Override
public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java Mon Jun 2 09:54:03 2014
@@ -171,8 +171,8 @@ public class Auditor implements BookiesL
if (lostBookies.size() > 0) {
knownBookies.removeAll(lostBookies);
- Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
- handleLostBookies(lostBookies, ledgerDetails);
+
+ auditBookies();
}
} catch (BKException bke) {
LOG.error("Exception getting bookie list", bke);
@@ -183,6 +183,8 @@ public class Auditor implements BookiesL
LOG.error("Exception while watching available bookies", bke);
} catch (UnavailableException ue) {
LOG.error("Exception while watching available bookies", ue);
+ } catch (KeeperException ke) {
+ LOG.error("Exception reading bookie list", ke);
}
}
});
@@ -233,37 +235,21 @@ public class Auditor implements BookiesL
} else {
LOG.info("Periodic checking disabled");
}
-
- Runnable bookieCheck = new Runnable() {
- public void run() {
- try {
- knownBookies = getAvailableBookies();
- auditingBookies(knownBookies);
- } catch (BKException bke) {
- LOG.error("Exception getting bookie list", bke);
- submitShutdownTask();
- } catch (KeeperException ke) {
- LOG.error("Exception while watching available bookies", ke);
- submitShutdownTask();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- LOG.error("Interrupted while watching available bookies ", ie);
- submitShutdownTask();
- } catch (BKAuditException bke) {
- LOG.error("Exception while watching available bookies", bke);
- submitShutdownTask();
- }
- }
- };
+ try {
+ knownBookies = getAvailableBookies();
+ } catch (BKException bke) {
+ LOG.error("Couldn't get bookie list, exiting", bke);
+ submitShutdownTask();
+ }
long bookieCheckInterval = conf.getAuditorPeriodicBookieCheckInterval();
if (bookieCheckInterval == 0) {
LOG.info("Auditor periodic bookie checking disabled, running once check now anyhow");
- executor.submit(bookieCheck);
+ executor.submit(BOOKIE_CHECK);
} else {
LOG.info("Auditor periodic bookie checking enabled"
+ " 'auditorPeriodicBookieCheckInterval' {} seconds", bookieCheckInterval);
- executor.scheduleAtFixedRate(bookieCheck, 0, bookieCheckInterval, TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(BOOKIE_CHECK, 0, bookieCheckInterval, TimeUnit.SECONDS);
}
}
}
@@ -294,11 +280,33 @@ public class Auditor implements BookiesL
}
@SuppressWarnings("unchecked")
- private void auditingBookies(List<String> availableBookies)
- throws BKAuditException, KeeperException, InterruptedException {
+ private void auditBookies()
+ throws BKAuditException, KeeperException,
+ InterruptedException, BKException {
+ try {
+ waitIfLedgerReplicationDisabled();
+ } catch (UnavailableException ue) {
+ LOG.error("Underreplication unavailable, skipping audit."
+ + "Will retry after a period");
+ return;
+ }
+ // put exit cases here
Map<String, Set<Long>> ledgerDetails = generateBookie2LedgersIndex();
+ try {
+ if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+ // has been disabled while we were generating the index
+ // discard this run, and schedule a new one
+ executor.submit(BOOKIE_CHECK);
+ return;
+ }
+ } catch (UnavailableException ue) {
+ LOG.error("Underreplication unavailable, skipping audit."
+ + "Will retry after a period");
+ return;
+ }
+ List<String> availableBookies = getAvailableBookies();
// find lost bookies
Set<String> knownBookies = ledgerDetails.keySet();
Collection<String> lostBookies = CollectionUtils.subtract(knownBookies,
@@ -523,4 +531,26 @@ public class Auditor implements BookiesL
public boolean isRunning() {
return !executor.isShutdown();
}
+
+ private final Runnable BOOKIE_CHECK = new Runnable() {
+ public void run() {
+ try {
+ auditBookies();
+ } catch (BKException bke) {
+ LOG.error("Couldn't get bookie list, exiting", bke);
+ submitShutdownTask();
+ } catch (KeeperException ke) {
+ LOG.error("Exception while watching available bookies", ke);
+ submitShutdownTask();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while watching available bookies ", ie);
+ submitShutdownTask();
+ } catch (BKAuditException bke) {
+ LOG.error("Exception while watching available bookies", bke);
+ submitShutdownTask();
+ }
+ }
+ };
+
}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java Mon Jun 2 09:54:03 2014
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.io.Serializable;
+import java.io.IOException;
import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
import com.google.common.annotations.VisibleForTesting;
@@ -32,6 +33,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -68,6 +70,7 @@ public class AuditorElector {
private static final String VOTE_PREFIX = "V_";
// Represents path Separator
private static final String PATH_SEPARATOR = "/";
+ private static final String ELECTION_ZNODE = "auditorelection";
// Represents urLedger path in zk
private final String basePath;
// Represents auditor election path in zk
@@ -102,7 +105,7 @@ public class AuditorElector {
this.zkc = zkc;
basePath = conf.getZkLedgersRootPath() + '/'
+ BookKeeperConstants.UNDER_REPLICATION_NODE;
- electionPath = basePath + "/auditorelection";
+ electionPath = basePath + '/' + ELECTION_ZNODE;
createElectorPath();
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
@@ -275,6 +278,31 @@ public class AuditorElector {
}
/**
+     * Query ZooKeeper for the currently elected auditor.
+     *
+     * <p>The vote znodes under the election path are ordered with
+     * {@link ElectionComparator}; the vote at {@code AUDITOR_INDEX} is read
+     * and its bookie id is split on ':' into host and port.
+     *
+     * @param conf configuration supplying the ledgers root path
+     * @param zk ZooKeeper client used for the lookup
+     * @return the address of the current auditor, or {@code null} if the
+     *         election path has no children
+     * @throws KeeperException if a ZooKeeper read fails
+     * @throws InterruptedException if interrupted during a ZooKeeper read
+     * @throws IOException if the vote data cannot be parsed, or the bookie
+     *         id it carries is not of the form {@code host:port}
+     */
+    public static BookieSocketAddress getCurrentAuditor(ServerConfiguration conf, ZooKeeper zk)
+            throws KeeperException, InterruptedException, IOException {
+        String electionRoot = conf.getZkLedgersRootPath() + '/'
+                + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE;
+
+        List<String> children = zk.getChildren(electionRoot, false);
+        if (children.isEmpty()) {
+            // nobody has voted yet, so there is no auditor to report
+            return null;
+        }
+        Collections.sort(children, new AuditorElector.ElectionComparator());
+        String votePath = electionRoot + "/" + children.get(AUDITOR_INDEX);
+        byte[] data = zk.getData(votePath, false, null);
+
+        AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder();
+        TextFormat.merge(new String(data, UTF_8), builder);
+        String bookieId = builder.build().getBookieId();
+
+        // report malformed ids through the declared IOException rather than
+        // an unchecked ArrayIndexOutOfBounds/NumberFormat exception
+        String[] parts = bookieId.split(":");
+        if (parts.length < 2) {
+            throw new IOException("Malformed auditor bookie id '" + bookieId
+                    + "' in " + votePath);
+        }
+        int port;
+        try {
+            port = Integer.parseInt(parts[1]);
+        } catch (NumberFormatException nfe) {
+            throw new IOException("Invalid port in auditor bookie id '" + bookieId
+                    + "' in " + votePath, nfe);
+        }
+        return new BookieSocketAddress(parts[0], port);
+    }
+
+ /**
* Shutting down AuditorElector
*/
public void shutdown() throws InterruptedException {
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java Mon Jun 2 09:54:03 2014
@@ -39,9 +39,10 @@ import org.apache.bookkeeper.client.BKEx
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.MultiLedgerManagerTestCase;
@@ -74,7 +75,7 @@ public class AuditorLedgerCheckerTest ex
.getZkLedgersRootPath()
+ "/underreplication/ledgers";
private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
- private LedgerUnderreplicationManager urLedgerMgr;
+ private ZkLedgerUnderreplicationManager urLedgerMgr;
private Set<Long> urLedgerList;
private List<Long> ledgerList;
@@ -170,43 +171,19 @@ public class AuditorLedgerCheckerTest ex
@Test(timeout=60000)
public void testRestartBookie() throws Exception {
LedgerHandle lh1 = createAndAddEntriesToLedger();
- ledgerList.add(lh1.getId());
LedgerHandle lh2 = createAndAddEntriesToLedger();
- ledgerList.add(lh2.getId());
- LOG.debug("Created following ledgers : " + ledgerList);
- int count = ledgerList.size();
- final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(count);
+ LOG.debug("Created following ledgers : {}, {}", lh1, lh2);
int bkShutdownIndex = bs.size() - 1;
ServerConfiguration bookieConf1 = bsConfs.get(bkShutdownIndex);
String shutdownBookie = shutdownBookie(bkShutdownIndex);
- // restart the failed bookie and simulate previously listed ledgers are
- // rereplicated
+ // restart the failed bookie
bs.add(startBookie(bookieConf1));
- // grace period for publishing the bk-ledger
- LOG.debug("Waiting for ledgers to be marked as under replicated");
- assertTrue("latch should have completed", underReplicaLatch.await(5, TimeUnit.SECONDS));
- Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
-
- assertEquals("Missed identifying under replicated ledgers", 2,
- urLedgerList.size());
-
- /*
- * Sample data format present in the under replicated ledger path
- *
- * {4=replica: "10.18.89.153:5002", 5=replica: "10.18.89.153:5003"}
- */
- for (Long ledgerId : ledgerList) {
- assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
- urLedgerList.contains(ledgerId));
- String data = urLedgerData.get(ledgerId);
- assertTrue("Bookie " + shutdownBookie
- + " is not listed in the ledger as missing " + data, data
- .contains(shutdownBookie));
- }
+ waitForLedgerMissingReplicas(lh1.getId(), 10, shutdownBookie);
+ waitForLedgerMissingReplicas(lh2.getId(), 10, shutdownBookie);
}
/**
@@ -216,41 +193,20 @@ public class AuditorLedgerCheckerTest ex
@Test(timeout=60000)
public void testMultipleBookieFailures() throws Exception {
LedgerHandle lh1 = createAndAddEntriesToLedger();
- ledgerList.add(lh1.getId());
- LedgerHandle lh2 = createAndAddEntriesToLedger();
- ledgerList.add(lh2.getId());
- LOG.debug("Created following ledgers : " + ledgerList);
// failing first bookie
shutdownBookie(bs.size() - 1);
+
// simulate re-replication
- doLedgerRereplication(lh1.getId(), lh2.getId());
+ doLedgerRereplication(lh1.getId());
// failing another bookie
- CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
- .size());
String shutdownBookie = shutdownBookie(bs.size() - 1);
// grace period for publishing the bk-ledger
LOG.debug("Waiting for ledgers to be marked as under replicated");
- underReplicaLatch.await(5, TimeUnit.SECONDS);
- Map<Long, String> urLedgerData = getUrLedgerData(urLedgerList);
-
- assertEquals("Missed identifying under replicated ledgers", 2,
- urLedgerList.size());
-
- /*
- * Sample data format present in the under replicated ledger path
- * {4=replica: "10.18.89.153:5002", 5=replica: "10.18.89.153:5003"}
- */
- for (Long ledgerId : ledgerList) {
- assertTrue("Ledger is not marked as underreplicated:" + ledgerId,
- urLedgerList.contains(ledgerId));
- String data = urLedgerData.get(ledgerId);
- assertTrue("Bookie " + shutdownBookie
- + " is not listed in the ledger as missing " + data, data
- .contains(shutdownBookie));
- }
+ assertTrue("Ledger should be missing second replica",
+ waitForLedgerMissingReplicas(lh1.getId(), 10, shutdownBookie));
}
@Test(timeout = 30000)
@@ -321,6 +277,29 @@ public class AuditorLedgerCheckerTest ex
assertFalse("latch should not have completed", underReplicaLatch.await(5, TimeUnit.SECONDS));
}
+    /**
+     * Wait for ledger to be underreplicated, and to be missing all replicas specified.
+     *
+     * <p>The under-replication record is polled up to {@code secondsToWait}
+     * times, sleeping one second after each unsuccessful attempt.
+     *
+     * @param ledgerId ledger whose under-replication record is polled
+     * @param secondsToWait maximum number of polling attempts
+     * @param replicas replicas that must all be listed in the record
+     * @return true as soon as every replica is listed, false if the
+     *         attempts run out first
+     * @throws Exception if the thread is interrupted while polling or sleeping
+     */
+    private boolean waitForLedgerMissingReplicas(Long ledgerId, long secondsToWait, String... replicas)
+            throws Exception {
+        for (int i = 0; i < secondsToWait; i++) {
+            try {
+                UnderreplicatedLedgerFormat data = urLedgerMgr.getLedgerUnreplicationInfo(ledgerId);
+                boolean allMissing = true;
+                for (String replica : replicas) {
+                    if (!data.getReplicaList().contains(replica)) {
+                        allMissing = false;
+                        break;
+                    }
+                }
+                if (allMissing) {
+                    return true;
+                }
+            } catch (InterruptedException ie) {
+                // never swallow an interrupt (e.g. the test timeout); stop polling
+                throw ie;
+            } catch (Exception ignored) {
+                // may not find node yet; retry on the next iteration
+            }
+            Thread.sleep(1000);
+        }
+        return false;
+    }
+
private CountDownLatch registerUrLedgerWatcher(int count)
throws KeeperException, InterruptedException {
final CountDownLatch underReplicaLatch = new CountDownLatch(count);
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java Mon Jun 2 09:54:03 2014
@@ -79,7 +79,8 @@ public class AuditorPeriodicBookieCheckT
auditorZookeeper = ZkUtils.createConnectedZookeeperClient(
zkUtil.getZooKeeperConnectString(), w);
- auditorElector = new AuditorElector(addr, conf, auditorZookeeper);
+ auditorElector = new AuditorElector(addr, conf,
+ auditorZookeeper);
auditorElector.start();
}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java?rev=1599145&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java Mon Jun 2 09:54:03 2014
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.replication;
+
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestCallbacks;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+
+import org.junit.Test;
+
+/**
+ * Test auditor behaviours during a rolling restart
+ */
+public class AuditorRollingRestartTest extends BookKeeperClusterTestCase {
+
+    public AuditorRollingRestartTest() {
+        super(3);
+        // run the daemon within the bookie
+        baseConf.setAutoRecoveryDaemonEnabled(true);
+    }
+
+    /**
+     * Test no auditing during restart if disabled.
+     *
+     * <p>Writes a ledger, disables ledger replication, restarts the bookie
+     * currently elected as auditor, and checks that polling for a ledger to
+     * rereplicate still returns -1 afterwards.
+     */
+    @Test(timeout=600000) // 10 minutes
+    public void testAuditingDuringRollingRestart() throws Exception {
+        LedgerManagerFactory mFactory = LedgerManagerFactory.newLedgerManagerFactory(bsConfs.get(0), zkc);
+        final LedgerUnderreplicationManager underReplicationManager = mFactory.newLedgerUnderreplicationManager();
+
+        LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
+        for (int i = 0; i < 10; i++) {
+            lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(), null);
+        }
+        lh.addEntry("foobar".getBytes());
+        lh.close();
+
+        // expected value goes first so a failure reports the values correctly
+        assertEquals("shouldn't be anything under replicated", -1,
+                     underReplicationManager.pollLedgerToRereplicate());
+        underReplicationManager.disableLedgerReplication();
+
+        BookieSocketAddress auditor = AuditorElector.getCurrentAuditor(baseConf, zkc);
+        ServerConfiguration conf = killBookie(auditor);
+        Thread.sleep(2000);
+        startBookie(conf);
+        Thread.sleep(2000); // give it time to run
+        assertEquals("shouldn't be anything under replicated", -1,
+                     underReplicationManager.pollLedgerToRereplicate());
+    }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java?rev=1599145&r1=1599144&r2=1599145&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java Mon Jun 2 09:54:03 2014
@@ -20,7 +20,9 @@
*/
package org.apache.bookkeeper.test;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import com.google.common.util.concurrent.AbstractFuture;
@@ -39,5 +41,17 @@ public class TestCallbacks {
}
}
}
+
+    /**
+     * Future that is completed by an add callback: it is set to the entry
+     * id when the return code is OK, and otherwise fails with the
+     * exception created from the return code.
+     */
+    public static class AddCallbackFuture
+            extends AbstractFuture<Long> implements AddCallback {
+        @Override
+        public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+            if (rc == BKException.Code.OK) {
+                set(entryId);
+                return;
+            }
+            setException(BKException.create(rc));
+        }
+    }
}