You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/08 18:04:51 UTC

[bookkeeper] branch master updated: Record ctime for underreplicated ledger mark time.

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new f4094c4  Record ctime for underreplicated ledger mark time.
f4094c4 is described below

commit f4094c4992d5b22630fa633085a00a9152b87ffe
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Wed Aug 8 11:04:43 2018 -0700

    Record ctime for underreplicated ledger mark time.
    
    Descriptions of the changes in this PR:
    
    Enable the Auditor to use system time as underreplicated ledger mark time.
    If this is enabled, the Auditor will write a ctime field into the
    underreplicated ledger znode. This makes it possible to see, via
    BookieShell commands, when a ledger was marked underreplicated; later
    we can also add alerts/metrics on ledgers that have remained
    underreplicated for longer than an acceptable duration.
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1576 from reddycharan/urlctime
---
 bookkeeper-proto/src/main/proto/DataFormats.proto  |  1 +
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 14 ++++--
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  6 +--
 .../bookkeeper/conf/AbstractConfiguration.java     | 29 ++++++++++++
 .../meta/LedgerUnderreplicationManager.java        | 12 ++---
 .../bookkeeper/meta/UnderreplicatedLedger.java     | 54 ++++++++++++++++++++++
 .../meta/ZkLedgerUnderreplicationManager.java      | 38 ++++++++-------
 .../service/ListUnderReplicatedLedgerService.java  | 12 ++---
 .../bookkeeper/client/BookKeeperAdminTest.java     | 43 +++++++++++++----
 .../bookkeeper/client/BookieDecommissionTest.java  | 17 +++----
 conf/bk_server.conf                                |  4 ++
 site/_data/config/bk_server.yaml                   |  2 +
 12 files changed, 172 insertions(+), 60 deletions(-)

