You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/12/04 11:03:19 UTC

[bookkeeper] branch master updated: Add max ledger metadata format version to layout

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new dce4fd4  Add max ledger metadata format version to layout
dce4fd4 is described below

commit dce4fd4c16e5423a76a4be7aa76b23a52335c7fa
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Dec 4 12:03:14 2018 +0100

    Add max ledger metadata format version to layout
    
    The max ledger metadata format version is the maximum format version
    that will be used to write ledger metadata. By setting it in the
    ledger layout it becomes a cluster-wide configuration which is
    initialized along with the cluster. Any cluster initialized with the
    current code will end up with version 2. For such a cluster, the serde
    will only ever serialize using version 2 or lower of the ledger metadata
    format, so all clients that understand version 2 will continue to
    work, even as the software version increases and new metadata formats
    become available (such as the binary metadata format coming soon).
    
    Currently there is no handling in LedgerMetadataSerDe, because we
    don't have a new version yet, but when there is one, it will use
    ```
    Math.min(maxLedgerMetadataFormatVersion,
             metadata.getMetadataFormatVersion())
    ```
    to decide which serialization method to use.
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1858 from ivankelly/format-version
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  7 +++-
 .../bookkeeper/conf/AbstractConfiguration.java     | 24 ++++++++++++
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  2 +-
 .../meta/AbstractZkLedgerManagerFactory.java       |  1 +
 .../org/apache/bookkeeper/meta/LedgerLayout.java   | 43 ++++++++++++++++++----
 .../bookkeeper/meta/LedgerMetadataSerDe.java       |  6 +++
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |  2 +-
 .../server/http/service/GetLedgerMetaService.java  |  2 +-
 .../server/http/service/ListLedgerService.java     |  2 +-
 .../bookkeeper/client/LedgerMetadataTest.java      |  6 ++-
 .../meta/AbstractZkLedgerManagerTest.java          |  2 +-
 .../apache/bookkeeper/meta/MockLedgerManager.java  |  2 +-
 .../apache/bookkeeper/meta/TestLedgerLayout.java   | 43 ++++++++++++++++++++++
 .../bookkeeper/server/http/TestHttpService.java    |  3 +-
 .../metadata/etcd/EtcdLedgerManager.java           | 30 ++++++++-------
 .../metadata/etcd/EtcdLedgerManagerFactory.java    |  4 +-
 .../metadata/etcd/EtcdLedgerManagerTest.java       |  3 +-
 17 files changed, 147 insertions(+), 35 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 9108ef3..9c6180b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1106,7 +1106,7 @@ public class BookieShell implements Tool {
     void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
         System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
         if (printMeta) {
-            System.out.println(new String(new LedgerMetadataSerDe().serialize(md), UTF_8));
+            System.out.println(md.toString());
         }
     }
 
@@ -1115,7 +1115,10 @@ public class BookieShell implements Tool {
      */
     class LedgerMetadataCmd extends MyCommand {
         Options lOpts = new Options();
-        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        // the max version won't actually take effect as this tool
+        // never creates new metadata (there'll already be a format version in the existing metadata)
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe(
+                LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
 
         LedgerMetadataCmd() {
             super(CMD_LEDGERMETADATA);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index 7f00d09..3057bfe 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -85,6 +85,7 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_UNDERREPLICATED_MARK_TIME =
             "storeSystemTimeAsLedgerUnderreplicatedMarkTime";
     protected static final String STORE_SYSTEMTIME_AS_LEDGER_CREATION_TIME = "storeSystemTimeAsLedgerCreationTime";
+    protected static final String MAX_LEDGER_METADATA_FORMAT_VERSION = "maxLedgerMetadataFormatVersion";
 
     // Metastore settings, only being used when LEDGER_MANAGER_FACTORY_CLASS is MSLedgerManagerFactory
     protected static final String METASTORE_IMPL_CLASS = "metastoreImplClass";
@@ -856,6 +857,29 @@ public abstract class AbstractConfiguration<T extends AbstractConfiguration>
     }
 
     /**
+     * Set the maximum format version which should be used when serializing ledger metadata.
+     * Setting the maximum format allows us to ensure that all clients running against a cluster
+     * will be able to read back all written metadata.
+     * Normally this is set when loading the layout from the metadata store. Users should not
+     * set this directly.
+     */
+    public T setMaxLedgerMetadataFormatVersion(int maxVersion) {
+        setProperty(MAX_LEDGER_METADATA_FORMAT_VERSION, maxVersion);
+        return getThis();
+    }
+
+    /**
+     * Get the maximum format version which should be used when serializing ledger metadata.
+     * The default is 2, as that was the current version when this functionality was introduced.
+     *
+     * @see #setMaxLedgerMetadataFormatVersion(int)
+     * @return the maximum format version with which to serialize metadata.
+     */
+    public int getMaxLedgerMetadataFormatVersion() {
+        return getInteger(MAX_LEDGER_METADATA_FORMAT_VERSION, 2);
+    }
+
+    /**
      * Whether to preserve MDC for tasks in Executor.
      *
      * @return flag to enable/disable MDC preservation in Executor.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 5dbdd06..97413b7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -160,7 +160,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
      *          ZooKeeper Client Handle
      */
     protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
index 72bc3e8..4bb2fb1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerFactory.java
@@ -178,6 +178,7 @@ public abstract class AbstractZkLedgerManagerFactory implements LedgerManagerFac
         if (log.isDebugEnabled()) {
             log.debug("read ledger layout {}", layout);
         }
+        conf.setMaxLedgerMetadataFormatVersion(layout.getMaxLedgerMetadataFormatVersion());
 
         // there is existing layout, we need to look into the layout.
         // handle pre V2 layout
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
index eb5b705..c39e6b6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerLayout.java
@@ -39,8 +39,14 @@ public class LedgerLayout {
     // version of ledger layout metadata
     public static final int LAYOUT_FORMAT_VERSION = 2;
 
-    private static final String splitter = ":";
-    private static final String lSplitter = "\n";
+    private static final String FIELD_SPLITTER = ":";
+    private static final String LINE_SPLITTER = "\n";
+
+    // For version 2 and below, max ledger metadata format wasn't stored in the layout
+    // so assume 2 if it is missing.
+    private static final int DEFAULT_MAX_LEDGER_METADATA_FORMAT_VERSION = 2;
+    private static final String MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD =
+        "MAX_LEDGER_METADATA_FORMAT_VERSION";
 
     // ledger manager factory class
     private final String managerFactoryClass;
@@ -50,6 +56,9 @@ public class LedgerLayout {
     // layout version of how to store layout information
     private final int layoutFormatVersion;
 
+    // maximum format version that can be used for storing ledger metadata
+    private final int maxLedgerMetadataFormatVersion;
+
     /**
      * Ledger Layout Constructor.
      *
@@ -59,13 +68,17 @@ public class LedgerLayout {
      *          Ledger Manager Version
      */
     public LedgerLayout(String managerFactoryCls, int managerVersion) {
-        this(managerFactoryCls, managerVersion, LAYOUT_FORMAT_VERSION);
+        this(managerFactoryCls, managerVersion,
+             LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION,
+             LAYOUT_FORMAT_VERSION);
     }
 
     LedgerLayout(String managerFactoryCls, int managerVersion,
+                 int maxLedgerMetadataFormatVersion,
                  int layoutVersion) {
         this.managerFactoryClass = managerFactoryCls;
         this.managerVersion = managerVersion;
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
         this.layoutFormatVersion = layoutVersion;
     }
 
@@ -76,8 +89,11 @@ public class LedgerLayout {
      */
     public byte[] serialize() throws IOException {
         String s =
-          new StringBuilder().append(layoutFormatVersion).append(lSplitter)
-              .append(managerFactoryClass).append(splitter).append(managerVersion).toString();
+          new StringBuilder().append(layoutFormatVersion).append(LINE_SPLITTER)
+            .append(managerFactoryClass).append(FIELD_SPLITTER).append(managerVersion).append(LINE_SPLITTER)
+            .append(MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD).append(FIELD_SPLITTER)
+            .append(maxLedgerMetadataFormatVersion)
+            .toString();
 
         if (log.isDebugEnabled()) {
             log.debug("Serialized layout info: {}", s);
@@ -100,7 +116,7 @@ public class LedgerLayout {
             log.debug("Parsing Layout: {}", layout);
         }
 
-        String lines[] = layout.split(lSplitter);
+        String lines[] = layout.split(LINE_SPLITTER);
 
         try {
             int layoutFormatVersion = Integer.parseInt(lines[0]);
@@ -113,7 +129,7 @@ public class LedgerLayout {
                 throw new IOException("Ledger manager and its version absent from layout: " + layout);
             }
 
-            String[] parts = lines[1].split(splitter);
+            String[] parts = lines[1].split(FIELD_SPLITTER);
             if (parts.length != 2) {
                 throw new IOException("Invalid Ledger Manager defined in layout : " + layout);
             }
@@ -121,7 +137,18 @@ public class LedgerLayout {
             String managerFactoryCls = parts[0];
             // ledger manager version
             int managerVersion = Integer.parseInt(parts[1]);
-            return new LedgerLayout(managerFactoryCls, managerVersion, layoutFormatVersion);
+
+            int maxLedgerMetadataFormatVersion = DEFAULT_MAX_LEDGER_METADATA_FORMAT_VERSION;
+            if (lines.length >= 3) {
+                String[] metadataFormatParts = lines[2].split(FIELD_SPLITTER);
+                if (metadataFormatParts.length != 2
+                    || !metadataFormatParts[0].equals(MAX_LEDGER_METADATA_FORMAT_VERSION_FIELD)) {
+                    throw new IOException("Invalid field for max ledger metadata format:" + lines[2]);
+                }
+                maxLedgerMetadataFormatVersion = Integer.parseInt(metadataFormatParts[1]);
+            }
+            return new LedgerLayout(managerFactoryCls, managerVersion,
+                                    maxLedgerMetadataFormatVersion, layoutFormatVersion);
         } catch (NumberFormatException e) {
             throw new IOException("Could not parse layout '" + layout + "'", e);
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
index 6020a3b..4653af4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -63,6 +63,12 @@ public class LedgerMetadataSerDe {
     private static final String V1_CLOSED_TAG = "CLOSED";
     private static final int V1_IN_RECOVERY_ENTRY_ID = -102;
 
+    private final int maxLedgerMetadataFormatVersion;
+
+    public LedgerMetadataSerDe(int maxLedgerMetadataFormatVersion) {
+        this.maxLedgerMetadataFormatVersion = maxLedgerMetadataFormatVersion;
+    }
+
     public byte[] serialize(LedgerMetadata metadata) {
         if (metadata.getMetadataFormatVersion() == 1) {
             return serializeVersion1(metadata);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index fc87632..38c6816 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -284,7 +284,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
             this.conf = conf;
             this.zk = zk;
             this.metastore = metastore;
-            this.serDe = new LedgerMetadataSerDe();
+            this.serDe = new LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
 
             try {
                 ledgerTable = metastore.createScannableTable(TABLE_NAME);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
index 7fcaeda..537b50c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
@@ -53,7 +53,7 @@ public class GetLedgerMetaService implements HttpEndpointService {
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
index f553b64..13022e4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
@@ -58,7 +58,7 @@ public class ListLedgerService implements HttpEndpointService {
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
 
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
index a9ffc2d..d9141e6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
@@ -80,7 +80,8 @@ public class LedgerMetadataTest {
             .withCreationTime(System.currentTimeMillis())
             .storingCreationTime(true)
             .build();
-        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
+        LedgerMetadataFormat format = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+            .buildProtoFormat(lm);
         assertTrue(format.hasCtime());
     }
 
@@ -94,7 +95,8 @@ public class LedgerMetadataTest {
         LedgerMetadata lm = LedgerMetadataBuilder.create()
             .newEnsembleEntry(0L, ensemble).build();
 
-        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
+        LedgerMetadataFormat format = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+            .buildProtoFormat(lm);
         assertFalse(format.hasCtime());
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 37f55b3..20a8d51 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -142,7 +142,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         assertSame(conf, ledgerManager.conf);
         assertSame(scheduler, ledgerManager.scheduler);
 
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     @After
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index 398bb07..a73adeb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -72,7 +72,7 @@ public class MockLedgerManager implements LedgerManager {
         this.metadataMap = metadataMap;
         this.executor = executor;
         this.ownsExecutor = ownsExecutor;
-        this.serDe = new LedgerMetadataSerDe();
+        this.serDe = new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION);
     }
 
     public MockLedgerManager newClient() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
index 62315fe..16b30bd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerLayout.java
@@ -18,8 +18,12 @@
  */
 package org.apache.bookkeeper.meta;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
 
 import org.junit.Test;
 
@@ -63,4 +67,43 @@ public class TestLedgerLayout {
             hierarchical1.getLayoutFormatVersion());
     }
 
+    @Test
+    public void testParseNoMaxLedgerMetadataFormatVersion() throws Exception {
+        LedgerLayout layout = LedgerLayout.parseLayout("1\nblahblahLM:3".getBytes(UTF_8));
+        // No third line in the layout: the parser falls back to its default of 2.
+        assertEquals(2, layout.getMaxLedgerMetadataFormatVersion());
+    }
+
+    @Test
+    public void testParseWithMaxLedgerMetadataFormatVersion() throws Exception {
+        LedgerLayout layout = LedgerLayout.parseLayout(
+                "1\nblahblahLM:3\nMAX_LEDGER_METADATA_FORMAT_VERSION:123".getBytes(UTF_8));
+        // The explicit third-line value must be returned instead of the default.
+        assertEquals(123, layout.getMaxLedgerMetadataFormatVersion());
+    }
+
+    @Test
+    public void testCorruptMaxLedgerLayout() throws Exception {
+        try {
+            LedgerLayout.parseLayout("1\nblahblahLM:3\nMAXXX_LEDGER_METADATA_FORMAT_VERSION:123".getBytes(UTF_8));
+            fail("Shouldn't have been able to parse");
+        } catch (IOException ioe) {
+            // expected: unrecognized field name on the third line
+        }
+
+        try {
+            LedgerLayout.parseLayout("1\nblahblahLM:3\nMAX_LEDGER_METADATA_FORMAT_VERSION:blah".getBytes(UTF_8));
+            fail("Shouldn't have been able to parse");
+        } catch (IOException ioe) {
+            // expected: valid field name, but the version is not a number
+        }
+    }
+
+    @Test
+    public void testMoreFieldsAdded() throws Exception {
+        LedgerLayout layout = LedgerLayout.parseLayout(
+                "1\nblahblahLM:3\nMAX_LEDGER_METADATA_FORMAT_VERSION:123\nFOO:BAR".getBytes(UTF_8));
+        // Lines after the third are not inspected by the parser, so they must not break parsing.
+        assertEquals(123, layout.getMaxLedgerMetadataFormatVersion());
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index c4d441d..e88d29d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -387,7 +387,8 @@ public class TestHttpService extends BookKeeperClusterTestCase {
         assertEquals(1, respBody.size());
         // verify LedgerMetadata content is equal
         assertTrue(respBody.get(ledgerId.toString()).toString()
-                .equals(new String(new LedgerMetadataSerDe().serialize(lh[0].getLedgerMetadata()))));
+                .equals(new String(new LedgerMetadataSerDe(LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION)
+                                   .serialize(lh[0].getLedgerMetadata()))));
     }
 
     @Test
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index d571bf8..e20a9ba 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -45,6 +45,7 @@ import java.util.function.Function;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
@@ -64,19 +65,8 @@ import org.apache.zookeeper.AsyncCallback.VoidCallback;
 @Slf4j
 class EtcdLedgerManager implements LedgerManager {
 
-    private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
-    private final Function<ByteSequence, LedgerMetadata> ledgerMetadataFunction = bs -> {
-        try {
-            return serDe.parseConfig(
-                bs.getBytes(),
-                Optional.empty()
-            );
-        } catch (IOException ioe) {
-            log.error("Could not parse ledger metadata : {}", bs.toStringUtf8(), ioe);
-            throw new RuntimeException(
-                "Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
-        }
-    };
+    private final LedgerMetadataSerDe serDe;
+    private final Function<ByteSequence, LedgerMetadata> ledgerMetadataFunction;
 
     private final String scope;
     private final Client client;
@@ -89,12 +79,24 @@ class EtcdLedgerManager implements LedgerManager {
 
     private volatile boolean closed = false;
 
-    EtcdLedgerManager(Client client,
+    EtcdLedgerManager(AbstractConfiguration conf,
+                      Client client,
                       String scope) {
         this.client = client;
         this.kvClient = client.getKVClient();
         this.scope = scope;
         this.watchClient = new EtcdWatchClient(client);
+        this.serDe = new LedgerMetadataSerDe(conf.getMaxLedgerMetadataFormatVersion());
+
+        this.ledgerMetadataFunction = bs -> {
+            try {
+                return serDe.parseConfig(bs.getBytes(), Optional.empty());
+            } catch (IOException ioe) {
+                log.error("Could not parse ledger metadata : {}", bs.toStringUtf8(), ioe);
+                throw new RuntimeException(
+                        "Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
+            }
+        };
     }
 
     private boolean isClosed() {
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
index dc7d151..9c19df0 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -42,6 +42,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
 
     private String scope;
     private Client client;
+    private AbstractConfiguration conf;
 
     @Override
     public int getCurrentVersion() {
@@ -66,6 +67,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
             throw new IOException("Invalid metadata service uri", e);
         }
         this.client = etcdLayoutManager.getClient();
+        this.conf = conf;
         return this;
     }
 
@@ -82,7 +84,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
 
     @Override
     public LedgerManager newLedgerManager() {
-        return new EtcdLedgerManager(client, scope);
+        return new EtcdLedgerManager(conf, client, scope);
     }
 
     @Override
diff --git a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
index 984224c..27dc84c 100644
--- a/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
+++ b/metadata-drivers/etcd/src/test/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerTest.java
@@ -53,6 +53,7 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
@@ -81,7 +82,7 @@ public class EtcdLedgerManagerTest extends EtcdTestBase {
     public void setUp() throws Exception {
         super.setUp();
         this.scope = RandomStringUtils.randomAlphabetic(8);
-        this.lm = new EtcdLedgerManager(etcdClient, scope);
+        this.lm = new EtcdLedgerManager(new ClientConfiguration(), etcdClient, scope);
     }
 
     @Override