You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/04/27 09:55:27 UTC
bookkeeper git commit: BOOKKEEPER-895: GC ledgers that are no longer
a part of the ensemble
Repository: bookkeeper
Updated Branches:
refs/heads/master 40af841a3 -> dd575a16d
BOOKKEEPER-895: GC ledgers that are no longer a part of the ensemble
Author: Siddharth Boobna <sb...@yahoo-inc.com>
Reviewers: Matteo Merli <mm...@apache.org>, Guo Sijie <si...@apache.org>
Closes #25 from sboobna/BOOKKEEPER-895
Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/dd575a16
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/dd575a16
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/dd575a16
Branch: refs/heads/master
Commit: dd575a16decbeaab1f230db271bc400d7e434216
Parents: 40af841
Author: Siddharth Boobna <sb...@yahoo-inc.com>
Authored: Wed Apr 27 00:55:19 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Apr 27 00:55:19 2016 -0700
----------------------------------------------------------------------
bookkeeper-server/conf/bk_server.conf | 5 +
.../bookie/GarbageCollectorThread.java | 2 +-
.../bookie/ScanAndCompareGarbageCollector.java | 128 +++++++++-
.../bookkeeper/conf/ServerConfiguration.java | 32 ++-
.../meta/ZkLedgerUnderreplicationManager.java | 119 +++++----
.../bookkeeper/util/BookKeeperConstants.java | 1 +
.../bookie/TestGcOverreplicatedLedger.java | 240 +++++++++++++++++++
.../apache/bookkeeper/meta/GcLedgersTest.java | 119 +--------
.../bookkeeper/meta/LedgerManagerTestCase.java | 118 ++++++++-
.../test/BookKeeperClusterTestCase.java | 17 ++
10 files changed, 619 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index f73e633..b3f1637 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -162,6 +162,11 @@ ledgerDirectories=/tmp/bk-data
# interval if there is enough disk capacity.
# gcWaitTime=1000
+# How long the interval to trigger next garbage collection of overreplicated
+# ledgers, in milliseconds [Default: 1 day]. This should not be run very frequently since we read
+# the metadata for all the ledgers on the bookie from zk
+# gcOverreplicatedLedgerWaitTime=86400000
+
# How long the interval to flush ledger index pages to disk, in milliseconds
# Flushing index files will introduce much random disk I/O.
# If separating journal dir and ledger dirs each on different devices,
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4d7da25..2821ec8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -244,7 +244,7 @@ public class GarbageCollectorThread extends BookieThread {
}
};
- this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage);
+ this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf);
// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
index 2b4e3c0..05cd958 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
@@ -21,12 +21,27 @@
package org.apache.bookkeeper.bookie;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +64,32 @@ import com.google.common.collect.Sets;
public class ScanAndCompareGarbageCollector implements GarbageCollector{
static final Logger LOG = LoggerFactory.getLogger(ScanAndCompareGarbageCollector.class);
+ static final int MAX_CONCURRENT_ZK_REQUESTS = 1000;
+
private final LedgerManager ledgerManager;
private final CompactableLedgerStorage ledgerStorage;
-
- public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage) {
+ private final ServerConfiguration conf;
+ private final BookieSocketAddress selfBookieAddress;
+ private ZooKeeper zk = null;
+ private boolean enableGcOverReplicatedLedger;
+ private final long gcOverReplicatedLedgerIntervalMillis;
+ private long lastOverReplicatedLedgerGcTimeMillis;
+ private final String zkLedgersRootPath;
+
+ public ScanAndCompareGarbageCollector(LedgerManager ledgerManager, CompactableLedgerStorage ledgerStorage,
+ ServerConfiguration conf) throws IOException {
this.ledgerManager = ledgerManager;
this.ledgerStorage = ledgerStorage;
+ this.conf = conf;
+ this.selfBookieAddress = Bookie.getBookieAddress(conf);
+ this.gcOverReplicatedLedgerIntervalMillis = conf.getGcOverreplicatedLedgerWaitTimeMillis();
+ this.lastOverReplicatedLedgerGcTimeMillis = MathUtils.now();
+ if (gcOverReplicatedLedgerIntervalMillis > 0) {
+ this.enableGcOverReplicatedLedger = true;
+ }
+ this.zkLedgersRootPath = conf.getZkLedgersRootPath();
+ LOG.info("Over Replicated Ledger Deletion : enabled=" + enableGcOverReplicatedLedger + ", interval="
+ + gcOverReplicatedLedgerIntervalMillis);
}
@Override
@@ -75,6 +110,22 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{
long lastEnd = -1;
+ long curTime = MathUtils.now();
+ boolean checkOverreplicatedLedgers = (enableGcOverReplicatedLedger && curTime
+ - lastOverReplicatedLedgerGcTimeMillis > gcOverReplicatedLedgerIntervalMillis);
+ if (checkOverreplicatedLedgers) {
+ zk = ZooKeeperClient.newBuilder().connectString(conf.getZkServers())
+ .sessionTimeoutMs(conf.getZkTimeout()).build();
+ // remove all the overreplicated ledgers from the local bookie
+ Set<Long> overReplicatedLedgers = removeOverReplicatedledgers(bkActiveLedgers, garbageCleaner);
+ if (overReplicatedLedgers.isEmpty()) {
+ LOG.info("No over-replicated ledgers found.");
+ } else {
+ LOG.info("Removed over-replicated ledgers: {}", overReplicatedLedgers);
+ }
+ lastOverReplicatedLedgerGcTimeMillis = MathUtils.now();
+ }
+
while(ledgerRangeIterator.hasNext()) {
LedgerRange lRange = ledgerRangeIterator.next();
@@ -100,8 +151,79 @@ public class ScanAndCompareGarbageCollector implements GarbageCollector{
} catch (Exception e) {
// ignore exception, collecting garbage next time
LOG.warn("Exception when iterating over the metadata {}", e);
+ } finally {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.error("Error closing zk session", e);
+ }
+ zk = null;
+ }
}
}
-}
+ private Set<Long> removeOverReplicatedledgers(Set<Long> bkActiveledgers, final GarbageCleaner garbageCleaner)
+ throws InterruptedException, KeeperException {
+ final Set<Long> overReplicatedLedgers = Sets.newHashSet();
+ final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_ZK_REQUESTS);
+ final CountDownLatch latch = new CountDownLatch(bkActiveledgers.size());
+ for (final Long ledgerId : bkActiveledgers) {
+ try {
+ // check if the ledger is being replicated already by the replication worker
+ if (ZkLedgerUnderreplicationManager.isLedgerBeingReplicated(zk, zkLedgersRootPath, ledgerId)) {
+ latch.countDown();
+ continue;
+ }
+ // we try to acquire the underreplicated ledger lock to not let the bookie replicate the ledger that is
+ // already being checked for deletion, since that might change the ledger ensemble to include the
+ // current bookie again and, in that case, we cannot remove the ledger from local storage
+ ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zk, zkLedgersRootPath, ledgerId);
+ semaphore.acquire();
+ ledgerManager.readLedgerMetadata(ledgerId, new GenericCallback<LedgerMetadata>() {
+
+ @Override
+ public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+ if (rc == BKException.Code.OK) {
+ // do not delete a ledger that is not closed, since the ensemble might change again and
+ // include the current bookie while we are deleting it
+ if (!ledgerMetadata.isClosed()) {
+ release();
+ return;
+ }
+ SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
+ for (ArrayList<BookieSocketAddress> ensemble : ensembles.values()) {
+ // check if this bookie is supposed to have this ledger
+ if (ensemble.contains(selfBookieAddress)) {
+ release();
+ return;
+ }
+ }
+ // this bookie is not supposed to have this ledger, thus we can delete this ledger now
+ overReplicatedLedgers.add(ledgerId);
+ garbageCleaner.clean(ledgerId);
+ }
+ release();
+ }
+ private void release() {
+ semaphore.release();
+ latch.countDown();
+ try {
+ ZkLedgerUnderreplicationManager.releaseUnderreplicatedLedgerLock(zk, zkLedgersRootPath,
+ ledgerId);
+ } catch (Exception e) {
+ LOG.error("Error removing underreplicated lock for ledger {}", ledgerId, e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ LOG.error("Exception when iterating through the ledgers to check for over-replication", e);
+ latch.countDown();
+ }
+ }
+ latch.await();
+ bkActiveledgers.removeAll(overReplicatedLedgers);
+ return overReplicatedLedgers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index c717cd8..7d9b697 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -19,8 +19,10 @@ package org.apache.bookkeeper.conf;
import java.io.File;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.Beta;
+
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -51,6 +53,7 @@ public class ServerConfiguration extends AbstractConfiguration {
// Gc Parameters
protected final static String GC_WAIT_TIME = "gcWaitTime";
protected final static String IS_FORCE_GC_ALLOW_WHEN_NO_SPACE = "isForceGCAllowWhenNoSpace";
+ protected final static String GC_OVERREPLICATED_LEDGER_WAIT_TIME = "gcOverreplicatedLedgerWaitTime";
// Sync Parameters
protected final static String FLUSH_INTERVAL = "flushInterval";
// Bookie death watch interval
@@ -210,10 +213,37 @@ public class ServerConfiguration extends AbstractConfiguration {
}
/**
+ * Get wait time in millis for garbage collection of overreplicated ledgers
+ *
+ * @return gc wait time
+ */
+ public long getGcOverreplicatedLedgerWaitTimeMillis() {
+ return this.getLong(GC_OVERREPLICATED_LEDGER_WAIT_TIME, TimeUnit.DAYS.toMillis(1));
+ }
+
+ /**
+ * Set wait time for garbage collection of overreplicated ledgers. Default: 1 day
+ *
+ * A ledger can be overreplicated under the following circumstances:
+ * 1. The ledger with few entries has bk1 and bk2 as its ensemble.
+ * 2. bk1 crashes.
+ * 3. bk3 replicates the ledger from bk2 and updates the ensemble to bk2 and bk3.
+ * 4. bk1 comes back up.
+ * 5. Now there are 3 copies of the ledger.
+ *
+ * @param gcWaitTime
+ * @return server configuration
+ */
+ public ServerConfiguration setGcOverreplicatedLedgerWaitTime(long gcWaitTime, TimeUnit unit) {
+ this.setProperty(GC_OVERREPLICATED_LEDGER_WAIT_TIME, Long.toString(unit.toMillis(gcWaitTime)));
+ return this;
+ }
+
+ /**
* Get flush interval. Default value is 10 second. It isn't useful to decrease
* this value, since ledger storage only checkpoints when an entry logger file
* is rolled.
- *
+ *
* @return flush interval
*/
public int getFlushInterval() {
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index a4600bc..c49c5a2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -18,51 +18,45 @@
package org.apache.bookkeeper.meta;
+import static com.google.common.base.Charsets.UTF_8;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.net.DNS;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
+import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
+import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.ZkUtils;
-
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
-import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
-import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
-import org.apache.bookkeeper.conf.AbstractConfiguration;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.TextFormat;
import com.google.common.base.Joiner;
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.net.UnknownHostException;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-import java.util.List;
-import java.util.Collections;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.ArrayList;
-
-import java.util.regex.Pattern;
-import java.util.regex.Matcher;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.protobuf.TextFormat;
/**
* ZooKeeper implementation of underreplication manager.
@@ -84,6 +78,8 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
static final String LAYOUT="BASIC";
static final int LAYOUT_VERSION=1;
+ public static final byte[] LOCK_DATA = getLockData();
+
private static class Lock {
private final String lockZNode;
private final int ledgerZNodeVersion;
@@ -103,22 +99,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
private final String urLedgerPath;
private final String urLockPath;
private final String layoutZNode;
- private final LockDataFormat lockData;
private final ZooKeeper zkc;
public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
- basePath = conf.getZkLedgersRootPath() + '/'
- + BookKeeperConstants.UNDER_REPLICATION_NODE;
+ basePath = getBasePath(conf.getZkLedgersRootPath());
layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
urLedgerPath = basePath
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
- urLockPath = basePath + "/locks";
+ urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
idExtractionPattern = Pattern.compile("urL(\\d+)$");
this.zkc = zkc;
+ checkLayout();
+ }
+
+ public static String getBasePath(String rootPath) {
+ return String.format("%s/%s", rootPath, BookKeeperConstants.UNDER_REPLICATION_NODE);
+ }
+
+ public static String getUrLockPath(String rootPath) {
+ return String.format("%s/%s", getBasePath(rootPath), BookKeeperConstants.UNDER_REPLICATION_LOCK);
+ }
+
+ public static byte[] getLockData() {
LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder();
try {
lockDataBuilder.setBookieId(DNS.getDefaultHost("default"));
@@ -126,9 +132,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// if we cant get the address, ignore. it's optional
// in the data structure in any case
}
- lockData = lockDataBuilder.build();
-
- checkLayout();
+ return TextFormat.printToString(lockDataBuilder.build()).getBytes(UTF_8);
}
private void checkLayout()
@@ -212,6 +216,10 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
return String.format("%s/urL%010d", getParentZnodePath(base, ledgerId), ledgerId);
}
+ public static String getUrLedgerLockZnode(String base, long ledgerId) {
+ return String.format("%s/urL%010d", base, ledgerId);
+ }
+
private String getUrLedgerZnode(long ledgerId) {
return getUrLedgerZnode(urLedgerPath, ledgerId);
}
@@ -399,8 +407,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
long ledgerId = getLedgerId(tryChild);
- zkc.create(lockPath, TextFormat.printToString(lockData).getBytes(UTF_8),
- Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ zkc.create(lockPath, LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
return ledgerId;
} catch (KeeperException.NodeExistsException nee) {
@@ -627,4 +634,32 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
"Interrupted while contacting zookeeper", ie);
}
}
+
+ /**
+ * Check whether the ledger is being replicated by any bookie
+ */
+ public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+ throws KeeperException,
+ InterruptedException {
+ return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
+ }
+
+ /**
+ * Acquire the underreplicated ledger lock
+ */
+ public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+ throws KeeperException, InterruptedException {
+ ZkUtils.createFullPathOptimistic(zkc, getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId),
+ LOCK_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ /**
+ * Release the underreplicated ledger lock if it exists
+ */
+ public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
+ throws InterruptedException, KeeperException {
+ if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) {
+ zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 2c2ba38..987de7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -38,6 +38,7 @@ public class BookKeeperConstants {
public static final String AVAILABLE_NODE = "available";
public static final String COOKIE_NODE = "cookies";
public static final String UNDER_REPLICATION_NODE = "underreplication";
+ public static final String UNDER_REPLICATION_LOCK = "locks";
public static final String DISABLE_NODE = "disable";
public static final String DEFAULT_ZK_LEDGERS_ROOT_PATH = "/ledgers";
public static final String LAYOUT_ZNODE = "LAYOUT";
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
new file mode 100644
index 0000000..5004817
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestGcOverreplicatedLedger.java
@@ -0,0 +1,240 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.FlatLedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerManagerTestCase;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class TestGcOverreplicatedLedger extends LedgerManagerTestCase {
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ ledgerManager = ledgerManagerFactory.newLedgerManager();
+ activeLedgers = new SnapshotMap<Long, Boolean>();
+ }
+
+ public TestGcOverreplicatedLedger(Class<? extends LedgerManagerFactory> lmFactoryCls) {
+ super(lmFactoryCls, 3);
+ }
+
+ @Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] { { FlatLedgerManagerFactory.class } });
+ }
+
+ @Test(timeout = 60000)
+ public void testGcOverreplicatedLedger() throws Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+ activeLedgers.put(lh.getId(), true);
+
+ final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+ final CountDownLatch latch = new CountDownLatch(1);
+ ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+ @Override
+ public void operationComplete(int rc, LedgerMetadata result) {
+ if (rc == BKException.Code.OK) {
+ newLedgerMetadata.set(result);
+ }
+ latch.countDown();
+ }
+ });
+ latch.await();
+ if (newLedgerMetadata.get() == null) {
+ Assert.fail("No ledger metadata found");
+ }
+ BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get());
+ ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+ bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+ lh.close();
+
+ final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+ final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+ bkConf);
+ Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+ garbageCollector.gc(new GarbageCleaner() {
+
+ @Override
+ public void clean(long ledgerId) {
+ try {
+ mockLedgerStorage.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ });
+
+ Assert.assertFalse(activeLedgers.containsKey(lh.getId()));
+ }
+
+ @Test(timeout = 60000)
+ public void testNoGcOfLedger() throws Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+ activeLedgers.put(lh.getId(), true);
+
+ final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+ final CountDownLatch latch = new CountDownLatch(1);
+ ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+ @Override
+ public void operationComplete(int rc, LedgerMetadata result) {
+ if (rc == BKException.Code.OK) {
+ newLedgerMetadata.set(result);
+ }
+ latch.countDown();
+ }
+ });
+ latch.await();
+ if (newLedgerMetadata.get() == null) {
+ Assert.fail("No ledger metadata found");
+ }
+ BookieSocketAddress address = null;
+ SortedMap<Long, ArrayList<BookieSocketAddress>> ensembleMap = newLedgerMetadata.get().getEnsembles();
+ for (ArrayList<BookieSocketAddress> ensemble : ensembleMap.values()) {
+ address = ensemble.get(0);
+ }
+ ServerConfiguration bkConf = getBkConf(address);
+ bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+ lh.close();
+
+ final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+ final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+ bkConf);
+ Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+ garbageCollector.gc(new GarbageCleaner() {
+
+ @Override
+ public void clean(long ledgerId) {
+ try {
+ mockLedgerStorage.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ });
+
+ Assert.assertTrue(activeLedgers.containsKey(lh.getId()));
+ }
+
+ @Test(timeout = 60000)
+ public void testNoGcIfLedgerBeingReplicated() throws Exception {
+ LedgerHandle lh = bkc.createLedger(2, 2, DigestType.MAC, "".getBytes());
+ activeLedgers.put(lh.getId(), true);
+
+ final AtomicReference<LedgerMetadata> newLedgerMetadata = new AtomicReference<>(null);
+ final CountDownLatch latch = new CountDownLatch(1);
+ ledgerManager.readLedgerMetadata(lh.getId(), new GenericCallback<LedgerMetadata>() {
+
+ @Override
+ public void operationComplete(int rc, LedgerMetadata result) {
+ if (rc == BKException.Code.OK) {
+ newLedgerMetadata.set(result);
+ }
+ latch.countDown();
+ }
+ });
+ latch.await();
+ if (newLedgerMetadata.get() == null) {
+ Assert.fail("No ledger metadata found");
+ }
+ BookieSocketAddress bookieNotInEnsemble = getBookieNotInEnsemble(newLedgerMetadata.get());
+ ServerConfiguration bkConf = getBkConf(bookieNotInEnsemble);
+ bkConf.setGcOverreplicatedLedgerWaitTime(10, TimeUnit.MILLISECONDS);
+
+ lh.close();
+
+ ZkLedgerUnderreplicationManager.acquireUnderreplicatedLedgerLock(zkc, baseConf.getZkLedgersRootPath(),
+ lh.getId());
+
+ final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
+ final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, mockLedgerStorage,
+ bkConf);
+ Thread.sleep(bkConf.getGcOverreplicatedLedgerWaitTimeMillis() + 1);
+ garbageCollector.gc(new GarbageCleaner() {
+
+ @Override
+ public void clean(long ledgerId) {
+ try {
+ mockLedgerStorage.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+ });
+
+ Assert.assertTrue(activeLedgers.containsKey(lh.getId()));
+ }
+
+ private BookieSocketAddress getBookieNotInEnsemble(LedgerMetadata ledgerMetadata) throws UnknownHostException {
+ List<BookieSocketAddress> allAddresses = Lists.newArrayList();
+ for (BookieServer bk : bs) {
+ allAddresses.add(bk.getLocalAddress());
+ }
+ SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = ledgerMetadata.getEnsembles();
+ for (ArrayList<BookieSocketAddress> fragmentEnsembles : ensembles.values()) {
+ for (BookieSocketAddress ensemble : fragmentEnsembles) {
+ allAddresses.remove(ensemble);
+ }
+ }
+ Assert.assertEquals(allAddresses.size(), 1);
+ return allAddresses.get(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 3875ec4..d5866a5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,15 +21,18 @@
package org.apache.bookkeeper.meta;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -39,21 +42,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
-import org.apache.bookkeeper.bookie.EntryLocation;
-import org.apache.bookkeeper.bookie.CheckpointSource;
-import org.apache.bookkeeper.bookie.BookieException;
-import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollector;
-import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
-import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -62,8 +56,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.junit.Assert.*;
-
/**
* Test garbage collection ledgers in ledger manager
*/
@@ -175,7 +167,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
final CountDownLatch endLatch = new CountDownLatch(2);
final CompactableLedgerStorage mockLedgerStorage = new MockLedgerStorage();
final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
- mockLedgerStorage);
+ mockLedgerStorage, baseConf);
Thread gcThread = new Thread() {
@Override
public void run() {
@@ -246,7 +238,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
createLedgers(numLedgers, createdLedgers);
final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
- new MockLedgerStorage());
+ new MockLedgerStorage(), baseConf);
GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
@@ -282,7 +274,7 @@ public class GcLedgersTest extends LedgerManagerTestCase {
createLedgers(numLedgers, createdLedgers);
final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(getLedgerManager(),
- new MockLedgerStorage());
+ new MockLedgerStorage(), baseConf);
GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
@@ -309,97 +301,4 @@ public class GcLedgersTest extends LedgerManagerTestCase {
assertEquals("Should have cleaned something", 1, cleaned.size());
assertEquals("Should have cleaned first ledger" + first, (long)first, (long)cleaned.get(0));
}
-
- class MockLedgerStorage implements CompactableLedgerStorage {
-
- @Override
- public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
- LedgerDirsManager ledgerDirsManager,
- LedgerDirsManager indexDirsManager,
- CheckpointSource checkpointSource, StatsLogger statsLogger)
- throws IOException {}
-
- @Override
- public void start() {
- }
-
- @Override
- public void shutdown() throws InterruptedException {
- }
-
- @Override
- public boolean ledgerExists(long ledgerId) throws IOException {
- return false;
- }
-
- @Override
- public boolean setFenced(long ledgerId) throws IOException {
- return false;
- }
-
- @Override
- public boolean isFenced(long ledgerId) throws IOException {
- return false;
- }
-
- @Override
- public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
- }
-
- @Override
- public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
- return null;
- }
-
- @Override
- public long addEntry(ByteBuffer entry) throws IOException {
- return 0;
- }
-
- @Override
- public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
- return null;
- }
-
- @Override
- public void flush() throws IOException {
- }
-
- @Override
- public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
- return null;
- }
-
- @Override
- public void deleteLedger(long ledgerId) throws IOException {
- activeLedgers.remove(ledgerId);
- }
-
- @Override
- public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
- NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
- Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
- .subMap(firstLedgerId, true, lastLedgerId, false);
-
- return subBkActiveLedgers.keySet();
- }
-
- @Override
- public BKMBeanInfo getJMXBean() {
- return null;
- }
-
- @Override
- public EntryLogger getEntryLogger() {
- return null;
- }
-
- @Override
- public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
- }
-
- @Override
- public void flushEntriesLocationsIndex() throws IOException {
- }
- }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index dae61bc..1e7c9a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -22,9 +22,22 @@
package org.apache.bookkeeper.meta;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
+import org.apache.bookkeeper.bookie.EntryLocation;
+import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.jmx.BKMBeanInfo;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.SnapshotMap;
import org.junit.After;
@@ -42,13 +55,17 @@ import org.slf4j.LoggerFactory;
public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(LedgerManagerTestCase.class);
- LedgerManagerFactory ledgerManagerFactory;
- LedgerManager ledgerManager = null;
- LedgerIdGenerator ledgerIdGenerator = null;
- SnapshotMap<Long, Boolean> activeLedgers = null;
+ protected LedgerManagerFactory ledgerManagerFactory;
+ protected LedgerManager ledgerManager = null;
+ protected LedgerIdGenerator ledgerIdGenerator = null;
+ protected SnapshotMap<Long, Boolean> activeLedgers = null;
public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls) {
- super(0);
+ this(lmFactoryCls, 0);
+ }
+
+ public LedgerManagerTestCase(Class<? extends LedgerManagerFactory> lmFactoryCls, int numBookies) {
+ super(numBookies);
activeLedgers = new SnapshotMap<Long, Boolean>();
baseConf.setLedgerManagerFactoryClass(lmFactoryCls);
}
@@ -93,4 +110,95 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
super.tearDown();
}
+ public class MockLedgerStorage implements CompactableLedgerStorage {
+
+ @Override
+ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager,
+ LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
+ CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
+ }
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException {
+ }
+
+ @Override
+ public boolean ledgerExists(long ledgerId) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean setFenced(long ledgerId) throws IOException {
+ return false;
+ }
+
+ @Override
+ public boolean isFenced(long ledgerId) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
+ }
+
+ @Override
+ public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
+ return null;
+ }
+
+ @Override
+ public long addEntry(ByteBuffer entry) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public ByteBuffer getEntry(long ledgerId, long entryId) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public Checkpoint checkpoint(Checkpoint checkpoint) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void deleteLedger(long ledgerId) throws IOException {
+ activeLedgers.remove(ledgerId);
+ }
+
+ @Override
+ public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) {
+ NavigableMap<Long, Boolean> bkActiveLedgersSnapshot = activeLedgers.snapshot();
+ Map<Long, Boolean> subBkActiveLedgers = bkActiveLedgersSnapshot
+ .subMap(firstLedgerId, true, lastLedgerId, false);
+
+ return subBkActiveLedgers.keySet();
+ }
+
+ @Override
+ public BKMBeanInfo getJMXBean() {
+ return null;
+ }
+
+ @Override
+ public EntryLogger getEntryLogger() {
+ return null;
+ }
+
+ @Override
+ public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
+ }
+
+ @Override
+ public void flushEntriesLocationsIndex() throws IOException {
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/dd575a16/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 28de0b2..efb8375 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -224,6 +224,23 @@ public abstract class BookKeeperClusterTestCase {
}
/**
+ * Get bookie configuration for bookie
+ */
+ public ServerConfiguration getBkConf(BookieSocketAddress addr) throws Exception {
+ int bkIndex = 0;
+ for (BookieServer server : bs) {
+ if (server.getLocalAddress().equals(addr)) {
+ break;
+ }
+ ++bkIndex;
+ }
+ if (bkIndex < bs.size()) {
+ return bsConfs.get(bkIndex);
+ }
+ return null;
+ }
+
+ /**
* Kill a bookie by its socket address. Also, stops the autorecovery process
* for the corresponding bookie server, if isAutoRecoveryEnabled is true.
*