You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/11/30 10:25:45 UTC

[bookkeeper] branch master updated: Move serialization code out of LedgerMetadata

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d6007ed  Move serialization code out of LedgerMetadata
d6007ed is described below

commit d6007edc419751378e5f7efb2a944693c1956401
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Fri Nov 30 11:25:40 2018 +0100

    Move serialization code out of LedgerMetadata
    
    This PR moves the serialization code out of LedgerMetadata so that it
    can later be adapted to run different serialization code depending on
    the environment.
    
    Notable non-refactor changes:
    - LedgerMetadata#toString no longer uses #serialize because it's no
      longer available. Instead it uses the ToString helper from guava.
      byte[] fields are now base64 encoded.
    - There's a new state enum and getter in api.LedgerMetadata. This is
      so that LedgerMetadataFormat can be removed from
      client.LedgerMetadata.
    
    Master issue: #723
    
    
    Reviewers: Sijie Guo <si...@apache.org>
    
    This closes #1848 from ivankelly/refactor-md-serde
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   8 +-
 .../apache/bookkeeper/client/LedgerMetadata.java   | 336 +++-----------------
 .../bookkeeper/client/LedgerMetadataBuilder.java   |  13 +-
 .../bookkeeper/client/api/LedgerMetadata.java      |  25 ++
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |   8 +-
 .../bookkeeper/meta/LedgerMetadataSerDe.java       | 338 +++++++++++++++++++++
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |   9 +-
 .../server/http/service/GetLedgerMetaService.java  |   6 +-
 .../server/http/service/ListLedgerService.java     |  10 +-
 .../bookkeeper/client/LedgerMetadataTest.java      |  19 +-
 .../meta/AbstractZkLedgerManagerTest.java          |  29 +-
 .../apache/bookkeeper/meta/MockLedgerManager.java  |   8 +-
 .../bookkeeper/server/http/TestHttpService.java    |   3 +-
 .../metadata/etcd/EtcdLedgerManager.java           |  15 +-
 14 files changed, 482 insertions(+), 345 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 12d85e2..c58fa83 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -98,6 +98,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.UnderreplicatedLedger;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
@@ -1105,7 +1106,7 @@ public class BookieShell implements Tool {
     void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
         System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
         if (printMeta) {
-            System.out.println(new String(md.serialize(), UTF_8));
+            System.out.println(new String(new LedgerMetadataSerDe().serialize(md), UTF_8));
         }
     }
 
@@ -1114,6 +1115,7 @@ public class BookieShell implements Tool {
      */
     class LedgerMetadataCmd extends MyCommand {
         Options lOpts = new Options();
+        LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
 
         LedgerMetadataCmd() {
             super(CMD_LEDGERMETADATA);
@@ -1139,11 +1141,11 @@ public class BookieShell implements Tool {
                     if (cmdLine.hasOption("dumptofile")) {
                         Versioned<LedgerMetadata> md = m.readLedgerMetadata(lid).join();
                         Files.write(FileSystems.getDefault().getPath(cmdLine.getOptionValue("dumptofile")),
-                                    md.getValue().serialize());
+                                    serDe.serialize(md.getValue()));
                     } else if (cmdLine.hasOption("restorefromfile")) {
                         byte[] serialized = Files.readAllBytes(
                                 FileSystems.getDefault().getPath(cmdLine.getOptionValue("restorefromfile")));
-                        LedgerMetadata md = LedgerMetadata.parseConfig(serialized, Optional.empty());
+                        LedgerMetadata md = serDe.parseConfig(serialized, Optional.empty());
                         m.createLedgerMetadata(lid, md).join();
                     } else {
                         printLedgerMetadata(lid, m.readLedgerMetadata(lid).get().getValue(), true);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 894a7b1..6317612 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -17,26 +17,19 @@
  */
 package org.apache.bookkeeper.client;
 
-import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.TextFormat;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.nio.CharBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
@@ -45,8 +38,8 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 import lombok.EqualsAndHashCode;
 import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata.State;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,25 +53,12 @@ import org.slf4j.LoggerFactory;
 public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMetadata {
     static final Logger LOG = LoggerFactory.getLogger(LedgerMetadata.class);
 
-    private static final String closed = "CLOSED";
-    private static final String lSplitter = "\n";
-    private static final String tSplitter = "\t";
-
-    // can't use -1 for NOTCLOSED because that is reserved for a closed, empty
-    // ledger
-    private static final int NOTCLOSED = -101;
-    private static final int IN_RECOVERY = -102;
-
-    public static final int LOWEST_COMPAT_METADATA_FORMAT_VERSION = 0;
-    public static final int CURRENT_METADATA_FORMAT_VERSION = 2;
-    public static final String VERSION_KEY = "BookieMetadataFormatVersion";
-
     private final int metadataFormatVersion;
     private final int ensembleSize;
     private final int writeQuorumSize;
     private final int ackQuorumSize;
 
-    private final LedgerMetadataFormat.State state;
+    private final State state;
     private final long length;
     private final long lastEntryId;
     private final long ctime;
@@ -88,7 +68,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
     private final ImmutableList<BookieSocketAddress> currentEnsemble;
 
     private final boolean hasPassword;
-    private final LedgerMetadataFormat.DigestType digestType;
+    private final DigestType digestType;
     private final byte[] password;
 
     private final Map<String, byte[]> customMetadata;
@@ -97,7 +77,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
                    int ensembleSize,
                    int writeQuorumSize,
                    int ackQuorumSize,
-                   LedgerMetadataFormat.State state,
+                   State state,
                    Optional<Long> lastEntryId,
                    Optional<Long> length,
                    Map<Long, List<BookieSocketAddress>> ensembles,
@@ -107,7 +87,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
                    boolean storeCtime,
                    Map<String, byte[]> customMetadata) {
         checkArgument(ensembles.size() > 0, "There must be at least one ensemble in the ledger");
-        if (state == LedgerMetadataFormat.State.CLOSED) {
+        if (state == State.CLOSED) {
             checkArgument(length.isPresent(), "Closed ledger must have a length");
             checkArgument(lastEntryId.isPresent(), "Closed ledger must have a last entry");
         } else {
@@ -130,14 +110,13 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
                                                                       ImmutableList.copyOf(e.getValue())),
                                                       TreeMap::putAll));
 
-        if (state != LedgerMetadataFormat.State.CLOSED) {
+        if (state != State.CLOSED) {
             currentEnsemble = this.ensembles.lastEntry().getValue();
         } else {
             currentEnsemble = null;
         }
 
-        this.digestType = digestType.equals(DigestType.MAC)
-            ? LedgerMetadataFormat.DigestType.HMAC : LedgerMetadataFormat.DigestType.valueOf(digestType.toString());
+        this.digestType = digestType;
 
         if (password.isPresent()) {
             this.password = password.get();
@@ -183,7 +162,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
      *
      * @return whether the password has been stored in the metadata
      */
-    boolean hasPassword() {
+    public boolean hasPassword() {
         return hasPassword;
     }
 
@@ -198,18 +177,7 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
 
     @Override
     public DigestType getDigestType() {
-        switch (digestType) {
-            case HMAC:
-                return DigestType.MAC;
-            case CRC32:
-                return DigestType.CRC32;
-            case CRC32C:
-                return DigestType.CRC32C;
-            case DUMMY:
-                return DigestType.DUMMY;
-            default:
-                throw new IllegalArgumentException("Unable to convert digest type " + digestType);
-        }
+        return digestType;
     }
 
     @Override
@@ -224,14 +192,15 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
 
     @Override
     public boolean isClosed() {
-        return state == LedgerMetadataFormat.State.CLOSED;
+        return state == State.CLOSED;
     }
 
     public boolean isInRecovery() {
-        return state == LedgerMetadataFormat.State.IN_RECOVERY;
+        return state == State.IN_RECOVERY;
     }
 
-    public LedgerMetadataFormat.State getState() {
+    @Override
+    public State getState() {
         return state;
     }
 
@@ -272,236 +241,6 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         return this.customMetadata;
     }
 
-    LedgerMetadataFormat buildProtoFormat() {
-        return buildProtoFormat(true);
-    }
-
-    LedgerMetadataFormat buildProtoFormat(boolean withPassword) {
-        LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
-        builder.setQuorumSize(writeQuorumSize).setAckQuorumSize(ackQuorumSize)
-            .setEnsembleSize(ensembleSize).setLength(length)
-            .setState(state).setLastEntryId(lastEntryId);
-
-        if (storeCtime) {
-            builder.setCtime(ctime);
-        }
-
-        if (hasPassword) {
-            builder.setDigestType(digestType);
-            if (withPassword) {
-                builder.setPassword(ByteString.copyFrom(password));
-            }
-        }
-
-        if (customMetadata != null) {
-            LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder =
-                LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
-            for (Map.Entry<String, byte[]> entry : customMetadata.entrySet()) {
-                cMetadataBuilder.setKey(entry.getKey()).setValue(ByteString.copyFrom(entry.getValue()));
-                builder.addCustomMetadata(cMetadataBuilder.build());
-            }
-        }
-
-        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : ensembles.entrySet()) {
-            LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
-            segmentBuilder.setFirstEntryId(entry.getKey());
-            for (BookieSocketAddress addr : entry.getValue()) {
-                segmentBuilder.addEnsembleMember(addr.toString());
-            }
-            builder.addSegment(segmentBuilder.build());
-        }
-        return builder.build();
-    }
-
-    /**
-     * Generates a byte array of this object.
-     *
-     * @return the metadata serialized into a byte array
-     */
-    public byte[] serialize() {
-        return serialize(true);
-    }
-
-    public byte[] serialize(boolean withPassword) {
-        if (metadataFormatVersion == 1) {
-            return serializeVersion1();
-        }
-
-        StringBuilder s = new StringBuilder();
-        s.append(VERSION_KEY).append(tSplitter).append(CURRENT_METADATA_FORMAT_VERSION).append(lSplitter);
-        s.append(TextFormat.printToString(buildProtoFormat(withPassword)));
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Serialized config: {}", s);
-        }
-        return s.toString().getBytes(UTF_8);
-    }
-
-    private byte[] serializeVersion1() {
-        StringBuilder s = new StringBuilder();
-        s.append(VERSION_KEY).append(tSplitter).append(metadataFormatVersion).append(lSplitter);
-        s.append(writeQuorumSize).append(lSplitter).append(ensembleSize).append(lSplitter).append(length);
-
-        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : ensembles.entrySet()) {
-            s.append(lSplitter).append(entry.getKey());
-            for (BookieSocketAddress addr : entry.getValue()) {
-                s.append(tSplitter);
-                s.append(addr.toString());
-            }
-        }
-
-        if (state == LedgerMetadataFormat.State.IN_RECOVERY) {
-            s.append(lSplitter).append(IN_RECOVERY).append(tSplitter).append(closed);
-        } else if (state == LedgerMetadataFormat.State.CLOSED) {
-            s.append(lSplitter).append(getLastEntryId()).append(tSplitter).append(closed);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Serialized config: {}", s);
-        }
-
-        return s.toString().getBytes(UTF_8);
-    }
-
-    /**
-     * Parses a given byte array and transforms into a LedgerConfig object.
-     *
-     * @param bytes
-     *            byte array to parse
-     * @param metadataStoreCtime
-     *            metadata store creation time, used for legacy ledgers
-     * @return LedgerConfig
-     * @throws IOException
-     *             if the given byte[] cannot be parsed
-     */
-    public static LedgerMetadata parseConfig(byte[] bytes,
-                                             Optional<Long> metadataStoreCtime) throws IOException {
-        String config = new String(bytes, UTF_8);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Parsing Config: {}", config);
-        }
-        BufferedReader reader = new BufferedReader(new StringReader(config));
-        String versionLine = reader.readLine();
-        if (versionLine == null) {
-            throw new IOException("Invalid metadata. Content missing");
-        }
-        final int metadataFormatVersion;
-        if (versionLine.startsWith(VERSION_KEY)) {
-            String parts[] = versionLine.split(tSplitter);
-            metadataFormatVersion = Integer.parseInt(parts[1]);
-        } else {
-            // if no version is set, take it to be version 1
-            // as the parsing is the same as what we had before
-            // we introduce versions
-            metadataFormatVersion = 1;
-            // reset the reader
-            reader.close();
-            reader = new BufferedReader(new StringReader(config));
-        }
-
-        if (metadataFormatVersion < LOWEST_COMPAT_METADATA_FORMAT_VERSION
-            || metadataFormatVersion > CURRENT_METADATA_FORMAT_VERSION) {
-            throw new IOException(
-                    String.format("Metadata version not compatible. Expected between %d and %d, but got %d",
-                                  LOWEST_COMPAT_METADATA_FORMAT_VERSION, CURRENT_METADATA_FORMAT_VERSION,
-                                  metadataFormatVersion));
-        }
-
-        if (metadataFormatVersion == 1) {
-            return parseVersion1Config(reader);
-        }
-
-        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
-            .withMetadataFormatVersion(metadataFormatVersion);
-
-        // remaining size is total minus the length of the version line and '\n'
-        char[] configBuffer = new char[config.length() - (versionLine.length() + 1)];
-        if (configBuffer.length != reader.read(configBuffer, 0, configBuffer.length)) {
-            throw new IOException("Invalid metadata buffer");
-        }
-
-        LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
-        TextFormat.merge((CharSequence) CharBuffer.wrap(configBuffer), formatBuilder);
-        LedgerMetadataFormat data = formatBuilder.build();
-
-        builder.withEnsembleSize(data.getEnsembleSize());
-        builder.withWriteQuorumSize(data.getQuorumSize());
-        if (data.hasAckQuorumSize()) {
-            builder.withAckQuorumSize(data.getAckQuorumSize());
-        } else {
-            builder.withAckQuorumSize(data.getQuorumSize());
-        }
-
-        if (data.hasCtime()) {
-            builder.withCreationTime(data.getCtime()).storingCreationTime(true);
-        } else if (metadataStoreCtime.isPresent()) {
-            builder.withCreationTime(metadataStoreCtime.get()).storingCreationTime(false);
-        }
-
-        if (data.getState() == LedgerMetadataFormat.State.IN_RECOVERY) {
-            builder.withInRecoveryState();
-        } else if (data.getState() == LedgerMetadataFormat.State.CLOSED) {
-            builder.withClosedState().withLastEntryId(data.getLastEntryId()).withLength(data.getLength());
-        }
-
-        if (data.hasPassword()) {
-            builder.withPassword(data.getPassword().toByteArray())
-                .withDigestType(protoToApiDigestType(data.getDigestType()));
-        }
-
-        for (LedgerMetadataFormat.Segment s : data.getSegmentList()) {
-            List<BookieSocketAddress> addrs = new ArrayList<>();
-            for (String addr : s.getEnsembleMemberList()) {
-                addrs.add(new BookieSocketAddress(addr));
-            }
-            builder.newEnsembleEntry(s.getFirstEntryId(), addrs);
-        }
-
-        if (data.getCustomMetadataCount() > 0) {
-            builder.withCustomMetadata(data.getCustomMetadataList().stream().collect(
-                                               Collectors.toMap(e -> e.getKey(),
-                                                                e -> e.getValue().toByteArray())));
-        }
-        return builder.build();
-    }
-
-    static LedgerMetadata parseVersion1Config(BufferedReader reader) throws IOException {
-        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
-        try {
-            int quorumSize = Integer.parseInt(reader.readLine());
-            int ensembleSize = Integer.parseInt(reader.readLine());
-            long length = Long.parseLong(reader.readLine());
-
-            builder.withEnsembleSize(ensembleSize).withWriteQuorumSize(quorumSize).withAckQuorumSize(quorumSize);
-
-            String line = reader.readLine();
-            while (line != null) {
-                String parts[] = line.split(tSplitter);
-
-                if (parts[1].equals(closed)) {
-                    Long l = Long.parseLong(parts[0]);
-                    if (l == IN_RECOVERY) {
-                        builder.withInRecoveryState();
-                    } else {
-                        builder.withClosedState().withLastEntryId(l).withLength(length);
-                    }
-                    break;
-                }
-
-                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>();
-                for (int j = 1; j < parts.length; j++) {
-                    addrs.add(new BookieSocketAddress(parts[j]));
-                }
-                builder.newEnsembleEntry(Long.parseLong(parts[0]), addrs);
-
-                line = reader.readLine();
-            }
-            return builder.build();
-        } catch (NumberFormatException e) {
-            throw new IOException(e);
-        }
-    }
-
     @Override
     public String toString() {
         return toStringRepresentation(true);
@@ -519,9 +258,30 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
     }
 
     private String toStringRepresentation(boolean withPassword) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("(meta:").append(new String(serialize(withPassword), UTF_8)).append(")");
-        return sb.toString();
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper("LedgerMetadata");
+        helper.add("formatVersion", metadataFormatVersion)
+            .add("ensembleSize", ensembleSize)
+            .add("writeQuorumSize", writeQuorumSize)
+            .add("ackQuorumSize", ackQuorumSize)
+            .add("state", state);
+        if (state == State.CLOSED) {
+            helper.add("length", length)
+                .add("lastEntryId", lastEntryId);
+        }
+        if (hasPassword()) {
+            helper.add("digestType", digestType);
+            if (withPassword) {
+                helper.add("password", "base64:" + Base64.getEncoder().encodeToString(password));
+            } else {
+                helper.add("password", "OMITTED");
+            }
+        }
+        helper.add("ensembles", ensembles.toString());
+        helper.add("customMetadata",
+                   customMetadata.entrySet().stream().collect(
+                           Collectors.toMap(e -> e.getKey(),
+                                            e -> "base64:" + Base64.getEncoder().encodeToString(e.getValue()))));
+        return helper.toString();
     }
 
     Set<BookieSocketAddress> getBookiesInThisLedger() {
@@ -542,22 +302,12 @@ public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMe
         return ensembles.lastKey();
     }
 
-    int getMetadataFormatVersion() {
+    public int getMetadataFormatVersion() {
         return metadataFormatVersion;
     }
 
-    private static DigestType protoToApiDigestType(LedgerMetadataFormat.DigestType digestType) {
-        switch (digestType) {
-        case HMAC:
-            return DigestType.MAC;
-        case CRC32:
-            return DigestType.CRC32;
-        case CRC32C:
-            return DigestType.CRC32C;
-        case DUMMY:
-            return DigestType.DUMMY;
-        default:
-            throw new IllegalArgumentException("Unable to convert digest type " + digestType);
-        }
+    // Temporary method, until storeCtime is removed from the metadata object itself
+    public boolean shouldStoreCtime() {
+        return storeCtime;
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
index 9034e18..88f5089 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadataBuilder.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.bookkeeper.meta.LedgerMetadataSerDe.CURRENT_METADATA_FORMAT_VERSION;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
@@ -31,10 +32,10 @@ import java.util.Optional;
 import java.util.TreeMap;
 
 import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata.State;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
 import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 
 /**
  * Builder for building LedgerMetadata objects.
@@ -43,12 +44,12 @@ import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 @Unstable
 @VisibleForTesting
 public class LedgerMetadataBuilder {
-    private int metadataFormatVersion = LedgerMetadata.CURRENT_METADATA_FORMAT_VERSION;
+    private int metadataFormatVersion = CURRENT_METADATA_FORMAT_VERSION;
     private int ensembleSize = 3;
     private int writeQuorumSize = 3;
     private int ackQuorumSize = 2;
 
-    private LedgerMetadataFormat.State state = LedgerMetadataFormat.State.OPEN;
+    private State state = State.OPEN;
     private Optional<Long> lastEntryId = Optional.empty();
     private Optional<Long> length = Optional.empty();
 
@@ -73,7 +74,7 @@ public class LedgerMetadataBuilder {
         builder.ackQuorumSize = other.getAckQuorumSize();
 
         builder.state = other.getState();
-        if (builder.state == LedgerMetadataFormat.State.CLOSED) {
+        if (builder.state == State.CLOSED) {
             builder.lastEntryId = Optional.of(other.getLastEntryId());
             builder.length = Optional.of(other.getLength());
         }
@@ -143,12 +144,12 @@ public class LedgerMetadataBuilder {
     }
 
     public LedgerMetadataBuilder withInRecoveryState() {
-        this.state = LedgerMetadataFormat.State.IN_RECOVERY;
+        this.state = State.IN_RECOVERY;
         return this;
     }
 
     public LedgerMetadataBuilder withClosedState() {
-        this.state = LedgerMetadataFormat.State.CLOSED;
+        this.state = State.CLOSED;
         return this;
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
index dc2deb6..de86832 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/LedgerMetadata.java
@@ -117,5 +117,30 @@ public interface LedgerMetadata {
      */
     NavigableMap<Long, ? extends List<BookieSocketAddress>> getAllEnsembles();
 
+    /**
+     * Returns the state of the metadata.
+     *
+     * @return the state of the metadata.
+     */
+    State getState();
 
+    /**
+     * Possible metadata states.
+     */
+    enum State {
+        /** The ledger is open. New entries may be added to it. */
+        OPEN,
+
+        /** A reader has tried to, or may be trying to recover the ledger.
+            The writer may be able to add new entries if fencing hasn't already occurred,
+            but any attempt to change ensemble will fail and the writer will be forced to
+            close the ledger.
+        */
+        IN_RECOVERY,
+
+        /** The ledger is closed. No new entries may be added to it.
+            The length and lastEntryId are fixed. Ensembles may change, but only for rereplication.
+        */
+        CLOSED;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 306cf50..2bd9b13 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -75,6 +75,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
     @VisibleForTesting
     static final int ZK_CONNECT_BACKOFF_MS = 200;
 
+    private final LedgerMetadataSerDe serDe;
     protected final AbstractConfiguration conf;
     protected final ZooKeeper zk;
     protected final String ledgerRootPath;
@@ -159,6 +160,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
      *          ZooKeeper Client Handle
      */
     protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
+        this.serDe = new LedgerMetadataSerDe();
         this.conf = conf;
         this.zk = zk;
         this.ledgerRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
@@ -264,7 +266,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
             }
         };
         List<ACL> zkAcls = ZkUtils.getACLs(conf);
-        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, metadata.serialize(), zkAcls,
+        ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, serDe.serialize(metadata), zkAcls,
                 CreateMode.PERSISTENT, scb, null);
         return promise;
     }
@@ -400,7 +402,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
 
                 try {
                     LongVersion version = new LongVersion(stat.getVersion());
-                    LedgerMetadata metadata = LedgerMetadata.parseConfig(data, Optional.of(stat.getCtime()));
+                    LedgerMetadata metadata = serDe.parseConfig(data, Optional.of(stat.getCtime()));
                     promise.complete(new Versioned<>(metadata, version));
                 } catch (Throwable t) {
                     LOG.error("Could not parse ledger metadata for ledger: {}", ledgerId, t);
@@ -421,7 +423,7 @@ public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher
         }
         final LongVersion zv = (LongVersion) currentVersion;
         zk.setData(getLedgerPath(ledgerId),
-                   metadata.serialize(), (int) zv.getLongVersion(),
+                   serDe.serialize(metadata), (int) zv.getLongVersion(),
                    new StatCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, Stat stat) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
new file mode 100644
index 0000000..26b616b
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerMetadataSerDe.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.TextFormat;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.CharBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.LedgerMetadataBuilder;
+import org.apache.bookkeeper.client.api.DigestType;
+import org.apache.bookkeeper.client.api.LedgerMetadata.State;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serialization and deserialization for LedgerMetadata.
+ */
+public class LedgerMetadataSerDe {
+    private static final Logger log = LoggerFactory.getLogger(LedgerMetadataSerDe.class);
+
+    public static final int CURRENT_METADATA_FORMAT_VERSION = 2;
+    private static final int LOWEST_COMPAT_METADATA_FORMAT_VERSION = 0;
+
+    // for pulling the version
+    private static final String VERSION_KEY = "BookieMetadataFormatVersion";
+    private static final String LINE_SPLITTER = "\n";
+    private static final String FIELD_SPLITTER = "\t";
+
+    // old V1 constants
+    private static final String V1_CLOSED_TAG = "CLOSED";
+    private static final int V1_IN_RECOVERY_ENTRY_ID = -102;
+
+    public byte[] serialize(LedgerMetadata metadata) {
+        if (metadata.getMetadataFormatVersion() == 1) {
+            return serializeVersion1(metadata);
+        }
+
+        StringBuilder s = new StringBuilder();
+        s.append(VERSION_KEY).append(FIELD_SPLITTER)
+            .append(CURRENT_METADATA_FORMAT_VERSION).append(LINE_SPLITTER);
+        s.append(TextFormat.printToString(buildProtoFormat(metadata)));
+        if (log.isDebugEnabled()) {
+            log.debug("Serialized config: {}", s);
+        }
+        return s.toString().getBytes(UTF_8);
+    }
+
+    private byte[] serializeVersion1(LedgerMetadata metadata) {
+        StringBuilder s = new StringBuilder();
+        s.append(VERSION_KEY).append(FIELD_SPLITTER)
+            .append(metadata.getMetadataFormatVersion()).append(LINE_SPLITTER);
+        s.append(metadata.getWriteQuorumSize()).append(LINE_SPLITTER)
+            .append(metadata.getEnsembleSize()).append(LINE_SPLITTER).append(metadata.getLength());
+
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : metadata.getAllEnsembles().entrySet()) {
+            s.append(LINE_SPLITTER).append(entry.getKey());
+            for (BookieSocketAddress addr : entry.getValue()) {
+                s.append(FIELD_SPLITTER);
+                s.append(addr.toString());
+            }
+        }
+
+        if (metadata.getState() == State.IN_RECOVERY) {
+            s.append(LINE_SPLITTER).append(V1_IN_RECOVERY_ENTRY_ID)
+                .append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
+        } else if (metadata.getState() == State.CLOSED) {
+            s.append(LINE_SPLITTER).append(metadata.getLastEntryId())
+                .append(FIELD_SPLITTER).append(V1_CLOSED_TAG);
+        } else {
+            checkArgument(metadata.getState() == State.OPEN,
+                          String.format("Unknown state %s for V1 serialization", metadata.getState()));
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Serialized config: {}", s);
+        }
+
+        return s.toString().getBytes(UTF_8);
+    }
+
+    @VisibleForTesting
+    public LedgerMetadataFormat buildProtoFormat(LedgerMetadata metadata) {
+        LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
+        builder.setQuorumSize(metadata.getWriteQuorumSize())
+            .setAckQuorumSize(metadata.getAckQuorumSize())
+            .setEnsembleSize(metadata.getEnsembleSize())
+            .setLength(metadata.getLength())
+            .setLastEntryId(metadata.getLastEntryId());
+
+        switch (metadata.getState()) {
+        case CLOSED:
+            builder.setState(LedgerMetadataFormat.State.CLOSED);
+            break;
+        case IN_RECOVERY:
+            builder.setState(LedgerMetadataFormat.State.IN_RECOVERY);
+            break;
+        case OPEN:
+            builder.setState(LedgerMetadataFormat.State.OPEN);
+            break;
+        default:
+            checkArgument(false,
+                          String.format("Unknown state %s for protobuf serialization", metadata.getState()));
+            break;
+        }
+
+        if (metadata.shouldStoreCtime()) {
+            builder.setCtime(metadata.getCtime());
+        }
+
+        if (metadata.hasPassword()) {
+            builder.setDigestType(apiToProtoDigestType(metadata.getDigestType()))
+                .setPassword(ByteString.copyFrom(metadata.getPassword()));
+        }
+
+        Map<String, byte[]> customMetadata = metadata.getCustomMetadata();
+        if (customMetadata.size() > 0) {
+            LedgerMetadataFormat.cMetadataMapEntry.Builder cMetadataBuilder =
+                LedgerMetadataFormat.cMetadataMapEntry.newBuilder();
+            for (Map.Entry<String, byte[]> entry : customMetadata.entrySet()) {
+                cMetadataBuilder.setKey(entry.getKey()).setValue(ByteString.copyFrom(entry.getValue()));
+                builder.addCustomMetadata(cMetadataBuilder.build());
+            }
+        }
+
+        for (Map.Entry<Long, ? extends List<BookieSocketAddress>> entry : metadata.getAllEnsembles().entrySet()) {
+            LedgerMetadataFormat.Segment.Builder segmentBuilder = LedgerMetadataFormat.Segment.newBuilder();
+            segmentBuilder.setFirstEntryId(entry.getKey());
+            for (BookieSocketAddress addr : entry.getValue()) {
+                segmentBuilder.addEnsembleMember(addr.toString());
+            }
+            builder.addSegment(segmentBuilder.build());
+        }
+        return builder.build();
+    }
+
+
+    /**
+     * Parses a given byte array and transforms it into a LedgerMetadata object.
+     *
+     * @param bytes
+     *            byte array to parse
+     * @param metadataStoreCtime
+     *            metadata store creation time, used for legacy ledgers
+     * @return the parsed LedgerMetadata
+     * @throws IOException
+     *             if the given byte[] cannot be parsed
+     */
+    public LedgerMetadata parseConfig(byte[] bytes,
+                                      Optional<Long> metadataStoreCtime) throws IOException {
+        String config = new String(bytes, UTF_8);
+
+        if (log.isDebugEnabled()) {
+            log.debug("Parsing Config: {}", config);
+        }
+        BufferedReader reader = new BufferedReader(new StringReader(config));
+        String versionLine = reader.readLine();
+        if (versionLine == null) {
+            throw new IOException("Invalid metadata. Content missing");
+        }
+        final int metadataFormatVersion;
+        if (versionLine.startsWith(VERSION_KEY)) {
+            String parts[] = versionLine.split(FIELD_SPLITTER);
+            metadataFormatVersion = Integer.parseInt(parts[1]);
+        } else {
+            // if no version is set, take it to be version 1,
+            // as the parsing is the same as what we had before
+            // versions were introduced
+            metadataFormatVersion = 1;
+            // reset the reader
+            reader.close();
+            reader = new BufferedReader(new StringReader(config));
+        }
+
+        if (metadataFormatVersion < LOWEST_COMPAT_METADATA_FORMAT_VERSION
+            || metadataFormatVersion > CURRENT_METADATA_FORMAT_VERSION) {
+            throw new IOException(
+                    String.format("Metadata version not compatible. Expected between %d and %d, but got %d",
+                                  LOWEST_COMPAT_METADATA_FORMAT_VERSION, CURRENT_METADATA_FORMAT_VERSION,
+                                  metadataFormatVersion));
+        }
+
+        if (metadataFormatVersion == 1) {
+            return parseVersion1Config(reader);
+        }
+
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create()
+            .withMetadataFormatVersion(metadataFormatVersion);
+
+        // remaining size is total minus the length of the version line and '\n'
+        char[] configBuffer = new char[config.length() - (versionLine.length() + 1)];
+        if (configBuffer.length != reader.read(configBuffer, 0, configBuffer.length)) {
+            throw new IOException("Invalid metadata buffer");
+        }
+
+        LedgerMetadataFormat.Builder formatBuilder = LedgerMetadataFormat.newBuilder();
+        TextFormat.merge((CharSequence) CharBuffer.wrap(configBuffer), formatBuilder);
+        LedgerMetadataFormat data = formatBuilder.build();
+
+        builder.withEnsembleSize(data.getEnsembleSize());
+        builder.withWriteQuorumSize(data.getQuorumSize());
+        if (data.hasAckQuorumSize()) {
+            builder.withAckQuorumSize(data.getAckQuorumSize());
+        } else {
+            builder.withAckQuorumSize(data.getQuorumSize());
+        }
+
+        if (data.hasCtime()) {
+            builder.withCreationTime(data.getCtime()).storingCreationTime(true);
+        } else if (metadataStoreCtime.isPresent()) {
+            builder.withCreationTime(metadataStoreCtime.get()).storingCreationTime(false);
+        }
+
+        if (data.getState() == LedgerMetadataFormat.State.IN_RECOVERY) {
+            builder.withInRecoveryState();
+        } else if (data.getState() == LedgerMetadataFormat.State.CLOSED) {
+            builder.withClosedState().withLastEntryId(data.getLastEntryId()).withLength(data.getLength());
+        }
+
+        if (data.hasPassword()) {
+            builder.withPassword(data.getPassword().toByteArray())
+                .withDigestType(protoToApiDigestType(data.getDigestType()));
+        }
+
+        for (LedgerMetadataFormat.Segment s : data.getSegmentList()) {
+            List<BookieSocketAddress> addrs = new ArrayList<>();
+            for (String addr : s.getEnsembleMemberList()) {
+                addrs.add(new BookieSocketAddress(addr));
+            }
+            builder.newEnsembleEntry(s.getFirstEntryId(), addrs);
+        }
+
+        if (data.getCustomMetadataCount() > 0) {
+            builder.withCustomMetadata(data.getCustomMetadataList().stream().collect(
+                                               Collectors.toMap(e -> e.getKey(),
+                                                                e -> e.getValue().toByteArray())));
+        }
+        return builder.build();
+    }
+
+    static LedgerMetadata parseVersion1Config(BufferedReader reader) throws IOException {
+        LedgerMetadataBuilder builder = LedgerMetadataBuilder.create().withMetadataFormatVersion(1);
+        try {
+            int quorumSize = Integer.parseInt(reader.readLine());
+            int ensembleSize = Integer.parseInt(reader.readLine());
+            long length = Long.parseLong(reader.readLine());
+
+            builder.withEnsembleSize(ensembleSize).withWriteQuorumSize(quorumSize).withAckQuorumSize(quorumSize);
+
+            String line = reader.readLine();
+            while (line != null) {
+                String parts[] = line.split(FIELD_SPLITTER);
+
+                if (parts[1].equals(V1_CLOSED_TAG)) {
+                    Long l = Long.parseLong(parts[0]);
+                    if (l == V1_IN_RECOVERY_ENTRY_ID) {
+                        builder.withInRecoveryState();
+                    } else {
+                        builder.withClosedState().withLastEntryId(l).withLength(length);
+                    }
+                    break;
+                }
+
+                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>();
+                for (int j = 1; j < parts.length; j++) {
+                    addrs.add(new BookieSocketAddress(parts[j]));
+                }
+                builder.newEnsembleEntry(Long.parseLong(parts[0]), addrs);
+
+                line = reader.readLine();
+            }
+            return builder.build();
+        } catch (NumberFormatException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private static LedgerMetadataFormat.DigestType apiToProtoDigestType(DigestType digestType) {
+        switch (digestType) {
+        case MAC:
+            return LedgerMetadataFormat.DigestType.HMAC;
+        case CRC32:
+            return LedgerMetadataFormat.DigestType.CRC32;
+        case CRC32C:
+            return LedgerMetadataFormat.DigestType.CRC32C;
+        case DUMMY:
+            return LedgerMetadataFormat.DigestType.DUMMY;
+        default:
+            throw new IllegalArgumentException("Unable to convert digest type " + digestType);
+        }
+    }
+
+    private static DigestType protoToApiDigestType(LedgerMetadataFormat.DigestType digestType) {
+        switch (digestType) {
+        case HMAC:
+            return DigestType.MAC;
+        case CRC32:
+            return DigestType.CRC32;
+        case CRC32C:
+            return DigestType.CRC32C;
+        case DUMMY:
+            return DigestType.DUMMY;
+        default:
+            throw new IllegalArgumentException("Unable to convert digest type " + digestType);
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 35658dd..7e28217 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -210,7 +210,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
     static class MsLedgerManager implements LedgerManager, MetastoreWatcher {
         final ZooKeeper zk;
         final AbstractConfiguration conf;
-
+        private final LedgerMetadataSerDe serDe;
         final MetaStore metastore;
         final MetastoreScannableTable ledgerTable;
         final int maxEntriesPerScan;
@@ -284,6 +284,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
             this.conf = conf;
             this.zk = zk;
             this.metastore = metastore;
+            this.serDe = new LedgerMetadataSerDe();
 
             try {
                 ledgerTable = metastore.createScannableTable(TABLE_NAME);
@@ -394,7 +395,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
                 }
             };
 
-            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, metadata.serialize()),
+            ledgerTable.put(ledgerId2Key(lid), new Value().setField(META_FIELD, serDe.serialize(metadata)),
                     Version.NEW, msCallback, null);
             return promise;
         }
@@ -440,7 +441,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
                         return;
                     }
                     try {
-                        LedgerMetadata metadata = LedgerMetadata.parseConfig(
+                        LedgerMetadata metadata = serDe.parseConfig(
                                 value.getValue().getField(META_FIELD), Optional.empty());
                         promise.complete(new Versioned<>(metadata, value.getVersion()));
                     } catch (IOException e) {
@@ -456,7 +457,7 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
         @Override
         public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
                                                                                 Version currentVersion) {
-            Value data = new Value().setField(META_FIELD, metadata.serialize());
+            Value data = new Value().setField(META_FIELD, serDe.serialize(metadata));
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Writing ledger {} metadata, version {}", new Object[] { ledgerId, currentVersion });
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
index 1a924e5..4225c08 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/GetLedgerMetaService.java
@@ -32,6 +32,7 @@ import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,10 +47,13 @@ public class GetLedgerMetaService implements HttpEndpointService {
 
     protected ServerConfiguration conf;
     protected BookieServer bookieServer;
+    private final LedgerMetadataSerDe serDe;
+
     public GetLedgerMetaService(ServerConfiguration conf, BookieServer bookieServer) {
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
+        this.serDe = new LedgerMetadataSerDe();
     }
 
     @Override
@@ -66,7 +70,7 @@ public class GetLedgerMetaService implements HttpEndpointService {
             // output <ledgerId: ledgerMetadata>
             Map<String, String> output = Maps.newHashMap();
             LedgerMetadata md = manager.readLedgerMetadata(ledgerId).get().getValue();
-            output.put(ledgerId.toString(), new String(md.serialize(), UTF_8));
+            output.put(ledgerId.toString(), new String(serDe.serialize(md), UTF_8));
 
             manager.close();
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
index 0d10fcd..1df1b36 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/ListLedgerService.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.slf4j.Logger;
@@ -51,21 +52,24 @@ public class ListLedgerService implements HttpEndpointService {
 
     protected ServerConfiguration conf;
     protected BookieServer bookieServer;
+    private final LedgerMetadataSerDe serDe;
 
     public ListLedgerService(ServerConfiguration conf, BookieServer bookieServer) {
         checkNotNull(conf);
         this.conf = conf;
         this.bookieServer = bookieServer;
+        this.serDe = new LedgerMetadataSerDe();
+
     }
 
     // Number of LedgerMetadata contains in each page
     static final int LIST_LEDGER_BATCH_SIZE = 100;
 
-    static void keepLedgerMetadata(long ledgerId, CompletableFuture<Versioned<LedgerMetadata>> future,
-                                   LinkedHashMap<String, String> output)
+    private void keepLedgerMetadata(long ledgerId, CompletableFuture<Versioned<LedgerMetadata>> future,
+                                    LinkedHashMap<String, String> output)
             throws Exception {
         LedgerMetadata md = future.get().getValue();
-        output.put(Long.valueOf(ledgerId).toString(), new String(md.serialize(), UTF_8));
+        output.put(Long.valueOf(ledgerId).toString(), new String(serDe.serialize(md), UTF_8));
     }
 
     @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
index 38e5260..80e1c75 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerMetadataTest.java
@@ -25,9 +25,11 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Lists;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
 import org.junit.Test;
@@ -37,8 +39,7 @@ import org.junit.Test;
  */
 public class LedgerMetadataTest {
 
-    private static final String passwdStr = "testPasswd";
-    private static final byte[] passwd = passwdStr.getBytes(UTF_8);
+    private static final byte[] passwd = "testPasswd".getBytes(UTF_8);
 
     @Test
     public void testGetters() {
@@ -78,7 +79,7 @@ public class LedgerMetadataTest {
             .withCreationTime(System.currentTimeMillis())
             .storingCreationTime(true)
             .build();
-        LedgerMetadataFormat format = lm.buildProtoFormat();
+        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
         assertTrue(format.hasCtime());
     }
 
@@ -92,7 +93,7 @@ public class LedgerMetadataTest {
         LedgerMetadata lm = LedgerMetadataBuilder.create()
             .newEnsembleEntry(0L, ensemble).build();
 
-        LedgerMetadataFormat format = lm.buildProtoFormat();
+        LedgerMetadataFormat format = new LedgerMetadataSerDe().buildProtoFormat(lm);
         assertFalse(format.hasCtime());
     }
 
@@ -106,11 +107,11 @@ public class LedgerMetadataTest {
         LedgerMetadata lm1 = LedgerMetadataBuilder.create()
             .withDigestType(DigestType.CRC32.toApiDigestType())
             .withPassword(passwd)
-            .newEnsembleEntry(0L, ensemble).build();
+            .newEnsembleEntry(0L, ensemble)
+            .build();
 
-        assertTrue("toString should contain 'password' field", lm1.toString().contains("password"));
-        assertTrue("toString should contain password value", lm1.toString().contains(passwdStr));
-        assertFalse("toSafeString should not contain 'password' field", lm1.toSafeString().contains("password"));
-        assertFalse("toSafeString should not contain password value", lm1.toSafeString().contains(passwdStr));
+        assertTrue("toString should contain password value",
+                   lm1.toString().contains(Base64.getEncoder().encodeToString(passwd)));
+        assertTrue("toSafeString should not contain password value", lm1.toSafeString().contains("OMITTED"));
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
index fb56385..6e37fee 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -91,6 +91,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
     private ScheduledExecutorService scheduler;
     private MockExecutorController schedulerController;
     private LedgerMetadata metadata;
+    private LedgerMetadataSerDe serDe;
 
     @Before
     public void setup() throws Exception {
@@ -140,6 +141,8 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         assertSame(mockZk, ledgerManager.zk);
         assertSame(conf, ledgerManager.conf);
         assertSame(scheduler, ledgerManager.scheduler);
+
+        this.serDe = new LedgerMetadataSerDe();
     }
 
     @After
@@ -338,7 +341,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, false,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         Versioned<LedgerMetadata> readMetadata = result(ledgerManager.readLedgerMetadata(ledgerId));
         assertEquals(metadata, readMetadata.getValue());
@@ -395,7 +398,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
 
         mockZkGetData(
             ledgerStr, false,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), null);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), null);
 
         try {
             result(ledgerManager.readLedgerMetadata(ledgerId));
@@ -440,7 +443,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getVersion()).thenReturn(1235);
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkSetData(
-            ledgerStr, metadata.serialize(), 1234,
+            ledgerStr, serDe.serialize(metadata), 1234,
             KeeperException.Code.OK.intValue(), stat);
 
         Version v = ledgerManager.writeLedgerMetadata(ledgerId, metadata, new LongVersion(1234L)).get().getVersion();
@@ -457,7 +460,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         String ledgerStr = String.valueOf(ledgerId);
 
         mockZkSetData(
-            ledgerStr, metadata.serialize(), 1234,
+            ledgerStr, serDe.serialize(metadata), 1234,
             KeeperException.Code.BADVERSION.intValue(), null);
 
         try {
@@ -477,7 +480,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         String ledgerStr = String.valueOf(ledgerId);
 
         mockZkSetData(
-            ledgerStr, metadata.serialize(), 1234,
+            ledgerStr, serDe.serialize(metadata), 1234,
             KeeperException.Code.CONNECTIONLOSS.intValue(), null);
 
         try {
@@ -531,7 +534,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
 
@@ -551,7 +554,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getVersion()).thenReturn(1235);
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         // notify the watcher event
         notifyWatchedEvent(
@@ -593,7 +596,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
         assertTrue(ledgerManager.listeners.containsKey(ledgerId));
@@ -645,7 +648,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
         assertTrue(ledgerManager.listeners.containsKey(ledgerId));
@@ -707,7 +710,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         // mock get data to return a valid response
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         schedulerController.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
 
@@ -734,7 +737,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
 
@@ -782,7 +785,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getCtime()).thenReturn(metadata.getCtime());
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
         assertTrue(ledgerManager.listeners.containsKey(ledgerId));
@@ -803,7 +806,7 @@ public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
         when(stat.getVersion()).thenReturn(1235);
         mockZkGetData(
             ledgerStr, true,
-            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+            KeeperException.Code.OK.intValue(), serDe.serialize(metadata), stat);
 
         // unregister the listener
         ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
index a8b7e86..ff0126a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/MockLedgerManager.java
@@ -59,6 +59,7 @@ public class MockLedgerManager implements LedgerManager {
     final Map<Long, Pair<LongVersion, byte[]>> metadataMap;
     final ExecutorService executor;
     final boolean ownsExecutor;
+    final LedgerMetadataSerDe serDe;
     private Hook preWriteHook = (ledgerId, metadata) -> FutureUtils.value(null);
 
     public MockLedgerManager() {
@@ -71,6 +72,7 @@ public class MockLedgerManager implements LedgerManager {
         this.metadataMap = metadataMap;
         this.executor = executor;
         this.ownsExecutor = ownsExecutor;
+        this.serDe = new LedgerMetadataSerDe();
     }
 
     public MockLedgerManager newClient() {
@@ -82,7 +84,7 @@ public class MockLedgerManager implements LedgerManager {
         if (pair == null) {
             return null;
         } else {
-            return new Versioned<>(LedgerMetadata.parseConfig(pair.getRight(), Optional.empty()), pair.getLeft());
+            return new Versioned<>(serDe.parseConfig(pair.getRight(), Optional.empty()), pair.getLeft());
         }
     }
 
@@ -101,7 +103,7 @@ public class MockLedgerManager implements LedgerManager {
                 if (metadataMap.containsKey(ledgerId)) {
                     executeCallback(() -> promise.completeExceptionally(new BKException.BKLedgerExistException()));
                 } else {
-                    metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), metadata.serialize()));
+                    metadataMap.put(ledgerId, Pair.of(new LongVersion(0L), serDe.serialize(metadata)));
                     try {
                         Versioned<LedgerMetadata> readBack = readMetadata(ledgerId);
                         executeCallback(() -> promise.complete(readBack));
@@ -154,7 +156,7 @@ public class MockLedgerManager implements LedgerManager {
                         } else {
                             LongVersion oldVersion = (LongVersion) oldMetadata.getVersion();
                             metadataMap.put(ledgerId, Pair.of(new LongVersion(oldVersion.getLongVersion() + 1),
-                                                              metadata.serialize()));
+                                                              serDe.serialize(metadata)));
                             Versioned<LedgerMetadata> readBack = readMetadata(ledgerId);
                             return FutureUtils.value(readBack);
                         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index 3f587c7..c4d441d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.replication.AuditorElector;
@@ -386,7 +387,7 @@ public class TestHttpService extends BookKeeperClusterTestCase {
         assertEquals(1, respBody.size());
         // verify LedgerMetadata content is equal
         assertTrue(respBody.get(ledgerId.toString()).toString()
-          .equals(new String(lh[0].getLedgerMetadata().serialize())));
+                .equals(new String(new LedgerMetadataSerDe().serialize(lh[0].getLedgerMetadata()))));
     }
 
     @Test
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
index 8c183ab..5155fad 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManager.java
@@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
 import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
 import org.apache.bookkeeper.metadata.etcd.helpers.KeyStream;
 import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
@@ -63,9 +64,10 @@ import org.apache.zookeeper.AsyncCallback.VoidCallback;
 @Slf4j
 class EtcdLedgerManager implements LedgerManager {
 
-    private static final Function<ByteSequence, LedgerMetadata> LEDGER_METADATA_FUNCTION = bs -> {
+    private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
+    private final Function<ByteSequence, LedgerMetadata> ledgerMetadataFunction = bs -> {
         try {
-            return LedgerMetadata.parseConfig(
+            return serDe.parseConfig(
                 bs.getBytes(),
                 Optional.empty()
             );
@@ -84,6 +86,7 @@ class EtcdLedgerManager implements LedgerManager {
         new ConcurrentLongHashMap<>();
     private final ConcurrentMap<LedgerMetadataListener, LedgerMetadataConsumer> listeners =
         new ConcurrentHashMap<>();
+
     private volatile boolean closed = false;
 
     EtcdLedgerManager(Client client,
@@ -121,7 +124,7 @@ class EtcdLedgerManager implements LedgerManager {
                     .build()))
             .Else(com.coreos.jetcd.op.Op.put(
                 ledgerKeyBs,
-                ByteSequence.fromBytes(metadata.serialize()),
+                ByteSequence.fromBytes(serDe.serialize(metadata)),
                 PutOption.DEFAULT))
             .commit()
             .thenAccept(resp -> {
@@ -223,7 +226,7 @@ class EtcdLedgerManager implements LedgerManager {
                     KeyValue kv = getResp.getKvs().get(0);
                     byte[] data = kv.getValue().getBytes();
                     try {
-                        LedgerMetadata metadata = LedgerMetadata.parseConfig(data, Optional.empty());
+                        LedgerMetadata metadata = serDe.parseConfig(data, Optional.empty());
                         promise.complete(new Versioned<>(metadata, new LongVersion(kv.getModRevision())));
                     } catch (IOException ioe) {
                         log.error("Could not parse ledger metadata for ledger : {}", ledgerId, ioe);
@@ -259,7 +262,7 @@ class EtcdLedgerManager implements LedgerManager {
                 CmpTarget.modRevision(lv.getLongVersion())))
             .Then(com.coreos.jetcd.op.Op.put(
                 ledgerKeyBs,
-                ByteSequence.fromBytes(metadata.serialize()),
+                ByteSequence.fromBytes(serDe.serialize(metadata)),
                 PutOption.DEFAULT))
             .Else(com.coreos.jetcd.op.Op.get(
                 ledgerKeyBs,
@@ -307,7 +310,7 @@ class EtcdLedgerManager implements LedgerManager {
             ledgerId, (lid) -> new ValueStream<>(
                 client,
                 watchClient,
-                LEDGER_METADATA_FUNCTION,
+                ledgerMetadataFunction,
                 ByteSequence.fromString(EtcdUtils.getLedgerKey(scope, ledgerId)))
         );
         LedgerMetadataConsumer lmConsumer = listenerToConsumer(ledgerId, listener,