You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/16 23:38:33 UTC

[incubator-ratis] branch master updated: RATIS-1244. Move MetaFile and RaftLog.Metadata to ratis-server-api. (#358)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 12c4303  RATIS-1244. Move MetaFile and RaftLog.Metadata to ratis-server-api. (#358)
12c4303 is described below

commit 12c43030c98dfac15d114f5da7abc51335ec0085
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 17 07:36:59 2020 +0800

    RATIS-1244. Move MetaFile and RaftLog.Metadata to ratis-server-api. (#358)
---
 .../java/org/apache/ratis/protocol/RaftPeerId.java |   5 +-
 .../apache/ratis/util/AtomicFileOutputStream.java  |  89 ++++++++------
 .../java/org/apache/ratis/util/AtomicUtils.java    |  29 ++++-
 .../ratis/server/storage/RaftStorageMetadata.java  |  82 +++++++++++++
 .../server/storage/RaftStorageMetadataFile.java    |  32 +++++
 .../org/apache/ratis/server/impl/ServerState.java  |   4 +-
 .../org/apache/ratis/server/raftlog/RaftLog.java   |  25 +---
 .../ratis/server/raftlog/memory/MemoryRaftLog.java |  10 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java |  12 +-
 .../org/apache/ratis/server/storage/MetaFile.java  | 130 ---------------------
 .../apache/ratis/server/storage/RaftStorage.java   |  33 +++---
 .../storage/RaftStorageMetadataFileImpl.java       | 121 +++++++++++++++++++
 .../ratis/server/storage/TestRaftStorage.java      |  57 ++++-----
 13 files changed, 371 insertions(+), 258 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index a24c435..1328e1a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Id of Raft Peer which is globally unique.
+ *
+ * This is a value-based class.
  */
 public final class RaftPeerId {
   private static final Map<ByteString, RaftPeerId> BYTE_STRING_MAP = new ConcurrentHashMap<>();
@@ -51,7 +53,6 @@ public final class RaftPeerId {
 
   private RaftPeerId(String id) {
     this.idString = Objects.requireNonNull(id, "id == null");
-    Preconditions.assertTrue(!id.isEmpty(), "id is an empty string.");
     this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8);
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
index c2d626f..72608a6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -25,60 +25,72 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilterOutputStream;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * A FileOutputStream that has the property that it will only show
- * up at its destination once it has been entirely written and flushed
- * to disk. While being written, it will use a .tmp suffix.
+ * A {@link FilterOutputStream} that writes to a file atomically.
+ * The output file will not show up until it has been entirely written and sync'ed to disk.
+ * It uses a temporary file when it is being written.
+ * The default temporary file has a .tmp suffix.
  *
- * When the output stream is closed, it is flushed, fsynced, and
- * will be moved into place, overwriting any file that already
- * exists at that location.
+ * When the output stream is closed, it is
+ * (1) flushed
+ * (2) sync'ed, and
+ * (3) renamed/moved from the temporary file to the output file.
+ * If the output file already exists, it will be overwritten.
  *
- * <b>NOTE</b>: on Windows platforms, it will not atomically
- * replace the target file - instead the target file is deleted
- * before this one is moved into place.
+ * NOTE that on Windows platforms, the output file, if it exists, is deleted
+ * before the temporary file is moved.
  */
 public class AtomicFileOutputStream extends FilterOutputStream {
-
   public static final String TMP_EXTENSION = ".tmp";
 
   public static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class);
 
-  private final File origFile;
+  private final File outFile;
   private final File tmpFile;
+  private final AtomicBoolean isClosed = new AtomicBoolean();
+
+  public AtomicFileOutputStream(File outFile) throws FileNotFoundException {
+    this(outFile, new File(outFile.getParentFile(), outFile.getName() + TMP_EXTENSION));
+  }
 
-  public AtomicFileOutputStream(File f) throws FileNotFoundException {
-    // Code unfortunately must be duplicated below since we can't assign anything
-    // before calling super
-    super(new FileOutputStream(new File(f.getParentFile(), f.getName() + TMP_EXTENSION)));
-    origFile = f.getAbsoluteFile();
-    tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION).getAbsoluteFile();
+  public AtomicFileOutputStream(File outFile, File tmpFile) throws FileNotFoundException {
+    super(new FileOutputStream(tmpFile));
+    this.outFile = outFile.getAbsoluteFile();
+    this.tmpFile = tmpFile.getAbsoluteFile();
+  }
+
+  public boolean isClosed() {
+    return isClosed.get();
   }
 
   @Override
   public void close() throws IOException {
-    boolean triedToClose = false, success = false;
+    if (!isClosed.compareAndSet(false, true)) {
+      return;
+    }
+    boolean forced = false;
+    boolean closed = false;
     try {
       flush();
       ((FileOutputStream)out).getChannel().force(true);
+      forced = true;
 
-      triedToClose = true;
       super.close();
-      success = true;
-    } finally {
-      if (success) {
-        boolean renamed = tmpFile.renameTo(origFile);
-        if (!renamed) {
-          // On windows, renameTo does not replace.
-          if (origFile.exists() && !origFile.delete()) {
-            throw new IOException("Could not delete original file " + origFile);
-          }
-          FileUtils.move(tmpFile, origFile);
+      closed = true;
+
+      final boolean renamed = tmpFile.renameTo(outFile);
+      if (!renamed) {
+        // On windows, renameTo does not replace.
+        if (outFile.exists() && !outFile.delete()) {
+          throw new IOException("Could not delete original file " + outFile);
         }
-      } else {
-        if (!triedToClose) {
+        FileUtils.move(tmpFile, outFile);
+      }
+    } finally {
+      if (!closed) {
+        if (!forced) {
           // If we failed when flushing, try to close it to not leak an FD
           IOUtils.cleanup(LOG, out);
         }
@@ -96,14 +108,17 @@ public class AtomicFileOutputStream extends FilterOutputStream {
    * in writing.
    */
   public void abort() {
+    if (isClosed.get()) {
+      return;
+    }
     try {
       super.close();
     } catch (IOException ioe) {
       LOG.warn("Unable to abort file " + tmpFile, ioe);
-    }
-    if (!tmpFile.delete()) {
-      LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+    } finally {
+      if (!tmpFile.delete()) {
+        LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+      }
     }
   }
-
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
index df71c35..8222702 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,10 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.util.function.CheckedFunction;
+
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Utilities related to atomic operations.
@@ -58,4 +61,28 @@ public interface AtomicUtils {
     }
   }
 
+  /**
+   * Similar to {@link AtomicReference#updateAndGet(java.util.function.UnaryOperator)}
+   * except that the update function is checked.
+   */
+  static <E, THROWABLE extends Throwable> E updateAndGet(AtomicReference<E> reference,
+      CheckedFunction<E, E, THROWABLE> update) throws THROWABLE {
+    final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+    final E updated = reference.updateAndGet(value -> {
+      try {
+        return update.apply(value);
+      } catch (Error | RuntimeException e) {
+        throw e;
+      } catch (Throwable t) {
+        throwableRef.set(t);
+        return value;
+      }
+    });
+    @SuppressWarnings("unchecked")
+    final THROWABLE t = (THROWABLE) throwableRef.get();
+    if (t != null) {
+      throw t;
+    }
+    return updated;
+  }
 }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java
new file mode 100644
index 0000000..911cc41
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The metadata for a raft storage.
+ *
+ * This is a value-based class.
+ */
+public final class RaftStorageMetadata {
+  private static final RaftStorageMetadata DEFAULT = valueOf(
+      TermIndexProto.getDefaultInstance().getTerm(), RaftPeerId.valueOf(""));
+
+  public static RaftStorageMetadata getDefault() {
+    return DEFAULT;
+  }
+
+  public static RaftStorageMetadata valueOf(long term, RaftPeerId votedFor) {
+    return new RaftStorageMetadata(term, votedFor);
+  }
+
+  private final long term;
+  private final RaftPeerId votedFor;
+
+  private RaftStorageMetadata(long term, RaftPeerId votedFor) {
+    this.term = term;
+    this.votedFor = Optional.ofNullable(votedFor).orElseGet(() -> getDefault().getVotedFor());
+  }
+
+  /** @return the term. */
+  public long getTerm() {
+    return term;
+  }
+
+  /** @return the server it voted for. */
+  public RaftPeerId getVotedFor() {
+    return votedFor;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    } else if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final RaftStorageMetadata that = (RaftStorageMetadata) obj;
+    return this.term == that.term && Objects.equals(this.votedFor, that.votedFor);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(term, votedFor);
+  }
+
+  @Override
+  public String toString() {
+    return JavaUtils.getClassSimpleName(getClass()) + "{term=" + term + ", votedFor=" + votedFor + '}';
+  }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java
new file mode 100644
index 0000000..c986620
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.IOException;
+
+/**
+ * Represent a file on disk which persistently stores the metadata of a raft storage.
+ * The file is updated atomically.
+ */
+public interface RaftStorageMetadataFile {
+  /** @return the metadata persisted in this file. */
+  RaftStorageMetadata getMetadata() throws IOException;
+
+  /** Persist the given metadata. */
+  void persist(RaftStorageMetadata newMetadata) throws IOException;
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0fd65a8..806300e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -125,7 +125,7 @@ class ServerState implements Closeable {
     // do not know whether the local log entries have been committed.
     this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
 
-    RaftLog.Metadata metadata = log.loadMetadata();
+    final RaftStorageMetadata metadata = log.loadMetadata();
     currentTerm.set(metadata.getTerm());
     votedFor = metadata.getVotedFor();
 
@@ -231,7 +231,7 @@ class ServerState implements Closeable {
   }
 
   void persistMetadata() throws IOException {
-    this.log.writeMetadata(currentTerm.get(), votedFor);
+    log.writeMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
   }
 
   /**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index a401107..1e864a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -20,12 +20,12 @@ package org.apache.ratis.server.raftlog;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -437,10 +437,9 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
    * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
    * order.
    */
-  public abstract void writeMetadata(long term, RaftPeerId votedFor)
-      throws IOException;
+  public abstract void writeMetadata(RaftStorageMetadata metadata) throws IOException;
 
-  public abstract Metadata loadMetadata() throws IOException;
+  public abstract RaftStorageMetadata loadMetadata() throws IOException;
 
   public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
 
@@ -451,24 +450,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     return getName() + ":" + state + ":c" + getLastCommittedIndex();
   }
 
-  public static class Metadata {
-    private final RaftPeerId votedFor;
-    private final long term;
-
-    public Metadata(RaftPeerId votedFor, long term) {
-      this.votedFor = votedFor;
-      this.term = term;
-    }
-
-    public RaftPeerId getVotedFor() {
-      return votedFor;
-    }
-
-    public long getTerm() {
-      return term;
-    }
-  }
-
   public AutoCloseableLock readLock() {
     return AutoCloseableLock.acquire(lock.readLock());
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 3f19f5b..e80f16c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -19,10 +19,10 @@ package org.apache.ratis.server.raftlog.memory;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.Preconditions;
 
@@ -70,7 +70,7 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   private final EntryList entries = new EntryList();
-  private final AtomicReference<Metadata> metadata = new AtomicReference<>(new Metadata(null, 0));
+  private final AtomicReference<RaftStorageMetadata> metadata = new AtomicReference<>(RaftStorageMetadata.getDefault());
 
   public MemoryRaftLog(RaftGroupMemberId memberId,
                        LongSupplier commitIndexSupplier,
@@ -209,12 +209,12 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public void writeMetadata(long term, RaftPeerId votedFor) {
-    metadata.set(new Metadata(votedFor, term));
+  public void writeMetadata(RaftStorageMetadata newMetadata) {
+    metadata.set(newMetadata);
   }
 
   @Override
-  public Metadata loadMetadata() {
+  public RaftStorageMetadata loadMetadata() {
     return metadata.get();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index df3fe61..604eae5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -19,13 +19,13 @@ package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
@@ -462,15 +462,13 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public void writeMetadata(long term, RaftPeerId votedFor) throws IOException {
-    storage.getMetaFile().set(term, votedFor != null ? votedFor.toString() : null);
+  public void writeMetadata(RaftStorageMetadata metadata) throws IOException {
+    storage.getMetaFile().persist(metadata);
   }
 
   @Override
-  public Metadata loadMetadata() throws IOException {
-    return new Metadata(
-        RaftPeerId.getRaftPeerId(storage.getMetaFile().getVotedFor()),
-        storage.getMetaFile().getTerm());
+  public RaftStorageMetadata loadMetadata() throws IOException {
+    return storage.getMetaFile().getMetadata();
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
deleted file mode 100644
index 119ccbb..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.ratis.server.storage;
-
-import org.apache.ratis.util.AtomicFileOutputStream;
-import org.apache.ratis.util.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-/**
- * Class that represents a file on disk which persistently stores
- * a single <code>long</code> value. The file is updated atomically
- * and durably (i.e fsynced).
- */
-public class MetaFile {
-  private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
-  private static final String TERM_KEY = "term";
-  private static final String VOTEDFOR_KEY = "votedFor";
-  static final long DEFAULT_TERM = 0;
-  static final String EMPTY_VOTEFOR = "";
-
-  private final File file;
-  private boolean loaded = false;
-  private long term;
-  private String votedFor;
-
-  MetaFile(File file) {
-    this.file = file;
-    term = DEFAULT_TERM;
-    votedFor = EMPTY_VOTEFOR;
-  }
-
-  boolean exists() {
-    return this.file.exists();
-  }
-
-  public long getTerm() throws IOException {
-    if (!loaded) {
-      readFile();
-      loaded = true;
-    }
-    return term;
-  }
-
-  public String getVotedFor() throws IOException {
-    if (!loaded) {
-      readFile();
-      loaded = true;
-    }
-    return votedFor;
-  }
-
-  public void set(long newTerm, String newVotedFor) throws IOException {
-    newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
-    if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
-      writeFile(newTerm, newVotedFor);
-    }
-    term = newTerm;
-    votedFor = newVotedFor;
-    loaded = true;
-  }
-
-  /**
-   * Atomically write the given term and votedFor information to the given file,
-   * including fsyncing.
-   *
-   * @throws IOException if the file cannot be written
-   */
-  void writeFile(long givenTerm, String votedForInfo) throws IOException {
-    AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
-    Properties properties = new Properties();
-    properties.setProperty(TERM_KEY, Long.toString(givenTerm));
-    properties.setProperty(VOTEDFOR_KEY, votedForInfo);
-    try {
-      properties.store(
-          new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8)), "");
-      fos.close();
-      fos = null;
-    } finally {
-      if (fos != null) {
-        fos.abort();
-      }
-    }
-  }
-
-  void readFile() throws IOException {
-    term = DEFAULT_TERM;
-    votedFor = EMPTY_VOTEFOR;
-    if (file.exists()) {
-      BufferedReader br = new BufferedReader(
-          new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
-      try {
-        Properties properties = new Properties();
-        properties.load(br);
-        if (properties.containsKey(TERM_KEY) &&
-            properties.containsKey(VOTEDFOR_KEY)) {
-          term = Long.parseLong((String) properties.get(TERM_KEY));
-          votedFor = (String) properties.get(VOTEDFOR_KEY);
-        } else {
-          throw new IOException("Corrupted term/votedFor properties: "
-              + properties);
-        }
-      } catch(IOException e) {
-        LOG.warn("Cannot load term/votedFor properties from {}", file, e);
-        throw e;
-      } finally {
-        IOUtils.cleanup(LOG, br);
-      }
-    }
-  }
-}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index ea2d8d9..9a5eb08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -41,14 +42,14 @@ public class RaftStorage implements Closeable {
 
   public enum StartupOption {
     /** Format the storage. */
-    FORMAT;
+    FORMAT
   }
 
   // TODO support multiple storage directories
   private final RaftStorageDirectory storageDir;
   private final StorageState state;
   private final CorruptionPolicy logCorruptionPolicy;
-  private volatile MetaFile metaFile;
+  private volatile RaftStorageMetadataFileImpl metaFile;
 
   public RaftStorage(File dir, CorruptionPolicy logCorruptionPolicy) throws IOException {
     this(dir, logCorruptionPolicy, null);
@@ -85,29 +86,25 @@ public class RaftStorage implements Closeable {
 
   private void format() throws IOException {
     storageDir.clearDirectory();
-    metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
-    LOG.info("Storage directory " + storageDir.getRoot()
-        + " has been successfully formatted.");
-  }
-
-  private MetaFile writeMetaFile(long term, String votedFor) throws IOException {
-    MetaFile mFile = new MetaFile(storageDir.getMetaFile());
-    mFile.set(term, votedFor);
-    return mFile;
+    metaFile = new RaftStorageMetadataFileImpl(storageDir.getMetaFile());
+    metaFile.persist(RaftStorageMetadata.getDefault());
+    LOG.info("Storage directory " + storageDir.getRoot() + " has been successfully formatted.");
   }
 
   private void cleanMetaTmpFile() throws IOException {
     Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
   }
 
-  private StorageState analyzeAndRecoverStorage(boolean toLock)
-      throws IOException {
+  private StorageState analyzeAndRecoverStorage(boolean toLock) throws IOException {
     StorageState storageState = storageDir.analyzeStorage(toLock);
     if (storageState == StorageState.NORMAL) {
-      metaFile = new MetaFile(storageDir.getMetaFile());
-      Preconditions.assertTrue(metaFile.exists(),
-          () -> "Meta file " + metaFile + " does not exists.");
-      metaFile.readFile();
+      final File f = storageDir.getMetaFile();
+      if (!f.exists()) {
+        throw new FileNotFoundException("Metadata file " + f + " does not exists.");
+      }
+      metaFile = new RaftStorageMetadataFileImpl(f);
+      final RaftStorageMetadata metadata = metaFile.getMetadata();
+      LOG.info("Read {} from {}", metadata, f);
       // Existence of raft-meta.tmp means the change of votedFor/term has not
       // been committed. Thus we should delete the tmp file.
       cleanMetaTmpFile();
@@ -130,7 +127,7 @@ public class RaftStorage implements Closeable {
     storageDir.unlock();
   }
 
-  public MetaFile getMetaFile() {
+  public RaftStorageMetadataFile getMetaFile() {
     return metaFile;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
new file mode 100644
index 0000000..7c01da7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.AtomicUtils;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Represent a file on disk which persistently stores the metadata of a raft storage.
+ * The file is updated atomically.
+ */
+class RaftStorageMetadataFileImpl implements RaftStorageMetadataFile {
+  private static final String TERM_KEY = "term";
+  private static final String VOTED_FOR_KEY = "votedFor";
+
+  private final File file;
+  /** The cached metadata; the reference initially holds null. */
+  private final AtomicReference<RaftStorageMetadata> metadata = new AtomicReference<>();
+
+  RaftStorageMetadataFileImpl(File file) {
+    this.file = file;
+  }
+
+  /**
+   * @return the cached metadata if it is non-null; otherwise, the metadata loaded from the file.
+   * @throws IOException if the file cannot be read or parsed
+   */
+  @Override
+  public RaftStorageMetadata getMetadata() throws IOException {
+    return AtomicUtils.updateAndGet(metadata, value -> value != null? value: load(file));
+  }
+
+  /**
+   * Write the given metadata to the file unless it is equal to the cached metadata.
+   *
+   * @throws IOException if the file cannot be written
+   */
+  @Override
+  public void persist(RaftStorageMetadata newMetadata) throws IOException {
+    AtomicUtils.updateAndGet(metadata,
+        old -> Objects.equals(old, newMetadata)? old: atomicWrite(newMetadata, file));
+  }
+
+  @Override
+  public String toString() {
+    return JavaUtils.getClassSimpleName(RaftStorageMetadataFile.class) + ":" + file;
+  }
+
+  /**
+   * Atomically write the given term and votedFor information to the given file,
+   * including fsyncing.
+   *
+   * @return the given metadata
+   * @throws IOException if the file cannot be written
+   */
+  static RaftStorageMetadata atomicWrite(RaftStorageMetadata metadata, File file) throws IOException {
+    final Properties properties = new Properties();
+    properties.setProperty(TERM_KEY, Long.toString(metadata.getTerm()));
+    properties.setProperty(VOTED_FOR_KEY, metadata.getVotedFor().toString());
+
+    try(BufferedWriter out = new BufferedWriter(
+        new OutputStreamWriter(new AtomicFileOutputStream(file), StandardCharsets.UTF_8))) {
+      properties.store(out, "");
+    }
+    return metadata;
+  }
+
+  /**
+   * @return the value of the given key in the given properties.
+   * @throws IOException if the key is not found
+   */
+  static String getValue(String key, Properties properties) throws IOException {
+    return Optional.ofNullable(properties.getProperty(key)).orElseThrow(
+        () -> new IOException("'" + key + "' not found in properties: " + properties));
+  }
+
+  /**
+   * @return the term stored in the given properties.
+   * @throws IOException if the term is not found or is not a valid long
+   */
+  static long getTerm(Properties properties) throws IOException {
+    // Look up the value outside the try-block so that a missing key is reported as "not found"
+    // instead of being wrapped as a parse failure.
+    final String term = getValue(TERM_KEY, properties);
+    try {
+      return Long.parseLong(term);
+    } catch (NumberFormatException e) {
+      throw new IOException("Failed to parse '" + TERM_KEY + "' from properties: " + properties, e);
+    }
+  }
+
+  /**
+   * @return the votedFor peer id stored in the given properties.
+   * @throws IOException if votedFor is not found or cannot be converted to a {@link RaftPeerId}
+   */
+  static RaftPeerId getVotedFor(Properties properties) throws IOException {
+    // Look up the value outside the try-block so that a missing key is reported as "not found"
+    // instead of being wrapped as a parse failure.
+    final String votedFor = getValue(VOTED_FOR_KEY, properties);
+    try {
+      return RaftPeerId.valueOf(votedFor);
+    } catch (RuntimeException e) {
+      throw new IOException("Failed to parse '" + VOTED_FOR_KEY + "' from properties: " + properties, e);
+    }
+  }
+
+  /**
+   * @return the metadata loaded from the given file, or the default metadata if the file does not exist.
+   * @throws IOException if the file cannot be read or parsed
+   */
+  static RaftStorageMetadata load(File file) throws IOException {
+    if (!file.exists()) {
+      return RaftStorageMetadata.getDefault();
+    }
+    try(BufferedReader br = new BufferedReader(new InputStreamReader(
+        new FileInputStream(file), StandardCharsets.UTF_8))) {
+      Properties properties = new Properties();
+      properties.load(br);
+      return RaftStorageMetadata.valueOf(getTerm(properties), getVotedFor(properties));
+    } catch (IOException e) {
+      throw new IOException("Failed to load " + file, e);
+    }
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 2816d3b..ce9e0b7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
 import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 
 /**
@@ -107,50 +109,37 @@ public class TestRaftStorage extends BaseTest {
     storage.close();
 
     Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false));
-    File m = sd.getMetaFile();
-    Assert.assertTrue(m.exists());
-    MetaFile metaFile = new MetaFile(m);
-    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
-    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
-    metaFile.set(123, "peer1");
-    metaFile.readFile();
-    Assert.assertEquals(123, metaFile.getTerm());
-    Assert.assertEquals("peer1", metaFile.getVotedFor());
-
-    MetaFile metaFile2 = new MetaFile(m);
-    Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
-    Assert.assertEquals(123, metaFile.getTerm());
-    Assert.assertEquals("peer1", metaFile.getVotedFor());
+    assertMetadataFile(sd.getMetaFile());
 
     // test format
     storage = formatRaftStorage(storageDir);
     Assert.assertEquals(StorageState.NORMAL, storage.getState());
-    metaFile = new MetaFile(sd.getMetaFile());
-    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
-    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+    final RaftStorageMetadataFile metaFile = new RaftStorageMetadataFileImpl(sd.getMetaFile());
+    Assert.assertEquals(RaftStorageMetadata.getDefault(), metaFile.getMetadata());
     storage.close();
   }
 
+  /** Verify the default metadata in the given file and then a persist-and-reload round trip. */
+  static void assertMetadataFile(File m) throws Exception {
+    Assert.assertTrue(m.exists());
+    final RaftStorageMetadataFile metaFile = new RaftStorageMetadataFileImpl(m);
+    Assert.assertEquals(RaftStorageMetadata.getDefault(), metaFile.getMetadata());
+    final RaftPeerId peer1 = RaftPeerId.valueOf("peer1");
+    final RaftStorageMetadata metadata = RaftStorageMetadata.valueOf(123, peer1);
+    metaFile.persist(metadata);
+    Assert.assertEquals(123, metadata.getTerm());
+    Assert.assertEquals(peer1, metadata.getVotedFor());
+    Assert.assertEquals(metadata, metaFile.getMetadata());
+    // A new instance has nothing cached yet; it must read the persisted values from the file.
+    final RaftStorageMetadataFile metaFile2 = new RaftStorageMetadataFileImpl(m);
+    Assert.assertNull(((AtomicReference<?>) Whitebox.getInternalState(metaFile2, "metadata")).get());
+    Assert.assertEquals(metadata, metaFile2.getMetadata());
+  }
+
   @Test
   public void testMetaFile() throws Exception {
     RaftStorage storage = formatRaftStorage(storageDir);
-    File m = storage.getStorageDir().getMetaFile();
-    Assert.assertTrue(m.exists());
-    MetaFile metaFile = new MetaFile(m);
-    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
-    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
-    metaFile.set(123, "peer1");
-    metaFile.readFile();
-    Assert.assertEquals(123, metaFile.getTerm());
-    Assert.assertEquals("peer1", metaFile.getVotedFor());
-
-    MetaFile metaFile2 = new MetaFile(m);
-    Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
-    Assert.assertEquals(123, metaFile.getTerm());
-    Assert.assertEquals("peer1", metaFile.getVotedFor());
-
+    assertMetadataFile(storage.getStorageDir().getMetaFile());
     storage.close();
   }