You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/08 18:04:51 UTC
[bookkeeper] branch master updated: Record ctime for
underreplicated ledger mark time.
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new f4094c4 Record ctime for underreplicated ledger mark time.
f4094c4 is described below
commit f4094c4992d5b22630fa633085a00a9152b87ffe
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Wed Aug 8 11:04:43 2018 -0700
Record ctime for underreplicated ledger mark time.
Descriptions of the changes in this PR:
Enable the Auditor to use system time as underreplicated ledger mark time.
If this is enabled, Auditor will write a ctime field into the
underreplicated ledger znode. This would be useful in knowing when the
ledger is marked underreplicated, using BookieShell commands and also later
we can add alerts/metrics on under replicated ledgers which remained under
replicated for more than acceptable duration.
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
This closes #1576 from reddycharan/urlctime
---
bookkeeper-proto/src/main/proto/DataFormats.proto | 1 +
.../org/apache/bookkeeper/bookie/BookieShell.java | 14 ++++--
.../apache/bookkeeper/client/BookKeeperAdmin.java | 6 +--
.../bookkeeper/conf/AbstractConfiguration.java | 29 ++++++++++++
.../meta/LedgerUnderreplicationManager.java | 12 ++---
.../bookkeeper/meta/UnderreplicatedLedger.java | 54 ++++++++++++++++++++++
.../meta/ZkLedgerUnderreplicationManager.java | 38 ++++++++-------
.../service/ListUnderReplicatedLedgerService.java | 12 ++---
.../bookkeeper/client/BookKeeperAdminTest.java | 43 +++++++++++++----
.../bookkeeper/client/BookieDecommissionTest.java | 17 +++----
conf/bk_server.conf | 4 ++
site/_data/config/bk_server.yaml | 2 +
12 files changed, 172 insertions(+), 60 deletions(-)
diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto b/bookkeeper-proto/src/main/proto/DataFormats.proto
index 823257c..92eaa5f 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -69,6 +69,7 @@ message LedgerRereplicationLayoutFormat {
message UnderreplicatedLedgerFormat {
repeated string replica = 1;
+ optional int64 ctime = 2;
}
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 102a441..5b4eff8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
@@ -931,14 +932,17 @@ public class BookieShell implements Tool {
Thread.currentThread().interrupt();
throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e);
}
- Iterator<Map.Entry<Long, List<String>>> iter = underreplicationManager
- .listLedgersToRereplicate(predicate, printMissingReplica);
+ Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(predicate);
while (iter.hasNext()) {
- Map.Entry<Long, List<String>> urLedgerMapEntry = iter.next();
- long urLedgerId = urLedgerMapEntry.getKey();
+ UnderreplicatedLedger underreplicatedLedger = iter.next();
+ long urLedgerId = underreplicatedLedger.getLedgerId();
System.out.println(ledgerIdFormatter.formatLedgerId(urLedgerId));
+ long ctime = underreplicatedLedger.getCtime();
+ if (ctime != UnderreplicatedLedger.UNASSIGNED_CTIME) {
+ System.out.println("\tCtime : " + ctime);
+ }
if (printMissingReplica) {
- urLedgerMapEntry.getValue().forEach((missingReplica) -> {
+ underreplicatedLedger.getReplicaList().forEach((missingReplica) -> {
System.out.println("\tMissingReplica : " + missingReplica);
});
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index cadbe49..958dc77 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -68,6 +68,7 @@ import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -1490,15 +1491,14 @@ public class BookKeeperAdmin implements AutoCloseable {
// for double-checking, check if any ledgers are listed as underreplicated because of this bookie
Predicate<List<String>> predicate = replicasList -> replicasList.contains(bookieAddress.toString());
- Iterator<Map.Entry<Long, List<String>>> urLedgerIterator = underreplicationManager
- .listLedgersToRereplicate(predicate, false);
+ Iterator<UnderreplicatedLedger> urLedgerIterator = underreplicationManager.listLedgersToRereplicate(predicate);
if (urLedgerIterator.hasNext()) {
//if there are any then wait and make sure those ledgers are replicated properly
LOG.info("Still in some underreplicated ledgers metadata, this bookie is part of its ensemble. "
+ "Have to make sure that those ledger fragments are rereplicated");
List<Long> urLedgers = new ArrayList<>();
urLedgerIterator.forEachRemaining((urLedger) -> {
- urLedgers.add(urLedger.getKey());
+ urLedgers.add(urLedger.getLedgerId());
});
waitForLedgersToBeReplicated(urLedgers, bookieAddress, bkc.ledgerManager);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 481c5a5..1ce4cf3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -82,6 +82,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
protected static final String ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit";
protected static final String AVAILABLE_NODE = "available";
protected static final String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
+ protected static final String STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME =
+ "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
// Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
@@ -813,6 +815,33 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
}
/**
+ * Enable the Auditor to use system time as underreplicated ledger mark
+ * time.
+ *
+ * <p>If this is enabled, Auditor will write a ctime field into the
+ * underreplicated ledger znode.
+ *
+ * @param enabled
+ * flag to enable/disable Auditor using system time as
+ * underreplicated ledger mark time.
+ */
+ public T setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(boolean enabled) {
+ setProperty(STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME, enabled);
+ return getThis();
+ }
+
+ /**
+ * Return the flag that indicates whether auditor is using system time as
+ * underreplicated ledger mark time.
+ *
+ * @return the flag that indicates whether auditor is using system time as
+ * underreplicated ledger mark time.
+ */
+ public boolean getStoreSystemTimeAsLedgerUnderreplicatedMarkTime() {
+ return getBoolean(STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME, false);
+ }
+
+ /**
* Trickery to allow inheritance with fluent style.
*/
protected abstract T getThis();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 532f8d2..ed87154 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.meta;
import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
import java.util.function.Predicate;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -44,21 +43,16 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
throws ReplicationException.UnavailableException;
/**
- * Get a list of all the ledgers which have been
+ * Get a list of all the underreplicated ledgers which have been
* marked for rereplication, filtered by the predicate on the missing replicas list.
*
* <p>Missing replicas list of an underreplicated ledger is the list of the bookies which are part of
* the ensemble of this ledger and are currently unavailable/down.
*
- * <p>If filtering is not needed then it is suggested to pass null for predicate,
- * otherwise it will read the content of the ZNode to decide on filtering.
- *
* @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required
- * @param includeReplicaList whether to include missing replicalist in the output
- * @return an iterator which returns ledger ids
+ * @return an iterator which returns underreplicated ledgers.
*/
- Iterator<Entry<Long, List<String>>> listLedgersToRereplicate(Predicate<List<String>> predicate,
- boolean includeReplicaList);
+ Iterator<UnderreplicatedLedger> listLedgersToRereplicate(Predicate<List<String>> predicate);
/**
* Acquire a underreplicated ledger for rereplication. The ledger
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java
new file mode 100644
index 0000000..6ad3036
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.util.List;
+
+/**
+ * UnderReplicated ledger representation info.
+ */
+public class UnderreplicatedLedger {
+ private final long ledgerId;
+ private long ctime;
+ private List<String> replicaList;
+ public static final long UNASSIGNED_CTIME = -1L;
+
+ protected UnderreplicatedLedger(long ledgerId) {
+ this.ledgerId = ledgerId;
+ }
+
+ public long getCtime() {
+ return ctime;
+ }
+
+ protected void setCtime(long ctime) {
+ this.ctime = ctime;
+ }
+
+ public List<String> getReplicaList() {
+ return replicaList;
+ }
+
+ protected void setReplicaList(List<String> replicaList) {
+ this.replicaList = replicaList;
+ }
+
+ public long getLedgerId() {
+ return ledgerId;
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index fbf2839..00215f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -26,7 +26,6 @@ import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import java.net.UnknownHostException;
-import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -266,6 +265,9 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
while (true) {
UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
+ if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
+ builder.setCtime(System.currentTimeMillis());
+ }
builder.addReplica(missingReplica);
ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
.printToString(builder.build()).getBytes(UTF_8),
@@ -352,27 +354,22 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
/**
- * Get a list of all the ledgers which have been
+ * Get a list of all the underreplicated ledgers which have been
* marked for rereplication, filtered by the predicate on the replicas list.
*
* <p>Replicas list of an underreplicated ledger is the list of the bookies which are part of
* the ensemble of this ledger and are currently unavailable/down.
*
- * <p>If filtering is not needed then it is suggested to pass null for predicate,
- * otherwise it will read the content of the ZNode to decide on filtering.
- *
* @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required.
- * @param includeReplicaList whether to include missing replicalist in the output.
- * @return an iterator which returns ledger ids
+ * @return an iterator which returns underreplicated ledgers.
*/
@Override
- public Iterator<Map.Entry<Long, List<String>>> listLedgersToRereplicate(final Predicate<List<String>> predicate,
- boolean includeReplicaList) {
+ public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
final Queue<String> queue = new LinkedList<String>();
queue.add(urLedgerPath);
- return new Iterator<Map.Entry<Long, List<String>>>() {
- final Queue<Map.Entry<Long, List<String>>> curBatch = new LinkedList<Map.Entry<Long, List<String>>>();
+ return new Iterator<UnderreplicatedLedger>() {
+ final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>();
@Override
public void remove() {
@@ -393,11 +390,18 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
String child = parent + "/" + c;
if (c.startsWith("urL")) {
long ledgerId = getLedgerId(child);
- List<String> replicaList = getLedgerUnreplicationInfo(ledgerId).getReplicaList();
- if ((predicate == null)
- || predicate.test(replicaList)) {
- curBatch.add(new AbstractMap.SimpleImmutableEntry<Long, List<String>>(ledgerId,
- ((includeReplicaList) ? replicaList : null)));
+ UnderreplicatedLedgerFormat underreplicatedLedgerFormat =
+ getLedgerUnreplicationInfo(ledgerId);
+ List<String> replicaList = underreplicatedLedgerFormat.getReplicaList();
+ long ctime = (underreplicatedLedgerFormat.hasCtime()
+ ? underreplicatedLedgerFormat.getCtime()
+ : UnderreplicatedLedger.UNASSIGNED_CTIME);
+ if ((predicate == null) || predicate.test(replicaList)) {
+ UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(
+ ledgerId);
+ underreplicatedLedger.setCtime(ctime);
+ underreplicatedLedger.setReplicaList(replicaList);
+ curBatch.add(underreplicatedLedger);
}
} else {
queue.add(child);
@@ -419,7 +423,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
@Override
- public Map.Entry<Long, List<String>> next() {
+ public UnderreplicatedLedger next() {
assert curBatch.size() > 0;
return curBatch.remove();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
index 7303be0..ef71a68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.http.service.HttpServiceRequest;
import org.apache.bookkeeper.http.service.HttpServiceResponse;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -101,8 +102,7 @@ public class ListUnderReplicatedLedgerService implements HttpEndpointService {
Map<Long, List<String>> outputLedgersWithMissingReplica = null;
LedgerManagerFactory mFactory = bookieServer.getBookie().getLedgerManagerFactory();
LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
- Iterator<Map.Entry<Long, List<String>>> iter = underreplicationManager
- .listLedgersToRereplicate(predicate, printMissingReplica);
+ Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(predicate);
hasURLedgers = iter.hasNext();
if (hasURLedgers) {
@@ -114,11 +114,11 @@ public class ListUnderReplicatedLedgerService implements HttpEndpointService {
}
while (iter.hasNext()) {
if (printMissingReplica) {
- Map.Entry<Long, List<String>> urlWithMissingReplica = iter.next();
- outputLedgersWithMissingReplica.put(urlWithMissingReplica.getKey(),
- urlWithMissingReplica.getValue());
+ UnderreplicatedLedger underreplicatedLedger = iter.next();
+ outputLedgersWithMissingReplica.put(underreplicatedLedger.getLedgerId(),
+ underreplicatedLedger.getReplicaList());
} else {
- outputLedgers.add(iter.next().getKey());
+ outputLedgers.add(iter.next().getLedgerId());
}
}
if (!hasURLedgers) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 2c349e2..6e8cf32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -32,13 +32,12 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Random;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
@@ -89,8 +88,25 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
}
@Test
- public void testTriggerAudit() throws Exception {
- ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ public void testTriggerAuditWithStoreSystemTimeAsLedgerUnderreplicatedMarkTime() throws Exception {
+ testTriggerAudit(true);
+ }
+
+ @Test
+ public void testTriggerAuditWithoutStoreSystemTimeAsLedgerUnderreplicatedMarkTime() throws Exception {
+ testTriggerAudit(false);
+ }
+
+ public void testTriggerAudit(boolean storeSystemTimeAsLedgerUnderreplicatedMarkTime) throws Exception {
+ ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
+ thisServerConf
+ .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
+ restartBookies(thisServerConf);
+ ClientConfiguration thisClientConf = new ClientConfiguration(baseClientConf);
+ thisClientConf
+ .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
+ long testStartSystime = System.currentTimeMillis();
+ ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(thisClientConf, zkc);
BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
int lostBookieRecoveryDelayValue = bkAdmin.getLostBookieRecoveryDelay();
urLedgerMgr.disableLedgerReplication();
@@ -119,13 +135,20 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
*/
bkAdmin.triggerAudit();
Thread.sleep(500);
- Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
- true);
- assertTrue("There are supposed to be underreplicatedledgers", ledgersToRereplicate.hasNext());
- Entry<Long, List<String>> urlWithReplicaList = ledgersToRereplicate.next();
- assertEquals("Underreplicated ledgerId", ledgerId, urlWithReplicaList.getKey().longValue());
+ Iterator<UnderreplicatedLedger> underreplicatedLedgerItr = urLedgerMgr.listLedgersToRereplicate(null);
+ assertTrue("There are supposed to be underreplicatedledgers", underreplicatedLedgerItr.hasNext());
+ UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgerItr.next();
+ assertEquals("Underreplicated ledgerId", ledgerId, underreplicatedLedger.getLedgerId());
assertTrue("Missingreplica of Underreplicated ledgerId should contain " + bookieToKill.getLocalAddress(),
- urlWithReplicaList.getValue().contains(bookieToKill.getLocalAddress().toString()));
+ underreplicatedLedger.getReplicaList().contains(bookieToKill.getLocalAddress().toString()));
+ if (storeSystemTimeAsLedgerUnderreplicatedMarkTime) {
+ long ctimeOfURL = underreplicatedLedger.getCtime();
+ assertTrue("ctime of underreplicated ledger should be greater than test starttime",
+ (ctimeOfURL > testStartSystime) && (ctimeOfURL < System.currentTimeMillis()));
+ } else {
+ assertEquals("ctime of underreplicated ledger should not be set", UnderreplicatedLedger.UNASSIGNED_CTIME,
+ underreplicatedLedger.getCtime());
+ }
bkAdmin.close();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
index b55a8c6..906db8f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Bookie;
@@ -31,6 +29,7 @@ import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
@@ -92,11 +91,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
bkAdmin.triggerAudit();
Thread.sleep(500);
- Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
- false);
+ Iterator<UnderreplicatedLedger> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
if (ledgersToRereplicate.hasNext()) {
while (ledgersToRereplicate.hasNext()) {
- Long ledgerId = ledgersToRereplicate.next().getKey();
+ Long ledgerId = ledgersToRereplicate.next().getLedgerId();
log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
}
fail("There are not supposed to be any underreplicatedledgers");
@@ -106,10 +104,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
bkAdmin.triggerAudit();
Thread.sleep(500);
- ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null, false);
+ ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
if (ledgersToRereplicate.hasNext()) {
while (ledgersToRereplicate.hasNext()) {
- Long ledgerId = ledgersToRereplicate.next().getKey();
+ Long ledgerId = ledgersToRereplicate.next().getLedgerId();
log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
}
fail("There are not supposed to be any underreplicatedledgers");
@@ -166,11 +164,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
bkAdmin.triggerAudit();
Thread.sleep(500);
- Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
- false);
+ Iterator<UnderreplicatedLedger> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
if (ledgersToRereplicate.hasNext()) {
while (ledgersToRereplicate.hasNext()) {
- Long ledgerId = ledgersToRereplicate.next().getKey();
+ long ledgerId = ledgersToRereplicate.next().getLedgerId();
log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
}
fail("There are not supposed to be any underreplicatedledgers");
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 08d97a9..1eaa191 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -909,6 +909,10 @@ zkEnableSecurity=false
# How long to wait, in seconds, before starting auto recovery of a lost bookie
# lostBookieRecoveryDelay=0
+# Enable the Auditor to use system time as underreplicated ledger mark time.
+# If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.
+# storeSystemTimeAsLedgerUnderreplicatedMarkTime=false
+
#############################################################################
## Replication Worker settings
#############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index afd0792..fd4f9a2 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -646,6 +646,8 @@ groups:
- param: lostBookieRecoveryDelay
description: How long to wait, in seconds, before starting autorecovery of a lost bookie.
default: 0
+ - param: storeSystemTimeAsLedgerUnderreplicatedMarkTime
+ description: Enable the Auditor to use system time as underreplicated ledger mark time. If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.
- name: AutoRecovery replication worker settings
params: