You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/01/17 00:26:48 UTC

[bookkeeper] branch master updated: Binary metadata format

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5343665  Binary metadata format
5343665 is described below

commit 534366536ea8180b405ad430604389a8948cf42f
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Jan 16 16:26:44 2019 -0800

    Binary metadata format
    
    This patch adds a binary metadata format and bumps the metadata format
    version to 3. The contents of the binary metadata are the same as the
    contents of the text format for now. The difference is that the binary
    format is more compact, and new fields can be added to the metadata when
    using the binary format, which isn't possible with the text
    format. With the text format, parsing with a client that didn't
    recognise the new field would fail.
    
    For now, the text format (version 2) is still used by default. We will
    provide a tool to allow administrators to bump to version 3.
    
    Some tests have been modified to provide digest and password to the
    builder. All protobuf metadata in released versions has had digest and
    password (first protobuf metadata was in release-4.2.0). So if new
    metadata is created or read with version 2, it will have these two
    fields set.
    
    Master issue: #723
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1866 from ivankelly/binary-metadata
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   2 +-
 .../org/apache/bookkeeper/client/BKException.java  |  15 +
 .../apache/bookkeeper/client/api/BKException.java  |   9 +
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  22 +-
 .../bookkeeper/meta/LedgerMetadataSerDe.java       | 422 ++++++++++++++-------
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |  26 +-
 .../bookkeeper/client/LedgerMetadataTest.java      |  32 --
 .../bookkeeper/client/MetadataUpdateLoopTest.java  |   7 +
 .../apache/bookkeeper/client/MockLedgerHandle.java |   8 +-
 .../bookkeeper/client/TestWatchEnsembleChange.java |   1 +
 .../meta/AbstractZkLedgerManagerTest.java          |   2 +
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |   3 +
 .../apache/bookkeeper/meta/MockLedgerManager.java  |   2 +-
 .../bookkeeper/meta/TestLedgerMetadataSerDe.java   | 184 +++++++++
 .../metadata/etcd/EtcdLedgerManager.java           |  23 +-
 15 files changed, 579 insertions(+), 179 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index dc9137e..9df7f49 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -1166,7 +1166,7 @@ public class BookieShell implements Tool {
     void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
         System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
         if (printMeta) {
-            System.out.println(new String(new LedgerMetadataSerDe().serialize(md), UTF_8));
+            System.out.println(md.toString());
         }
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 912fb1e..b21d745 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -122,6 +122,8 @@ public abstract class BKException extends org.apache.bookkeeper.client.api.BKExc
             return new BKLedgerIdOverflowException();
         case Code.SecurityException:
             return new BKSecurityException();
+        case Code.MetadataSerializationException:
+            return new BKMetadataSerializationException();
         default:
             return new BKUnexpectedConditionException();
         }
@@ -436,6 +438,19 @@ public abstract class BKException extends org.apache.bookkeeper.client.api.BKExc
     }
 
     /**
+     * Bookkeeper metadata serialization exception.
+     */
+    public static class BKMetadataSerializationException extends BKException {
+        public BKMetadataSerializationException() {
+            super(Code.MetadataSerializationException);
+        }
+
+        public BKMetadataSerializationException(Throwable cause) {
+            super(Code.MetadataSerializationException, cause);
+        }
+    }
+
+    /**
      * Extract an exception code from an BKException, or use a default if it's another type.
      * The throwable is null, assume that no exception took place and return
      * {@link BKException.Code.OK}.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
index cc28158..058a9a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/BKException.java
@@ -167,6 +167,8 @@ public class BKException extends Exception {
             return "Bookie operation timeout";
         case Code.SecurityException:
             return "Failed to establish a secure connection";
+        case Code.MetadataSerializationException:
+            return "Failed to serialize metadata";
         default:
             return "Unexpected condition";
         }
@@ -272,6 +274,13 @@ public class BKException extends Exception {
         int LedgerIdOverflowException = -106;
 
         /**
+         * Failure to serialize metadata.
+         *
+         * @since 4.9
+         */
+        int MetadataSerializationException = -107;
+
+        /**
          * Generic exception code used to propagate in replication pipeline.
          */
         int ReplicationException = -200;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 5dbdd06..96c2f0f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -265,9 +265,17 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
                 }
             }
         };
+        final byte[] data;
+        try {
+            data = serDe.serialize(metadata);
+        } catch (IOException ioe) {
+            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+            return promise;
+        }
+
         List<ACL> zkAcls = ZkUtils.getACLs(conf);
-        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, serDe.serialize(metadata), zkAcls,
-                CreateMode.PERSISTENT, scb, null);
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, data, zkAcls,
+                                              CreateMode.PERSISTENT, scb, null);
         return promise;
     }
 
@@ -422,8 +430,16 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
             return promise;
         }
         final LongVersion zv = (LongVersion) currentVersion;
+
+        final byte[] data;
+        try {
+            data = serDe.serialize(metadata);
+        } catch (IOException ioe) {
+            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+            return promise;
+        }
         zk.setData(getLedgerPath(ledgerId),
-                   serDe.serialize(metadata), (int) zv.getLongVersion(),
+                   data, (int) zv.getLongVersion(),
                    new StatCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, Stat stat) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
index 6020a3b..aab72fb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -19,16 +19,23 @@ package org.apache.bookkeeper.meta;
 
 import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.TextFormat;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.StringReader;
-import java.nio.CharBuffer;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -51,122 +58,268 @@ import org.slf4j.LoggerFactory;
 public class LedgerMetadataSerDe {
     private static final Logger log = LoggerFactory.getLogger(LedgerMetadataSerDe.class);
 
-    public static final int CURRENT_METADATA_FORMAT_VERSION = 2;
-    private static final int LOWEST_COMPAT_METADATA_FORMAT_VERSION = 0;
+    /**
+     * Text based manual serialization.
+     * Available from v4.0.x onwards.
+     */
+    public static final int METADATA_FORMAT_VERSION_1 = 1;
+
+    /**
+     * Protobuf based, serialized using TextFormat.
+     * Available from v4.2.x onwards.
+     * Can contain ctime or not, but if it contains ctime it can only be parsed by v4.4.x onwards.
+     */
+    public static final int METADATA_FORMAT_VERSION_2 = 2;
+
+    /**
+     * Protobuf based, serialized in binary format.
+     * Available from v4.9.x onwards.
+     */
+    public static final int METADATA_FORMAT_VERSION_3 = 3;
+
+    public static final int MAXIMUM_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_3;
+    public static final int CURRENT_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_2;
+    private static final int LOWEST_COMPAT_METADATA_FORMAT_VERSION = METADATA_FORMAT_VERSION_1;
 
     // for pulling the version
-    private static final String VERSION_KEY = "BookieMetadataFormatVersion";
+    private static final int MAX_VERSION_DIGITS = 10;
+    private static final byte[] VERSION_KEY_BYTES = "BookieMetadataFormatVersion\t".getBytes(UTF_8);
     private static final String LINE_SPLITTER = "\n";
+    private static final byte[] LINE_SPLITTER_BYTES = LINE_SPLITTER.getBytes(UTF_8);
     private static final String FIELD_SPLITTER = "\t";
 
     // old V1 constants
     private static final String V1_CLOSED_TAG = "CLOSED";
     private static final int V1_IN_RECOVERY_ENTRY_ID = -102;
 
-    public byte[] serialize(LedgerMetadata metadata) {
-        if (metadata.getMetadataFormatVersion() == 1) {
-            return serializeVersion1(metadata);
-        }
-
-        StringBuilder s = new StringBuilder();
-        s.append(VERSION_KEY).append(FIELD_SPLITTER)
-            .append(CURRENT_METADATA_FORMAT_VERSION).append(LINE_SPLITTER);
-        s.append(TextFormat.printToString(buildProtoFormat(metadata)));
-        if (log.isDebugEnabled()) {
-            log.debug("Serialized config: {}", s);
-        }
-        return s.toString().getBytes(UTF_8);
+    private static void writeHeader(OutputStream os, int version) throws IOException {
+        os.write(VERSION_KEY_BYTES);
+        os.write(String.valueOf(version).getBytes(UTF_8));
+        os.write(LINE_SPLITTER_BYTES);
     }
 
-    private byte[] serializeVersion1(LedgerMetadata metadata) {
-        StringBuilder s = new StringBuilder();
-        s.append(VERSION_KEY).append(FIELD_SPLITTER)
-            .append(metadata.getMetadataFormatVersion()).append(LINE_SPLITTER);
-        s.append(metadata.getWriteQuorumSize()).append(LINE_SPLITTER)
-            .append(metadata.getEnsembleSize()).append(LINE_SPLITTER).append(metadata.getLength());
-
-        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : metadata.getAllEnsembles().entrySet()) {
-            s.append(LINE_SPLITTER).append(entry.getKey());
-            for (BookieSocketAddress addr : entry.getValue()) {
-                s.append(FIELD_SPLITTER);
-                s.append(addr.toString());
-            }
-        }
+    private static int readHeader(InputStream is) throws IOException {
+        checkState(LINE_SPLITTER_BYTES.length == 1, "LINE_SPLITTER must be single byte");
 
-        if (metadata.getState() == State.IN_RECOVERY) {
-            s.append(LINE_SPLITTER).append(V1_IN_RECOVERY_ENTRY_ID)
-                .append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
-        } else if (metadata.getState() == State.CLOSED) {
-            s.append(LINE_SPLITTER).append(metadata.getLastEntryId())
-                .append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
-        } else {
-            checkArgument(metadata.getState() == State.OPEN,
-                          String.format("Unknown state %s for V1 serialization", metadata.getState()));
+        for (int i = 0; i < VERSION_KEY_BYTES.length; i++) {
+            int b = is.read();
+            if (b < 0 || ((byte) b) != VERSION_KEY_BYTES[i]) {
+                throw new IOException("Ledger metadata header corrupt at index " + i);
+            }
         }
-
-        if (log.isDebugEnabled()) {
-            log.debug("Serialized config: {}", s);
+        byte[] versionBuf = new byte[MAX_VERSION_DIGITS];
+        int i = 0;
+        while (i < MAX_VERSION_DIGITS) {
+            int b = is.read();
+            if (b == LINE_SPLITTER_BYTES[0]) {
+                String versionStr = new String(versionBuf, 0, i, UTF_8);
+                try {
+                    return Integer.parseInt(versionStr);
+                } catch (NumberFormatException nfe) {
+                    throw new IOException("Unable to parse version number from " + versionStr);
+                }
+            } else if (b < 0) {
+                break;
+            } else {
+                versionBuf[i++] = (byte) b;
+            }
         }
-
-        return s.toString().getBytes(UTF_8);
+        throw new IOException("Unable to find end of version number, metadata appears corrupt");
     }
 
-    @VisibleForTesting
-    public LedgerMetadataFormat buildProtoFormat(LedgerMetadata metadata) {
-        LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
-        builder.setQuorumSize(metadata.getWriteQuorumSize())
-            .setAckQuorumSize(metadata.getAckQuorumSize())
-            .setEnsembleSize(metadata.getEnsembleSize())
-            .setLength(metadata.getLength())
-            .setLastEntryId(metadata.getLastEntryId());
-
-        switch (metadata.getState()) {
-        case CLOSED:
-            builder.setState(LedgerMetadataFormat.State.CLOSED);
+    public byte[] serialize(LedgerMetadata metadata) throws IOException {
+        int formatVersion = metadata.getMetadataFormatVersion();
+        final byte[] serialized;
+        switch (formatVersion) {
+        case METADATA_FORMAT_VERSION_3:
+            serialized = serializeVersion3(metadata);
             break;
-        case IN_RECOVERY:
-            builder.setState(LedgerMetadataFormat.State.IN_RECOVERY);
+        case METADATA_FORMAT_VERSION_2:
+            serialized = serializeVersion2(metadata);
             break;
-        case OPEN:
-            builder.setState(LedgerMetadataFormat.State.OPEN);
+        case METADATA_FORMAT_VERSION_1:
+            serialized = serializeVersion1(metadata);
             break;
         default:
-            checkArgument(false,
-                          String.format("Unknown state %s for protobuf serialization", metadata.getState()));
-            break;
+            throw new IllegalArgumentException("Invalid format version " + formatVersion);
         }
+        if (log.isDebugEnabled()) {
+            String serializedStr;
+            if (formatVersion > METADATA_FORMAT_VERSION_2) {
+                serializedStr = Base64.getEncoder().encodeToString(serialized);
+            } else {
+                serializedStr = new String(serialized, UTF_8);
+            }
+            log.debug("Serialized with format {}: {}", formatVersion, serializedStr);
+        }
+        return serialized;
+    }
+
+    private static byte[] serializeVersion3(LedgerMetadata metadata) throws IOException {
+        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+            writeHeader(os, METADATA_FORMAT_VERSION_3);
+            LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
+            builder.setQuorumSize(metadata.getWriteQuorumSize())
+                .setAckQuorumSize(metadata.getAckQuorumSize())
+                .setEnsembleSize(metadata.getEnsembleSize())
+                .setLength(metadata.getLength())
+                .setLastEntryId(metadata.getLastEntryId());
+
+            switch (metadata.getState()) {
+            case CLOSED:
+                builder.setState(LedgerMetadataFormat.State.CLOSED);
+                break;
+            case IN_RECOVERY:
+                builder.setState(LedgerMetadataFormat.State.IN_RECOVERY);
+                break;
+            case OPEN:
+                builder.setState(LedgerMetadataFormat.State.OPEN);
+                break;
+            default:
+                checkArgument(false,
+                              String.format("Unknown state %s for protobuf serialization", metadata.getState()));
+                break;
+            }
 
-        /** Hack to get around fact that ctime was never versioned correctly */
-        if (LedgerMetadataUtils.shouldStoreCtime(metadata)) {
             builder.setCtime(metadata.getCtime());
-        }
+            builder.setDigestType(apiToProtoDigestType(metadata.getDigestType()));
+
+            serializePassword(metadata.getPassword(), builder);
+
+            Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+            if (customMetadata.size() > 0) {
+                LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder =
+                    LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
+                for (Map.Entry<String, byte[]> entry : customMetadata.entrySet()) {
+                    cMetadataBuilder.setKey(entry.getKey()).setValue(ByteString.copyFrom(entry.getValue()));
+                    builder.addCustomMetadata(cMetadataBuilder.build());
+                }
+            }
 
-        if (metadata.hasPassword()) {
-            builder.setDigestType(apiToProtoDigestType(metadata.getDigestType()))
-                .setPassword(ByteString.copyFrom(metadata.getPassword()));
+            for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : metadata.getAllEnsembles().entrySet()) {
+                LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
+                segmentBuilder.setFirstEntryId(entry.getKey());
+                for (BookieSocketAddress addr : entry.getValue()) {
+                    segmentBuilder.addEnsembleMember(addr.toString());
+                }
+                builder.addSegment(segmentBuilder.build());
+            }
+
+            builder.build().writeDelimitedTo(os);
+            return os.toByteArray();
         }
+    }
 
-        Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
-        if (customMetadata.size() > 0) {
-            LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder =
-                LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
-            for (Map.Entry<String, byte[]> entry : customMetadata.entrySet()) {
-                cMetadataBuilder.setKey(entry.getKey()).setValue(ByteString.copyFrom(entry.getValue()));
-                builder.addCustomMetadata(cMetadataBuilder.build());
+    private static byte[] serializeVersion2(LedgerMetadata metadata) throws IOException {
+        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+            writeHeader(os, METADATA_FORMAT_VERSION_2);
+            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(os, UTF_8.name()))) {
+                /***********************************************************************
+                 * WARNING: Do not modify to add fields.
+                 * This code is purposefully duplicated, as version 2 does not support adding
+                 * fields, and if this code was shared with version 3, it would be easy to
+                 * accidentally add new fields and create BC issues.
+                 **********************************************************************/
+                LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
+                builder.setQuorumSize(metadata.getWriteQuorumSize())
+                    .setAckQuorumSize(metadata.getAckQuorumSize())
+                    .setEnsembleSize(metadata.getEnsembleSize())
+                    .setLength(metadata.getLength())
+                    .setLastEntryId(metadata.getLastEntryId());
+
+                switch (metadata.getState()) {
+                case CLOSED:
+                    builder.setState(LedgerMetadataFormat.State.CLOSED);
+                    break;
+                case IN_RECOVERY:
+                    builder.setState(LedgerMetadataFormat.State.IN_RECOVERY);
+                    break;
+                case OPEN:
+                    builder.setState(LedgerMetadataFormat.State.OPEN);
+                    break;
+                default:
+                    checkArgument(false,
+                                  String.format("Unknown state %s for protobuf serialization", metadata.getState()));
+                    break;
+                }
+
+                /** Hack to get around fact that ctime was never versioned correctly */
+                if (LedgerMetadataUtils.shouldStoreCtime(metadata)) {
+                    builder.setCtime(metadata.getCtime());
+                }
+
+                builder.setDigestType(apiToProtoDigestType(metadata.getDigestType()));
+                serializePassword(metadata.getPassword(), builder);
+
+                Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+                if (customMetadata.size() > 0) {
+                    LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder =
+                        LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
+                    for (Map.Entry<String, byte[]> entry : customMetadata.entrySet()) {
+                        cMetadataBuilder.setKey(entry.getKey()).setValue(ByteString.copyFrom(entry.getValue()));
+                        builder.addCustomMetadata(cMetadataBuilder.build());
+                    }
+                }
+
+                for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                         metadata.getAllEnsembles().entrySet()) {
+                    LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
+                    segmentBuilder.setFirstEntryId(entry.getKey());
+                    for (BookieSocketAddress addr : entry.getValue()) {
+                        segmentBuilder.addEnsembleMember(addr.toString());
+                    }
+                    builder.addSegment(segmentBuilder.build());
+                }
+
+                TextFormat.print(builder.build(), writer);
+                writer.flush();
             }
+            return os.toByteArray();
         }
+    }
+
+    private static byte[] serializeVersion1(LedgerMetadata metadata) throws IOException {
+        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+            writeHeader(os, METADATA_FORMAT_VERSION_1);
+
+            try (PrintWriter writer = new PrintWriter(new OutputStreamWriter(os, UTF_8.name()))) {
+                writer.append(String.valueOf(metadata.getWriteQuorumSize())).append(LINE_SPLITTER);
+                writer.append(String.valueOf(metadata.getEnsembleSize())).append(LINE_SPLITTER);
+                writer.append(String.valueOf(metadata.getLength())).append(LINE_SPLITTER);
 
-        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : metadata.getAllEnsembles().entrySet()) {
-            LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
-            segmentBuilder.setFirstEntryId(entry.getKey());
-            for (BookieSocketAddress addr : entry.getValue()) {
-                segmentBuilder.addEnsembleMember(addr.toString());
+                for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry :
+                         metadata.getAllEnsembles().entrySet()) {
+                    writer.append(String.valueOf(entry.getKey()));
+                    for (BookieSocketAddress addr : entry.getValue()) {
+                        writer.append(FIELD_SPLITTER).append(addr.toString());
+                    }
+                    writer.append(LINE_SPLITTER);
+                }
+
+                if (metadata.getState() == State.IN_RECOVERY) {
+                    writer.append(String.valueOf(V1_IN_RECOVERY_ENTRY_ID)).append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
+                } else if (metadata.getState() == State.CLOSED) {
+                    writer.append(String.valueOf(metadata.getLastEntryId()))
+                        .append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
+                } else {
+                    checkArgument(metadata.getState() == State.OPEN,
+                                  String.format("Unknown state %s for V1 serialization", metadata.getState()));
+                }
+                writer.flush();
+            } catch (UnsupportedEncodingException uee) {
+                throw new RuntimeException("UTF_8 should be supported everywhere");
             }
-            builder.addSegment(segmentBuilder.build());
+            return os.toByteArray();
         }
-        return builder.build();
     }
 
+    private static void serializePassword(byte[] password, LedgerMetadataFormat.Builder builder) {
+        if (password == null || password.length == 0) {
+            builder.setPassword(ByteString.EMPTY);
+        } else {
+            builder.setPassword(ByteString.copyFrom(password));
+        }
+    }
 
     /**
      * Parses a given byte array and transforms into a LedgerConfig object.
@@ -181,55 +334,65 @@ public class LedgerMetadataSerDe {
      */
     public LedgerMetadata parseConfig(byte[] bytes,
                                       Optional<Long> metadataStoreCtime) throws IOException {
-        String config = new String(bytes, UTF_8);
-
         if (log.isDebugEnabled()) {
-            log.debug("Parsing Config: {}", config);
-        }
-        BufferedReader reader = new BufferedReader(new StringReader(config));
-        String versionLine = reader.readLine();
-        if (versionLine == null) {
-            throw new IOException("Invalid metadata. Content missing");
-        }
-        final int metadataFormatVersion;
-        if (versionLine.startsWith(VERSION_KEY)) {
-            String parts[] = versionLine.split(FIELD_SPLITTER);
-            metadataFormatVersion = Integer.parseInt(parts[1]);
-        } else {
-            // if no version is set, take it to be version 1
-            // as the parsing is the same as what we had before
-            // we introduce versions
-            metadataFormatVersion = 1;
-            // reset the reader
-            reader.close();
-            reader = new BufferedReader(new StringReader(config));
-        }
-
-        if (metadataFormatVersion < LOWEST_COMPAT_METADATA_FORMAT_VERSION
-            || metadataFormatVersion > CURRENT_METADATA_FORMAT_VERSION) {
-            throw new IOException(
-                    String.format("Metadata version not compatible. Expected between %d and %d, but got %d",
-                                  LOWEST_COMPAT_METADATA_FORMAT_VERSION, CURRENT_METADATA_FORMAT_VERSION,
-                                  metadataFormatVersion));
+            log.debug("Deserializing {}", Base64.getEncoder().encodeToString(bytes));
         }
+        try (ByteArrayInputStream is = new ByteArrayInputStream(bytes)) {
+            int metadataFormatVersion = readHeader(is);
+            if (log.isDebugEnabled()) {
+                String contentStr = "";
+                if (metadataFormatVersion <= METADATA_FORMAT_VERSION_2) {
+                    contentStr = ", content: " + new String(bytes, UTF_8);
+                }
+                log.debug("Format version {} detected{}", metadataFormatVersion, contentStr);
+            }
 
-        if (metadataFormatVersion == 1) {
-            return parseVersion1Config(reader);
+            switch (metadataFormatVersion) {
+            case METADATA_FORMAT_VERSION_3:
+                return parseVersion3Config(is);
+            case METADATA_FORMAT_VERSION_2:
+                return parseVersion2Config(is, metadataStoreCtime);
+            case METADATA_FORMAT_VERSION_1:
+                return parseVersion1Config(is);
+            default:
+                throw new IOException(
+                        String.format("Metadata version not compatible. Expected between %d and %d, but got %d",
+                                      LOWEST_COMPAT_METADATA_FORMAT_VERSION, CURRENT_METADATA_FORMAT_VERSION,
+                                      metadataFormatVersion));
+            }
         }
+    }
 
+    private static LedgerMetadata parseVersion3Config(InputStream is) throws IOException {
         LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
-            .withMetadataFormatVersion(metadataFormatVersion);
+            .withMetadataFormatVersion(METADATA_FORMAT_VERSION_3);
+        LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
+        formatBuilder.mergeDelimitedFrom(is);
+        decodeFormat(formatBuilder.build(), builder);
+        return builder.build();
+    }
 
-        // remaining size is total minus the length of the version line and '\n'
-        char[] configBuffer = new char[config.length() - (versionLine.length() + 1)];
-        if (configBuffer.length != reader.read(configBuffer, 0, configBuffer.length)) {
-            throw new IOException("Invalid metadata buffer");
-        }
+    private static LedgerMetadata parseVersion2Config(InputStream is, Optional<Long> metadataStoreCtime)
+            throws IOException {
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+            .withMetadataFormatVersion(METADATA_FORMAT_VERSION_2);
 
         LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
-        TextFormat.merge((CharSequence) CharBuffer.wrap(configBuffer), formatBuilder);
+        try (InputStreamReader reader = new InputStreamReader(is, UTF_8.name())) {
+            TextFormat.merge(reader, formatBuilder);
+        }
         LedgerMetadataFormat data = formatBuilder.build();
+        decodeFormat(data, builder);
+        if (data.hasCtime()) {
+            // 'storingCreationTime' is only ever taken into account for serializing version 2
+            builder.storingCreationTime(true);
+        } else if (metadataStoreCtime.isPresent()) {
+            builder.withCreationTime(metadataStoreCtime.get()).storingCreationTime(false);
+        }
+        return builder.build();
+    }
 
+    private static void decodeFormat(LedgerMetadataFormat data, LedgerMetadataBuilder builder) throws IOException {
         builder.withEnsembleSize(data.getEnsembleSize());
         builder.withWriteQuorumSize(data.getQuorumSize());
         if (data.hasAckQuorumSize()) {
@@ -239,9 +402,7 @@ public class LedgerMetadataSerDe {
         }
 
         if (data.hasCtime()) {
-            builder.withCreationTime(data.getCtime()).storingCreationTime(true);
-        } else if (metadataStoreCtime.isPresent()) {
-            builder.withCreationTime(metadataStoreCtime.get()).storingCreationTime(false);
+            builder.withCreationTime(data.getCtime());
         }
 
         if (data.getState() == LedgerMetadataFormat.State.IN_RECOVERY) {
@@ -268,12 +429,11 @@ public class LedgerMetadataSerDe {
                                                Collectors.toMap(e -> e.getKey(),
                                                                 e -> e.getValue().toByteArray())));
         }
-        return builder.build();
     }
 
-    static LedgerMetadata parseVersion1Config(BufferedReader reader) throws IOException {
-        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
-        try {
+    private static LedgerMetadata parseVersion1Config(InputStream is) throws IOException {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, UTF_8.name()))) {
+            LedgerMetadataBuilder builder = LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
             int quorumSize = Integer.parseInt(reader.readLine());
             int ensembleSize = Integer.parseInt(reader.readLine());
             long length = Long.parseLong(reader.readLine());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index fc87632..0834147 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -395,8 +395,15 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
                 }
             };
 
-            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, serDe.serialize(metadata)),
-                    Version.NEW, msCallback, null);
+            final byte[] bytes;
+            try {
+                bytes = serDe.serialize(metadata);
+            } catch (IOException ioe) {
+                promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+                return promise;
+            }
+            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, bytes),
+                            Version.NEW, msCallback, null);
             return promise;
         }
 
@@ -413,7 +420,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
                     } else if (MSException.Code.OK.getCode() == rc) {
                         FutureUtils.complete(promise, null);
                     } else {
-                        promise.completeExceptionally(new BKException.MetaStoreException());
+                        promise.completeExceptionally(new BKException.MetaStoreException());
                     }
                 }
             };
@@ -457,13 +464,22 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
         @Override
         public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
                                                                                 Version currentVersion) {
-            Value data = new Value().setField(META_FIELD, serDe.serialize(metadata));
+
+            CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
+            final byte[] bytes;
+            try {
+                bytes = serDe.serialize(metadata);
+            } catch (IOException ioe) {
+                promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+                return promise;
+            }
+
+            Value data = new Value().setField(META_FIELD, bytes);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Writing ledger {} metadata, version {}", new Object[] { ledgerId, currentVersion });
             }
 
-            CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
             final String key = ledgerId2Key(ledgerId);
             MetastoreCallback<Version> msCallback = new MetastoreCallback<Version>() {
                 @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
index a9ffc2d..6fee301 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
@@ -30,9 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
-import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 import org.junit.Test;
 
 /**
@@ -69,36 +67,6 @@ public class LedgerMetadataTest {
     }
 
     @Test
-    public void testStoreSystemtimeAsLedgerCtimeEnabled()
-            throws Exception {
-        List<BookieSocketAddress> ensemble = Lists.newArrayList(
-                new BookieSocketAddress("192.0.2.1", 1234),
-                new BookieSocketAddress("192.0.2.2", 1234),
-                new BookieSocketAddress("192.0.2.3", 1234));
-        LedgerMetadata lm = LedgerMetadataBuilder.create()
-            .newEnsembleEntry(0L, ensemble)
-            .withCreationTime(System.currentTimeMillis())
-            .storingCreationTime(true)
-            .build();
-        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
-        assertTrue(format.hasCtime());
-    }
-
-    @Test
-    public void testStoreSystemtimeAsLedgerCtimeDisabled()
-            throws Exception {
-        List<BookieSocketAddress> ensemble = Lists.newArrayList(
-                new BookieSocketAddress("192.0.2.1", 1234),
-                new BookieSocketAddress("192.0.2.2", 1234),
-                new BookieSocketAddress("192.0.2.3", 1234));
-        LedgerMetadata lm = LedgerMetadataBuilder.create()
-            .newEnsembleEntry(0L, ensemble).build();
-
-        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
-        assertFalse(format.hasCtime());
-    }
-
-    @Test
     public void testToString() {
         List<BookieSocketAddress> ensemble = Lists.newArrayList(
                 new BookieSocketAddress("192.0.2.1", 1234),
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
index ffacb21..b96859b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MetadataUpdateLoopTest.java
@@ -42,6 +42,7 @@ import java.util.stream.IntStream;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.MockLedgerManager;
@@ -69,6 +70,7 @@ public class MetadataUpdateLoopTest {
     public void testBasicUpdate() throws Exception {
         try (LedgerManager lm = new MockLedgerManager()) {
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(5)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .newEnsembleEntry(0L, Lists.newArrayList(
                                           new BookieSocketAddress("0.0.0.0:3181"),
                                           new BookieSocketAddress("0.0.0.1:3181"),
@@ -115,6 +117,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             Versioned<LedgerMetadata> writtenMetadata =
                 lm.createLedgerMetadata(ledgerId, initMeta).get();
@@ -181,6 +184,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b2 = new BookieSocketAddress("0.0.0.2:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
             AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
@@ -234,6 +238,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b3 = new BookieSocketAddress("0.0.0.3:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(2)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .withWriteQuorumSize(2).newEnsembleEntry(0L, Lists.newArrayList(b0, b1)).build();
             Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
             AtomicReference<Versioned<LedgerMetadata>> reference = new AtomicReference<>(writtenMetadata);
@@ -299,6 +304,7 @@ public class MetadataUpdateLoopTest {
                 .collect(Collectors.toList());
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(ensembleSize)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .newEnsembleEntry(0L, initialEnsemble).build();
             Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
 
@@ -342,6 +348,7 @@ public class MetadataUpdateLoopTest {
             BookieSocketAddress b1 = new BookieSocketAddress("0.0.0.1:3181");
 
             LedgerMetadata initMeta = LedgerMetadataBuilder.create().withEnsembleSize(1)
+                .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
                 .withWriteQuorumSize(1).withAckQuorumSize(1)
                 .newEnsembleEntry(0L, Lists.newArrayList(b0)).build();
             Versioned<LedgerMetadata> writtenMetadata = lm.createLedgerMetadata(ledgerId, initMeta).get();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
index 3aaeedb..9d68ec8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java
@@ -65,8 +65,8 @@ public class MockLedgerHandle extends LedgerHandle {
 
     MockLedgerHandle(MockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException {
         super(bk.getClientCtx(), id,
-              new Versioned<>(createMetadata(), new LongVersion(0L)),
-              DigestType.MAC, "".getBytes(), WriteFlag.NONE);
+              new Versioned<>(createMetadata(digest, passwd), new LongVersion(0L)),
+              digest, passwd, WriteFlag.NONE);
         this.bk = bk;
         this.id = id;
         this.digest = digest;
@@ -268,12 +268,14 @@ public class MockLedgerHandle extends LedgerHandle {
         return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
     }
 
-    private static LedgerMetadata createMetadata() {
+    private static LedgerMetadata createMetadata(DigestType digest, byte[] passwd) {
         List<BookieSocketAddress> ensemble = Lists.newArrayList(
                 new BookieSocketAddress("192.0.2.1", 1234),
                 new BookieSocketAddress("192.0.2.2", 1234),
                 new BookieSocketAddress("192.0.2.3", 1234));
         return LedgerMetadataBuilder.create()
+            .withDigestType(digest.toApiDigestType())
+            .withPassword(passwd)
             .newEnsembleEntry(0L, ensemble)
             .build();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
index 5c6a8c7..26a265b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestWatchEnsembleChange.java
@@ -141,6 +141,7 @@ public class TestWatchEnsembleChange extends BookKeeperClusterTestCase {
                 @Override
                 public void operationComplete(int rc, final Long lid) {
                     LedgerMetadata metadata = LedgerMetadataBuilder.create()
+                        .withDigestType(digestType.toApiDigestType()).withPassword(new byte[0])
                         .withEnsembleSize(4).withWriteQuorumSize(2)
                         .withAckQuorumSize(2)
                         .newEnsembleEntry(0L, ensemble).build();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index 37f55b3..5776af3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -121,6 +122,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
                 new BookieSocketAddress("192.0.2.4", 3181),
                 new BookieSocketAddress("192.0.2.5", 3181));
         this.metadata = LedgerMetadataBuilder.create()
+            .withDigestType(DigestType.CRC32C).withPassword(new byte[0])
             .withEnsembleSize(5)
             .withWriteQuorumSize(3)
             .withAckQuorumSize(3)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index 31bd406..933c117 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -65,6 +65,7 @@ import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
 import org.apache.bookkeeper.bookie.StateManager;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -113,6 +114,8 @@ public class GcLedgersTest extends LedgerManagerTestCase {
                     }
 
                     LedgerMetadata md = LedgerMetadataBuilder.create()
+                        .withDigestType(DigestType.CRC32C)
+                        .withPassword(new byte[0])
                         .withEnsembleSize(1).withWriteQuorumSize(1).withAckQuorumSize(1)
                         .newEnsembleEntry(0L, ensemble).build();
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index 398bb07..f5cbe3a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -103,8 +103,8 @@ public class MockLedgerManager implements LedgerManager {
                 if (metadataMap.containsKey(ledgerId)) {
                     executeCallback(() -> promise.completeExceptionally(new BKException.BKLedgerExistException()));
                 } else {
-                    metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), serDe.serialize(metadata)));
                     try {
+                        metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), serDe.serialize(metadata)));
                         Versioned<LedgerMetadata> readBack = readMetadata(ledgerId);
                         executeCallback(() -> promise.complete(readBack));
                     } catch (Exception e) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
new file mode 100644
index 0000000..d9ce5a2
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/TestLedgerMetadataSerDe.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test Ledger Metadata serialization and deserialization.
+ */
+public class TestLedgerMetadataSerDe {
+    // as used in 4.0.x & 4.1.x
+    private static final String version1 =
+        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTEKMgozCjAKMAkxOTIuMC4yLjE6MTIzNAkxOTIu"
+        + "MC4yLjI6MTIzNAkxOTIuMC4yLjM6MTIzNAotMTAyCUNMT1NFRA==";
+
+    // as used in 4.2.x & 4.3.x (text protobuf based metadata, password and digest introduced)
+    private static final String version2 =
+        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTIKcXVvcnVtU2l6ZTogMgplbnNlbWJsZVNpemU6I"
+        + "DMKbGVuZ3RoOiAwCmxhc3RFbnRyeUlkOiAtMQpzdGF0ZTogSU5fUkVDT1ZFUlkKc2VnbWVudCB7"
+        + "CiAgZW5zZW1ibGVNZW1iZXI6ICIxOTIuMC4yLjE6MTIzNCIKICBlbnNlbWJsZU1lbWJlcjogIjE"
+        + "5Mi4wLjIuMjoxMjM0IgogIGVuc2VtYmxlTWVtYmVyOiAiMTkyLjAuMi4zOjEyMzQiCiAgZmlyc3"
+        + "RFbnRyeUlkOiAwCn0KZGlnZXN0VHlwZTogQ1JDMzIKcGFzc3dvcmQ6ICJwYXNzd2QiCmFja1F1b"
+        + "3J1bVNpemU6IDIK";
+
+    // version 2 + ctime, as used in 4.4.x to 4.8.x (ctime is optional from 4.6.x onwards)
+    private static final String version2ctime =
+        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTIKcXVvcnVtU2l6ZTogMgplbnNlbWJsZVNpemU6I"
+        + "DMKbGVuZ3RoOiAwCmxhc3RFbnRyeUlkOiAtMQpzdGF0ZTogSU5fUkVDT1ZFUlkKc2VnbWVudCB7"
+        + "CiAgZW5zZW1ibGVNZW1iZXI6ICIxOTIuMC4yLjE6MTIzNCIKICBlbnNlbWJsZU1lbWJlcjogIjE"
+        + "5Mi4wLjIuMjoxMjM0IgogIGVuc2VtYmxlTWVtYmVyOiAiMTkyLjAuMi4zOjEyMzQiCiAgZmlyc3"
+        + "RFbnRyeUlkOiAwCn0KZGlnZXN0VHlwZTogQ1JDMzIKcGFzc3dvcmQ6ICJwYXNzd2QiCmFja1F1b"
+        + "3J1bVNpemU6IDIKY3RpbWU6IDE1NDQwMDIzODMwNzUK";
+
+    // version 3, since 4.9.x, protobuf binary format
+    private static final String version3 =
+        "Qm9va2llTWV0YWRhdGFGb3JtYXRWZXJzaW9uCTMKXggCEAMYACD///////////8BKAEyMgoOMTkyL"
+        + "jAuMi4xOjMxODEKDjE5Mi4wLjIuMjozMTgxCg4xOTIuMC4yLjM6MzE4MRAAOANCBmZvb2JhckgB"
+        + "UP///////////wE=";
+
+    private static void testDecodeEncode(String encoded) throws Exception {
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        LedgerMetadata md = serDe.parseConfig(Base64.getDecoder().decode(encoded), Optional.empty());
+        String reserialized = Base64.getEncoder().encodeToString(serDe.serialize(md));
+
+        Assert.assertEquals(encoded, reserialized);
+    }
+
+    @Test
+    public void testVersion1SerDe() throws Exception {
+        testDecodeEncode(version1);
+    }
+
+    @Test
+    public void testVersion2SerDe() throws Exception {
+        testDecodeEncode(version2);
+    }
+
+    @Test
+    public void testVersion2CtimeSerDe() throws Exception {
+        testDecodeEncode(version2ctime);
+    }
+
+    @Test
+    public void testVersion3SerDe() throws Exception {
+        testDecodeEncode(version3);
+    }
+
+    @Test(expected = IOException.class)
+    public void testJunkSerDe() throws Exception {
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        String junk = "";
+        serDe.parseConfig(junk.getBytes(UTF_8), Optional.empty());
+    }
+
+    @Test(expected = IOException.class)
+    public void testJunk2SerDe() throws Exception {
+        byte[] randomBytes = new byte[1000];
+        new Random().nextBytes(randomBytes);
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        serDe.parseConfig(randomBytes, Optional.empty());
+    }
+
+    @Test(expected = IOException.class)
+    public void testJunkVersionSerDe() throws Exception {
+        byte[] junkVersion = "BookieMetadataFormatVersion\tfoobar\nblahblah".getBytes(UTF_8);
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        serDe.parseConfig(junkVersion, Optional.empty());
+    }
+
+    @Test(expected = IOException.class)
+    public void testVeryLongVersionSerDe() throws Exception {
+        byte[] veryLongVersion = "BookieMetadataFormatVersion\t123456789123456789\nblahblah".getBytes(UTF_8);
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        serDe.parseConfig(veryLongVersion, Optional.empty());
+    }
+
+    @Test
+    public void testPeggedToV2SerDe() throws Exception {
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        LedgerMetadata metadata = LedgerMetadataBuilder.create()
+            .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+            .withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
+            .newEnsembleEntry(0L, Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 3181),
+                                                     new BookieSocketAddress("192.0.2.2", 3181),
+                                                     new BookieSocketAddress("192.0.2.3", 3181)))
+            .build();
+        byte[] encoded = serDe.serialize(metadata);
+
+        LedgerMetadata decoded = serDe.parseConfig(encoded, Optional.empty());
+        Assert.assertEquals(2, decoded.getMetadataFormatVersion());
+    }
+
+    @Test
+    public void testStoreSystemtimeAsLedgerCtimeEnabledWithVersion2()
+            throws Exception {
+        LedgerMetadata lm = LedgerMetadataBuilder.create()
+            .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+            .withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
+            .newEnsembleEntry(0L, Lists.newArrayList(
+                                      new BookieSocketAddress("192.0.2.1", 1234),
+                                      new BookieSocketAddress("192.0.2.2", 1234),
+                                      new BookieSocketAddress("192.0.2.3", 1234)))
+            .withCreationTime(123456L)
+            .storingCreationTime(true)
+            .build();
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        byte[] serialized = serDe.serialize(lm);
+        LedgerMetadata deserialized = serDe.parseConfig(serialized, Optional.of(654321L));
+        Assert.assertEquals(123456L, deserialized.getCtime());
+
+        // round-trip again: the serialized ctime must still win over the metadata-store ctime
+        LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), Optional.of(98765L));
+        Assert.assertEquals(123456L, deserialized2.getCtime());
+    }
+
+    @Test
+    public void testStoreSystemtimeAsLedgerCtimeDisabledWithVersion2()
+            throws Exception {
+        LedgerMetadata lm = LedgerMetadataBuilder.create()
+            .withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
+            .withPassword("foobar".getBytes(UTF_8)).withDigestType(DigestType.CRC32C)
+            .newEnsembleEntry(0L, Lists.newArrayList(
+                                      new BookieSocketAddress("192.0.2.1", 1234),
+                                      new BookieSocketAddress("192.0.2.2", 1234),
+                                      new BookieSocketAddress("192.0.2.3", 1234)))
+            .build();
+
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+        byte[] serialized = serDe.serialize(lm);
+        LedgerMetadata deserialized = serDe.parseConfig(serialized, Optional.of(654321L));
+        Assert.assertEquals(654321L, deserialized.getCtime());
+
+        // round-trip again: ctime is expected to follow whatever store ctime is supplied on parse
+        LedgerMetadata deserialized2 = serDe.parseConfig(serDe.serialize(deserialized), Optional.of(98765L));
+        Assert.assertEquals(98765L, deserialized2.getCtime());
+    }
+}
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index d571bf8..4988a81 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -110,8 +110,16 @@ class EtcdLedgerManager implements LedgerManager {
                                                                              LedgerMetadata metadata) {
         CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
         String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId);
-        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
         log.info("Create ledger metadata under key {}", ledgerKey);
+
+        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
+        final ByteSequence valueBs;
+        try {
+            valueBs = ByteSequence.fromBytes(serDe.serialize(metadata));
+        } catch (IOException ioe) {
+            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+            return promise;
+        }
         kvClient.txn()
             .If(new Cmp(
                 ledgerKeyBs,
@@ -124,7 +132,7 @@ class EtcdLedgerManager implements LedgerManager {
                     .build()))
             .Else(com.coreos.jetcd.op.Op.put(
                 ledgerKeyBs,
-                ByteSequence.fromBytes(serDe.serialize(metadata)),
+                valueBs,
                 PutOption.DEFAULT))
             .commit()
             .thenAccept(resp -> {
@@ -255,6 +263,15 @@ class EtcdLedgerManager implements LedgerManager {
         final LongVersion lv = (LongVersion) currentVersion;
         String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId);
         ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
+
+        final ByteSequence valueBs;
+        try {
+            valueBs = ByteSequence.fromBytes(serDe.serialize(metadata));
+        } catch (IOException ioe) {
+            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
+            return promise;
+        }
+
         kvClient.txn()
             .If(new Cmp(
                 ledgerKeyBs,
@@ -262,7 +279,7 @@ class EtcdLedgerManager implements LedgerManager {
                 CmpTarget.modRevision(lv.getLongVersion())))
             .Then(com.coreos.jetcd.op.Op.put(
                 ledgerKeyBs,
-                ByteSequence.fromBytes(serDe.serialize(metadata)),
+                valueBs,
                 PutOption.DEFAULT))
             .Else(com.coreos.jetcd.op.Op.get(
                 ledgerKeyBs,