diff --git a/bookkeeper-proto/src/main/proto/DataFormats.proto b/bookkeeper-proto/src/main/proto/DataFormats.proto
index 823257c..92eaa5f 100644
--- a/bookkeeper-proto/src/main/proto/DataFormats.proto
+++ b/bookkeeper-proto/src/main/proto/DataFormats.proto
@@ -69,6 +69,7 @@ message LedgerRereplicationLayoutFormat {
  
 message UnderreplicatedLedgerFormat {
     repeated string replica = 1;
+    optional int64 ctime = 2;
 }
 
 /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 102a441..5b4eff8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -96,6 +96,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
@@ -931,14 +932,17 @@ public class BookieShell implements Tool {
                     Thread.currentThread().interrupt();
                     throw new UncheckedExecutionException("Interrupted on newing ledger underreplicated manager", e);
                 }
-                Iterator<Map.Entry<Long, List<String>>> iter = underreplicationManager
-                        .listLedgersToRereplicate(predicate, printMissingReplica);
+                Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(predicate);
                 while (iter.hasNext()) {
-                    Map.Entry<Long, List<String>> urLedgerMapEntry = iter.next();
-                    long urLedgerId = urLedgerMapEntry.getKey();
+                    UnderreplicatedLedger underreplicatedLedger = iter.next();
+                    long urLedgerId = underreplicatedLedger.getLedgerId();
                     System.out.println(ledgerIdFormatter.formatLedgerId(urLedgerId));
+                    long ctime = underreplicatedLedger.getCtime();
+                    if (ctime != UnderreplicatedLedger.UNASSIGNED_CTIME) {
+                        System.out.println("\tCtime : " + ctime);
+                    }
                     if (printMissingReplica) {
-                        urLedgerMapEntry.getValue().forEach((missingReplica) -> {
+                        underreplicatedLedger.getReplicaList().forEach((missingReplica) -> {
                             System.out.println("\tMissingReplica : " + missingReplica);
                         });
                     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index cadbe49..958dc77 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -68,6 +68,7 @@ import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -1490,15 +1491,14 @@ public class BookKeeperAdmin implements AutoCloseable {
 
         // for double-checking, check if any ledgers are listed as underreplicated because of this bookie
         Predicate<List<String>> predicate = replicasList -> replicasList.contains(bookieAddress.toString());
-        Iterator<Map.Entry<Long, List<String>>> urLedgerIterator = underreplicationManager
-                .listLedgersToRereplicate(predicate, false);
+        Iterator<UnderreplicatedLedger> urLedgerIterator = underreplicationManager.listLedgersToRereplicate(predicate);
         if (urLedgerIterator.hasNext()) {
             //if there are any then wait and make sure those ledgers are replicated properly
             LOG.info("Still in some underreplicated ledgers metadata, this bookie is part of its ensemble. "
                     + "Have to make sure that those ledger fragments are rereplicated");
             List<Long> urLedgers = new ArrayList<>();
             urLedgerIterator.forEachRemaining((urLedger) -> {
-                urLedgers.add(urLedger.getKey());
+                urLedgers.add(urLedger.getLedgerId());
             });
             waitForLedgersToBeReplicated(urLedgers, bookieAddress, bkc.ledgerManager);
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 481c5a5..1ce4cf3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -82,6 +82,8 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     protected static final String ZK_REQUEST_RATE_LIMIT = "zkRequestRateLimit";
     protected static final String AVAILABLE_NODE = "available";
     protected static final String REREPLICATION_ENTRY_BATCH_SIZE = "rereplicationEntryBatchSize";
+    protected static final String STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME =
+            "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
 
     // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
     protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
@@ -813,6 +815,33 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     }
 
     /**
+     * Enable the Auditor to use system time as underreplicated ledger mark
+     * time.
+     *
+     * <p>If this is enabled, Auditor will write a ctime field into the
+     * underreplicated ledger znode.
+     *
+     * @param enabled
+     *            flag to enable/disable Auditor using system time as
+     *            underreplicated ledger mark time.
+     */
+    public T setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(boolean enabled) {
+        setProperty(STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME, enabled);
+        return getThis();
+    }
+
+    /**
+     * Return the flag that indicates whether auditor is using system time as
+     * underreplicated ledger mark time.
+     *
+     * @return the flag that indicates whether auditor is using system time as
+     *         underreplicated ledger mark time.
+     */
+    public boolean getStoreSystemTimeAsLedgerUnderreplicatedMarkTime() {
+        return getBoolean(STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME, false);
+    }
+
+    /**
      * Trickery to allow inheritance with fluent style.
      */
     protected abstract T getThis();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 532f8d2..ed87154 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.meta;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
 import java.util.function.Predicate;
 
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -44,21 +43,16 @@ public interface LedgerUnderreplicationManager extends AutoCloseable {
             throws ReplicationException.UnavailableException;
 
     /**
-     * Get a list of all the ledgers which have been
+     * Get a list of all the underreplicated ledgers which have been
      * marked for rereplication, filtered by the predicate on the missing replicas list.
      *
      * <p>Missing replicas list of an underreplicated ledger is the list of the bookies which are part of
      * the ensemble of this ledger and are currently unavailable/down.
      *
-     * <p>If filtering is not needed then it is suggested to pass null for predicate,
-     * otherwise it will read the content of the ZNode to decide on filtering.
-     *
      * @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required
-     * @param includeReplicaList whether to include missing replicalist in the output
-     * @return an iterator which returns ledger ids
+     * @return an iterator which returns underreplicated ledgers.
      */
-    Iterator<Entry<Long, List<String>>> listLedgersToRereplicate(Predicate<List<String>> predicate,
-            boolean includeReplicaList);
+    Iterator<UnderreplicatedLedger> listLedgersToRereplicate(Predicate<List<String>> predicate);
 
     /**
      * Acquire a underreplicated ledger for rereplication. The ledger
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java
new file mode 100644
index 0000000..6ad3036
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/UnderreplicatedLedger.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.util.List;
+
+/**
+ * UnderReplicated ledger representation info.
+ */
+public class UnderreplicatedLedger {
+    private final long ledgerId;
+    private long ctime;
+    private List<String> replicaList;
+    public static final long UNASSIGNED_CTIME = -1L;
+
+    protected UnderreplicatedLedger(long ledgerId) {
+        this.ledgerId = ledgerId;
+    }
+
+    public long getCtime() {
+        return ctime;
+    }
+
+    protected void setCtime(long ctime) {
+        this.ctime = ctime;
+    }
+
+    public List<String> getReplicaList() {
+        return replicaList;
+    }
+
+    protected void setReplicaList(List<String> replicaList) {
+        this.replicaList = replicaList;
+    }
+
+    public long getLedgerId() {
+        return ledgerId;
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index fbf2839..00215f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -26,7 +26,6 @@ import com.google.protobuf.TextFormat;
 import com.google.protobuf.TextFormat.ParseException;
 
 import java.net.UnknownHostException;
-import java.util.AbstractMap;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -266,6 +265,9 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             while (true) {
                 UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
                 try {
+                    if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
+                        builder.setCtime(System.currentTimeMillis());
+                    }
                     builder.addReplica(missingReplica);
                     ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
                             .printToString(builder.build()).getBytes(UTF_8),
@@ -352,27 +354,22 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     }
 
     /**
-     * Get a list of all the ledgers which have been
+     * Get a list of all the underreplicated ledgers which have been
      * marked for rereplication, filtered by the predicate on the replicas list.
      *
      * <p>Replicas list of an underreplicated ledger is the list of the bookies which are part of
      * the ensemble of this ledger and are currently unavailable/down.
      *
-     * <p>If filtering is not needed then it is suggested to pass null for predicate,
-     * otherwise it will read the content of the ZNode to decide on filtering.
-     *
      * @param predicate filter to use while listing under replicated ledgers. 'null' if filtering is not required.
-     * @param includeReplicaList whether to include missing replicalist in the output.
-     * @return an iterator which returns ledger ids
+     * @return an iterator which returns underreplicated ledgers.
      */
     @Override
-    public Iterator<Map.Entry<Long, List<String>>> listLedgersToRereplicate(final Predicate<List<String>> predicate,
-            boolean includeReplicaList) {
+    public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
         final Queue<String> queue = new LinkedList<String>();
         queue.add(urLedgerPath);
 
-        return new Iterator<Map.Entry<Long, List<String>>>() {
-            final Queue<Map.Entry<Long, List<String>>> curBatch = new LinkedList<Map.Entry<Long, List<String>>>();
+        return new Iterator<UnderreplicatedLedger>() {
+            final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>();
 
             @Override
             public void remove() {
@@ -393,11 +390,18 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                                 String child = parent + "/" + c;
                                 if (c.startsWith("urL")) {
                                     long ledgerId = getLedgerId(child);
-                                    List<String> replicaList = getLedgerUnreplicationInfo(ledgerId).getReplicaList();
-                                    if ((predicate == null)
-                                            || predicate.test(replicaList)) {
-                                        curBatch.add(new AbstractMap.SimpleImmutableEntry<Long, List<String>>(ledgerId,
-                                                ((includeReplicaList) ? replicaList : null)));
+                                    UnderreplicatedLedgerFormat underreplicatedLedgerFormat =
+                                            getLedgerUnreplicationInfo(ledgerId);
+                                    List<String> replicaList = underreplicatedLedgerFormat.getReplicaList();
+                                    long ctime = (underreplicatedLedgerFormat.hasCtime()
+                                            ? underreplicatedLedgerFormat.getCtime()
+                                            : UnderreplicatedLedger.UNASSIGNED_CTIME);
+                                    if ((predicate == null) || predicate.test(replicaList)) {
+                                        UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(
+                                                ledgerId);
+                                        underreplicatedLedger.setCtime(ctime);
+                                        underreplicatedLedger.setReplicaList(replicaList);
+                                        curBatch.add(underreplicatedLedger);
                                     }
                                 } else {
                                     queue.add(child);
@@ -419,7 +423,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             }
 
             @Override
-            public Map.Entry<Long, List<String>> next() {
+            public UnderreplicatedLedger next() {
                 assert curBatch.size() > 0;
                 return curBatch.remove();
             }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
index 7303be0..ef71a68 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListUnderReplicatedLedgerService.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -101,8 +102,7 @@ public class ListUnderReplicatedLedgerService implements HttpEndpointService {
                 Map<Long, List<String>> outputLedgersWithMissingReplica = null;
                 LedgerManagerFactory mFactory = bookieServer.getBookie().getLedgerManagerFactory();
                 LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-                Iterator<Map.Entry<Long, List<String>>> iter = underreplicationManager
-                        .listLedgersToRereplicate(predicate, printMissingReplica);
+                Iterator<UnderreplicatedLedger> iter = underreplicationManager.listLedgersToRereplicate(predicate);
 
                 hasURLedgers = iter.hasNext();
                 if (hasURLedgers) {
@@ -114,11 +114,11 @@ public class ListUnderReplicatedLedgerService implements HttpEndpointService {
                 }
                 while (iter.hasNext()) {
                     if (printMissingReplica) {
-                        Map.Entry<Long, List<String>> urlWithMissingReplica = iter.next();
-                        outputLedgersWithMissingReplica.put(urlWithMissingReplica.getKey(),
-                                urlWithMissingReplica.getValue());
+                        UnderreplicatedLedger underreplicatedLedger = iter.next();
+                        outputLedgersWithMissingReplica.put(underreplicatedLedger.getLedgerId(),
+                                underreplicatedLedger.getReplicaList());
                     } else {
-                        outputLedgers.add(iter.next().getKey());
+                        outputLedgers.add(iter.next().getLedgerId());
                     }
                 }
                 if (!hasURLedgers) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 2c349e2..6e8cf32 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -32,13 +32,12 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Random;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.BookieServer;
@@ -89,8 +88,25 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
     }
 
     @Test
-    public void testTriggerAudit() throws Exception {
-        ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+    public void testTriggerAuditWithStoreSystemTimeAsLedgerUnderreplicatedMarkTime() throws Exception {
+        testTriggerAudit(true);
+    }
+
+    @Test
+    public void testTriggerAuditWithoutStoreSystemTimeAsLedgerUnderreplicatedMarkTime() throws Exception {
+        testTriggerAudit(false);
+    }
+
+    public void testTriggerAudit(boolean storeSystemTimeAsLedgerUnderreplicatedMarkTime) throws Exception {
+        ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
+        thisServerConf
+                .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
+        restartBookies(thisServerConf);
+        ClientConfiguration thisClientConf = new ClientConfiguration(baseClientConf);
+        thisClientConf
+                .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
+        long testStartSystime = System.currentTimeMillis();
+        ZkLedgerUnderreplicationManager urLedgerMgr = new ZkLedgerUnderreplicationManager(thisClientConf, zkc);
         BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString());
         int lostBookieRecoveryDelayValue = bkAdmin.getLostBookieRecoveryDelay();
         urLedgerMgr.disableLedgerReplication();
@@ -119,13 +135,20 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
          */
         bkAdmin.triggerAudit();
         Thread.sleep(500);
-        Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
-                true);
-        assertTrue("There are supposed to be underreplicatedledgers", ledgersToRereplicate.hasNext());
-        Entry<Long, List<String>> urlWithReplicaList = ledgersToRereplicate.next();
-        assertEquals("Underreplicated ledgerId", ledgerId, urlWithReplicaList.getKey().longValue());
+        Iterator<UnderreplicatedLedger> underreplicatedLedgerItr = urLedgerMgr.listLedgersToRereplicate(null);
+        assertTrue("There are supposed to be underreplicatedledgers", underreplicatedLedgerItr.hasNext());
+        UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgerItr.next();
+        assertEquals("Underreplicated ledgerId", ledgerId, underreplicatedLedger.getLedgerId());
         assertTrue("Missingreplica of Underreplicated ledgerId should contain " + bookieToKill.getLocalAddress(),
-                urlWithReplicaList.getValue().contains(bookieToKill.getLocalAddress().toString()));
+                underreplicatedLedger.getReplicaList().contains(bookieToKill.getLocalAddress().toString()));
+        if (storeSystemTimeAsLedgerUnderreplicatedMarkTime) {
+            long ctimeOfURL = underreplicatedLedger.getCtime();
+            assertTrue("ctime of underreplicated ledger should be greater than test starttime",
+                    (ctimeOfURL > testStartSystime) && (ctimeOfURL < System.currentTimeMillis()));
+        } else {
+            assertEquals("ctime of underreplicated ledger should not be set", UnderreplicatedLedger.UNASSIGNED_CTIME,
+                    underreplicatedLedger.getCtime());
+        }
         bkAdmin.close();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
index b55a8c6..906db8f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.bookie.Bookie;
@@ -31,6 +29,7 @@ import org.apache.bookkeeper.client.BKException.BKIllegalOpException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.common.testing.annotations.FlakyTest;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
@@ -92,11 +91,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
         bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
         bkAdmin.triggerAudit();
         Thread.sleep(500);
-        Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
-                false);
+        Iterator<UnderreplicatedLedger> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
         if (ledgersToRereplicate.hasNext()) {
             while (ledgersToRereplicate.hasNext()) {
-                Long ledgerId = ledgersToRereplicate.next().getKey();
+                Long ledgerId = ledgersToRereplicate.next().getLedgerId();
                 log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
             }
             fail("There are not supposed to be any underreplicatedledgers");
@@ -106,10 +104,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
         bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
         bkAdmin.triggerAudit();
         Thread.sleep(500);
-        ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null, false);
+        ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
         if (ledgersToRereplicate.hasNext()) {
             while (ledgersToRereplicate.hasNext()) {
-                Long ledgerId = ledgersToRereplicate.next().getKey();
+                Long ledgerId = ledgersToRereplicate.next().getLedgerId();
                 log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
             }
             fail("There are not supposed to be any underreplicatedledgers");
@@ -166,11 +164,10 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
         bkAdmin.decommissionBookie(Bookie.getBookieAddress(killedBookieConf));
         bkAdmin.triggerAudit();
         Thread.sleep(500);
-        Iterator<Map.Entry<Long, List<String>>> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null,
-                false);
+        Iterator<UnderreplicatedLedger> ledgersToRereplicate = urLedgerMgr.listLedgersToRereplicate(null);
         if (ledgersToRereplicate.hasNext()) {
             while (ledgersToRereplicate.hasNext()) {
-                Long ledgerId = ledgersToRereplicate.next().getKey();
+                long ledgerId = ledgersToRereplicate.next().getLedgerId();
                 log.error("Ledger: {} is underreplicated which is not expected", ledgerId);
             }
             fail("There are not supposed to be any underreplicatedledgers");
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 08d97a9..1eaa191 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -909,6 +909,10 @@ zkEnableSecurity=false
 # How long to wait, in seconds, before starting auto recovery of a lost bookie
 # lostBookieRecoveryDelay=0
 
+# Enable the Auditor to use system time as underreplicated ledger mark time.
+# If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.
+# storeSystemTimeAsLedgerUnderreplicatedMarkTime=false
+
 #############################################################################
 ## Replication Worker settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index afd0792..fd4f9a2 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -646,6 +646,8 @@ groups:
   - param: lostBookieRecoveryDelay
     description: How long to wait, in seconds, before starting autorecovery of a lost bookie.
     default: 0
+  - param: storeSystemTimeAsLedgerUnderreplicatedMarkTime
+    description: Enable the Auditor to use system time as underreplicated ledger mark time. If this is enabled, Auditor will write a ctime field into the underreplicated ledger znode.
 
 - name: AutoRecovery replication worker settings
   params: