You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/29 02:51:41 UTC

[ratis] 01/05: RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch release-2.4.0
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 4391247683e834858f30baa42476be6fdda6d51d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Aug 25 18:50:14 2022 -0700

    RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)
    
    (cherry picked from commit ebb39e840e6d13245f76a044c1d60bafc7ca44cf)
---
 .../ratis/client/api/GroupManagementApi.java       |  12 +-
 .../apache/ratis/client/impl/ClientProtoUtils.java |   9 +-
 .../ratis/client/impl/GroupManagementImpl.java     |   4 +-
 .../ratis/protocol/GroupManagementRequest.java     |  13 +-
 .../apache/ratis/util/MemoizedCheckedSupplier.java |  90 ++++++++++++
 .../java/org/apache/ratis/util/Preconditions.java  |   5 +
 .../java/org/apache/ratis/util/SizeInBytes.java    |   1 +
 ratis-proto/src/main/proto/Raft.proto              |   1 +
 .../java/org/apache/ratis/server/RaftServer.java   |  16 ++-
 .../apache/ratis/server/storage/RaftStorage.java   |   5 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |   9 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  26 ++--
 .../apache/ratis/server/impl/ServerImplUtils.java  |   5 +-
 .../org/apache/ratis/server/impl/ServerState.java  | 128 ++++++-----------
 .../server/storage/RaftStorageDirectoryImpl.java   |   7 +-
 .../ratis/server/storage/RaftStorageImpl.java      |  10 +-
 .../ratis/server/storage/SnapshotManager.java      |  12 +-
 .../ratis/server/storage/StorageImplUtils.java     | 160 ++++++++++++++++++---
 .../apache/ratis/server/impl/MiniRaftCluster.java  |   6 +-
 .../server/impl/RaftReconfigurationBaseTest.java   |   4 +-
 .../ratis/server/storage/TestRaftStorage.java      |   6 +-
 .../TestStorageImplUtils.java}                     |  27 ++--
 22 files changed, 382 insertions(+), 174 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
index 1d3bc00b1..558747048 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
@@ -29,8 +29,16 @@ import java.io.IOException;
  * APIs to support group management operations such as add, remove, list and info to a particular server.
  */
 public interface GroupManagementApi {
-  /** Add a new group. */
-  RaftClientReply add(RaftGroup newGroup) throws IOException;
+  /**
+   * Add a new group.
+   * @param format Should it format the storage?
+   */
+  RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException;
+
+  /** The same as add(newGroup, true). */
+  default RaftClientReply add(RaftGroup newGroup) throws IOException {
+    return add(newGroup, true);
+  }
 
   /** Remove a group. */
   RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 859e1d4f0..1ac825850 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -571,8 +571,9 @@ public interface ClientProtoUtils {
     final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
     switch(p.getOpCase()) {
       case GROUPADD:
+        final GroupAddRequestProto add = p.getGroupAdd();
         return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
-            ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
+            ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat());
       case GROUPREMOVE:
         final GroupRemoveRequestProto remove = p.getGroupRemove();
         return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
@@ -609,8 +610,10 @@ public interface ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
     final GroupManagementRequest.Add add = request.getAdd();
     if (add != null) {
-      b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
-          ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
+      b.setGroupAdd(GroupAddRequestProto.newBuilder()
+          .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup()))
+          .setFormat(add.isFormat())
+          .build());
     }
     final GroupManagementRequest.Remove remove = request.getRemove();
     if (remove != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 27e0bbffc..9501bc2ea 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi {
   }
 
   @Override
-  public RaftClientReply add(RaftGroup newGroup) throws IOException {
+  public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException {
     Objects.requireNonNull(newGroup, "newGroup == null");
 
     final long callId = CallId.getAndIncrement();
     client.getClientRpc().addRaftPeers(newGroup.getPeers());
     return client.io().sendRequestWithRetry(
-        () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
+        () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format));
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index d370dfc4c..2783d2c65 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -26,9 +26,11 @@ public final class GroupManagementRequest extends RaftClientRequest {
 
   public static class Add extends Op {
     private final RaftGroup group;
+    private final boolean format;
 
-    public Add(RaftGroup group) {
+    public Add(RaftGroup group, boolean format) {
       this.group = group;
+      this.format = format;
     }
 
     @Override
@@ -40,6 +42,10 @@ public final class GroupManagementRequest extends RaftClientRequest {
       return group;
     }
 
+    public boolean isFormat() {
+      return format;
+    }
+
     @Override
     public String toString() {
       return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup();
@@ -79,8 +85,9 @@ public final class GroupManagementRequest extends RaftClientRequest {
     }
   }
 
-  public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
-    return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
+  public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId,
+      RaftGroup group, boolean format) {
+    return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format));
   }
 
   public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
new file mode 100644
index 000000000..cf2d06023
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.util.Objects;
+
+/**
+ * A memoized supplier is a {@link CheckedSupplier}
+ * which gets a value by invoking its initializer once
+ * and then keeps returning the same value as its supplied result.
+ *
+ * This class is thread safe.
+ *
+ * @param <RETURN> The return type of the supplier.
+ * @param <THROW> The throwable type of the supplier.
+ */
+public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
+    implements CheckedSupplier<RETURN, THROW> {
+  /**
+   * @param supplier to supply at most one non-null value.
+   * @return a {@link MemoizedCheckedSupplier} with the given supplier.
+   */
+  public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf(
+      CheckedSupplier<RETURN, THROW> supplier) {
+    return supplier instanceof MemoizedCheckedSupplier ?
+        (MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier);
+  }
+
+  private final CheckedSupplier<RETURN, THROW> initializer;
+  private volatile RETURN value = null;
+
+  /**
+   * Create a memoized supplier.
+   * @param initializer to supply at most one non-null value.
+   */
+  private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
+    Objects.requireNonNull(initializer, "initializer == null");
+    this.initializer = initializer;
+  }
+
+  /** @return the lazily initialized object. */
+  @Override
+  public RETURN get() throws THROW {
+    RETURN v = value;
+    if (v == null) {
+      synchronized (this) {
+        v = value;
+        if (v == null) {
+          v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
+        }
+      }
+    }
+    return v;
+  }
+
+  /**
+   * @return the already initialized object.
+   * @throws NullPointerException if the object is uninitialized.
+   */
+  public RETURN getUnchecked() {
+    return Objects.requireNonNull(value, "value == null");
+  }
+
+  /** @return true if the value has already been initialized; false otherwise. */
+  public boolean isInitialized() {
+    return value != null;
+  }
+
+  @Override
+  public String toString() {
+    return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index e9a002612..902c1f5e6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
 
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -113,6 +114,10 @@ public interface Preconditions {
     return clazz.cast(object);
   }
 
+  static <K, V> void assertEmpty(Map<K, V> map, Object name) {
+    assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map);
+  }
+
   static <T> void assertUnique(Iterable<T> first) {
     assertUnique(first, Collections.emptyList());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 25667a378..683f0da62 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -23,6 +23,7 @@ import java.util.Objects;
  * Size which may be constructed with a {@link TraditionalBinaryPrefix}.
  */
 public final class SizeInBytes {
+  public static final SizeInBytes ZERO = valueOf(0);
   public static final SizeInBytes ONE_KB = valueOf("1k");
   public static final SizeInBytes ONE_MB = valueOf("1m");
 
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c571f0c73..9fe2494bf 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -469,6 +469,7 @@ message StartLeaderElectionReplyProto {
 // A request to add a new group
 message GroupAddRequestProto {
   RaftGroupProto group = 1; // the group to be added.
+  bool format = 2; // Should it format the storage?
 }
 
 message GroupRemoveRequestProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index e9719b96c..8d00d29db 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -167,8 +167,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
 
     private static Method initNewRaftServerMethod() {
       final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
-      final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
-          RaftProperties.class, Parameters.class};
+      final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class,
+          StateMachine.Registry.class, RaftProperties.class, Parameters.class};
       try {
         final Class<?> clazz = ReflectionUtils.getClassByName(className);
         return clazz.getMethod("newRaftServer", argClasses);
@@ -177,12 +177,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
       }
     }
 
-    private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
+    private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option,
         StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
         throws IOException {
       try {
         return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
-            serverId, group, stateMachineRegistry, properties, parameters);
+            serverId, group, option, stateMachineRegistry, properties, parameters);
       } catch (IllegalAccessException e) {
         throw new IllegalStateException("Failed to build " + serverId, e);
       } catch (InvocationTargetException e) {
@@ -193,6 +193,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
     private RaftPeerId serverId;
     private StateMachine.Registry stateMachineRegistry ;
     private RaftGroup group = null;
+    private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER;
     private RaftProperties properties;
     private Parameters parameters;
 
@@ -201,6 +202,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
       return newRaftServer(
           serverId,
           group,
+          option,
           Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
               "is initialized."),
           Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
@@ -230,6 +232,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
       return this;
     }
 
+    /** Set the startup option for the group. */
+    public Builder setOption(RaftStorage.StartupOption option) {
+      this.option = option;
+      return this;
+    }
+
     /** Set {@link RaftProperties}. */
     public Builder setProperties(RaftProperties properties) {
       this.properties = properties;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index dde3c31bc..59d87c37a 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -62,8 +62,7 @@ public interface RaftStorage extends Closeable {
 
     private static Method initNewRaftStorageMethod() {
       final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
-      //final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
-      final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
+      final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class};
       try {
         final Class<?> clazz = ReflectionUtils.getClassByName(className);
         return clazz.getMethod("newRaftStorage", argClasses);
@@ -76,7 +75,7 @@ public interface RaftStorage extends Closeable {
         StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
       try {
         return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
-            dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
+            dir, storageFreeSpaceMin, option, logCorruptionPolicy);
       } catch (IllegalAccessException e) {
         throw new IllegalStateException("Failed to build " + dir, e);
       } catch (InvocationTargetException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1994fa8de..ee7b749dd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -189,7 +189,8 @@ class RaftServerImpl implements RaftServer.Division,
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
 
-  RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
+  RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
+      throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
     this.lifeCycle = new LifeCycle(id);
@@ -202,7 +203,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     this.proxy = proxy;
 
-    this.state = new ServerState(id, group, properties, this, stateMachine);
+    this.state = new ServerState(id, group, stateMachine, this, option, properties);
     this.retryCache = new RetryCacheImpl(properties);
     this.dataStreamMap = new DataStreamMapImpl(id);
 
@@ -575,8 +576,8 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   GroupInfoReply getGroupInfo(GroupInfoRequest request) {
-    return new GroupInfoReply(request, getCommitInfos(),
-        getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
+    final RaftStorageDirectory dir = state.getStorage().getStorageDir();
+    return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
   }
 
   RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index ad4d988ab..ad488f53b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,6 +38,7 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.ServerFactory;
+import org.apache.ratis.server.storage.RaftStorage.StartupOption;
 import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -80,7 +81,7 @@ class RaftServerProxy implements RaftServer {
     private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
     private boolean isClosed = false;
 
-    synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
+    synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group, StartupOption option) {
       if (isClosed) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             getId() + ": Failed to add " + group + " since the server is already closed"));
@@ -90,7 +91,7 @@ class RaftServerProxy implements RaftServer {
             getId() + ": Failed to add " + group + " since the group already exists in the map."));
       }
       final RaftGroupId groupId = group.getGroupId();
-      final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
+      final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group, option);
       final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
       Preconditions.assertNull(previous, "previous");
       LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
@@ -230,7 +231,7 @@ class RaftServerProxy implements RaftServer {
   }
 
   /** Check the storage dir and add groups*/
-  void initGroups(RaftGroup group) {
+  void initGroups(RaftGroup group, StartupOption option) {
     final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
     final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
     final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
@@ -241,7 +242,7 @@ class RaftServerProxy implements RaftServer {
             .filter(File::isDirectory)
             .forEach(sub -> initGroupDir(sub, shouldAdd)),
         executor).join();
-    raftGroup.ifPresent(this::addGroup);
+    raftGroup.ifPresent(g -> addGroup(g, option));
   }
 
   private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
@@ -255,7 +256,7 @@ class RaftServerProxy implements RaftServer {
             " ignoring it. ", getId(), sub.getAbsolutePath());
       }
       if (shouldAdd.test(groupId)) {
-        addGroup(RaftGroup.valueOf(groupId));
+        addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER);
       }
     } catch (Exception e) {
       LOG.warn(getId() + ": Failed to initialize the group directory "
@@ -269,11 +270,11 @@ class RaftServerProxy implements RaftServer {
     getDataStreamServerRpc().addRaftPeers(others);
   }
 
-  private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
+  private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, StartupOption option) {
     return CompletableFuture.supplyAsync(() -> {
       try {
         addRaftPeers(group.getPeers());
-        return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+        return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option);
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
       }
@@ -341,8 +342,8 @@ class RaftServerProxy implements RaftServer {
     return dataStreamServerRpc;
   }
 
-  private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
-    return impls.addNew(group);
+  private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group, StartupOption option) {
+    return impls.addNew(group, option);
   }
 
   private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
@@ -466,7 +467,7 @@ class RaftServerProxy implements RaftServer {
     }
     final GroupManagementRequest.Add add = request.getAdd();
     if (add != null) {
-      return groupAddAsync(request, add.getGroup());
+      return groupAddAsync(request, add.getGroup(), add.isFormat());
     }
     final GroupManagementRequest.Remove remove = request.getRemove();
     if (remove != null) {
@@ -477,12 +478,13 @@ class RaftServerProxy implements RaftServer {
         getId() + ": Request not supported " + request));
   }
 
-  private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
+  private CompletableFuture<RaftClientReply> groupAddAsync(
+      GroupManagementRequest request, RaftGroup newGroup, boolean format) {
     if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
       return JavaUtils.completeExceptionally(new GroupMismatchException(
           getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
     }
-    return impls.addNew(newGroup)
+    return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER)
         .thenApplyAsync(newImpl -> {
           LOG.debug("{}: newImpl = {}", getId(), newImpl);
           try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6777b9093..6e1ddd548 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -45,7 +46,7 @@ public final class ServerImplUtils {
 
   /** Create a {@link RaftServerProxy}. */
   public static RaftServerProxy newRaftServer(
-      RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
+      RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry,
       RaftProperties properties, Parameters parameters) throws IOException {
     RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
     if (group != null && !group.getPeers().isEmpty()) {
@@ -53,7 +54,7 @@ public final class ServerImplUtils {
       Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
     }
     final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
-    proxy.initGroups(group);
+    proxy.initGroups(group, option);
     return proxy;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1ee2cab5e..e92f9b911 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -36,20 +36,14 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedCheckedSupplier;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
-import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -63,7 +57,7 @@ import static org.apache.ratis.server.RaftServer.Division.LOG;
 /**
  * Common states of a raft peer. Protected by RaftServer's lock.
  */
-class ServerState implements Closeable {
+class ServerState {
   private final RaftGroupMemberId memberId;
   private final RaftServerImpl server;
   /** Raft log */
@@ -73,7 +67,7 @@ class ServerState implements Closeable {
   /** The thread that applies committed log entries to the state machine */
   private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
   /** local storage for log and snapshot */
-  private RaftStorageImpl storage;
+  private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
   private final SnapshotManager snapshotManager;
   private volatile Timestamp lastNoLeaderTime;
   private final TimeDuration noLeaderTimeout;
@@ -100,9 +94,8 @@ class ServerState implements Closeable {
    */
   private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
 
-  ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
-              RaftServerImpl server, StateMachine stateMachine)
-      throws IOException {
+  ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
+      RaftStorage.StartupOption option, RaftProperties prop) {
     this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
     this.server = server;
     Collection<RaftPeer> followerPeers = group.getPeers().stream()
@@ -117,36 +110,11 @@ class ServerState implements Closeable {
     configurationManager = new ConfigurationManager(initialConf);
     LOG.info("{}: {}", getMemberId(), configurationManager);
 
-    boolean storageFound = false;
-    List<File> directories = RaftServerConfigKeys.storageDir(prop);
-    while (!directories.isEmpty()) {
-      // use full uuid string to create a subdirectory
-      File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString());
-      try {
-        storage = (RaftStorageImpl) RaftStorage.newBuilder()
-            .setDirectory(dir)
-            .setOption(RaftStorage.StartupOption.RECOVER)
-            .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop))
-            .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop))
-            .build();
-        storageFound = true;
-        break;
-      } catch (IOException e) {
-        if (e.getCause() instanceof OverlappingFileLockException) {
-          throw e;
-        }
-        LOG.warn("Failed to init RaftStorage under {} for {}: {}",
-            dir.getParent(), group.getGroupId().getUuid().toString(), e);
-        directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent()));
-      }
-    }
-
-    if (!storageFound) {
-      throw new IOException("No healthy directories found for RaftStorage among: " +
-          RaftServerConfigKeys.storageDir(prop));
-    }
+    final String storageDirName = group.getGroupId().getUuid().toString();
+    this.raftStorage = MemoizedCheckedSupplier.valueOf(
+        () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
 
-    snapshotManager = new SnapshotManager(storage, id);
+    this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
 
     // On start the leader is null, start the clock now
     this.leaderId = null;
@@ -163,7 +131,8 @@ class ServerState implements Closeable {
   }
 
   void initialize(StateMachine stateMachine) throws IOException {
-    storage.initialize();
+    // initialize raft storage
+    final RaftStorageImpl storage = raftStorage.get();
     // read configuration from the storage
     Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
 
@@ -180,32 +149,8 @@ class ServerState implements Closeable {
     return memberId;
   }
 
-  static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException {
-    final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
-    final File[] empty = {};
-    final List<File> resultList = new ArrayList<>();
-    volumes.stream().flatMap(volume -> {
-      final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty);
-      numberOfStorageDirPerVolume.put(volume, dirs.length);
-      return Arrays.stream(dirs);
-    }).filter(dir -> targetSubDir.equals(dir.getName()))
-        .forEach(resultList::add);
-
-    if (resultList.size() > 1) {
-      throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList);
-    }
-    if (resultList.size() == 1) {
-      return resultList.get(0);
-    }
-    return numberOfStorageDirPerVolume.entrySet().stream()
-        .min(Map.Entry.comparingByValue())
-        .map(Map.Entry::getKey)
-        .map(v -> new File(v, targetSubDir))
-        .orElseThrow(() -> new IOException("No storage directory found."));
-  }
-
   void writeRaftConfiguration(LogEntryProto conf) {
-    storage.writeRaftConfiguration(conf);
+    getStorage().writeRaftConfiguration(conf);
   }
 
   void start() {
@@ -214,7 +159,8 @@ class ServerState implements Closeable {
 
   private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) {
     try {
-      return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
+      return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf,
+          getSnapshotIndexFromStateMachine, prop);
     } catch (IOException e) {
       throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e);
     }
@@ -259,10 +205,6 @@ class ServerState implements Closeable {
     return leaderId;
   }
 
-  boolean hasLeader() {
-    return leaderId != null;
-  }
-
   /**
    * Become a candidate and start leader election
    */
@@ -434,7 +376,7 @@ class ServerState implements Closeable {
   void updateConfiguration(List<LogEntryProto> entries) {
     if (entries != null && !entries.isEmpty()) {
       configurationManager.removeConfigurations(entries.get(0).getIndex());
-      entries.stream().forEach(this::setRaftConf);
+      entries.forEach(this::setRaftConf);
     }
   }
 
@@ -455,29 +397,45 @@ class ServerState implements Closeable {
     getStateMachineUpdater().reloadStateMachine();
   }
 
-  @Override
-  public void close() throws IOException {
+  void close() {
+    try {
+      if (stateMachineUpdater.isInitialized()) {
+        getStateMachineUpdater().stopAndJoin();
+      }
+    } catch (Throwable e) {
+      LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e);
+    }
+    LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex());
+
     try {
-      getStateMachineUpdater().stopAndJoin();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
+      if (log.isInitialized()) {
+        getLog().close();
+      }
+    } catch (Throwable e) {
+      LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e);
     }
-    LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
 
-    getLog().close();
-    storage.close();
+    try {
+      if (raftStorage.isInitialized()) {
+        getStorage().close();
+      }
+    } catch (Throwable e) {
+      LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e);
+    }
   }
 
-  RaftStorage getStorage() {
-    return storage;
+  RaftStorageImpl getStorage() {
+    if (!raftStorage.isInitialized()) {
+      throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized.");
+    }
+    return raftStorage.getUnchecked();
   }
 
   void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
     // TODO: verify that we need to install the snapshot
     StateMachine sm = server.getStateMachine();
     sm.pause(); // pause the SM to prepare for install snapshot
-    snapshotManager.installSnapshot(sm, request);
+    snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir());
     updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex()));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index a7fa13b53..a86cdf56b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.storage;
 
 import org.apache.ratis.util.AtomicFileOutputStream;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,13 +52,13 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
 
   private final File root; // root directory
   private FileLock lock;   // storage lock
-  private long freeSpaceMin;
+  private final SizeInBytes freeSpaceMin;
 
   /**
    * Constructor
    * @param dir directory corresponding to the storage
    */
-  RaftStorageDirectoryImpl(File dir, long freeSpaceMin) {
+  RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) {
     this.root = dir;
     this.lock = null;
     this.freeSpaceMin = freeSpaceMin;
@@ -177,7 +178,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
   }
 
   private boolean hasEnoughSpace() {
-    return root.getFreeSpace() > freeSpaceMin;
+    return root.getFreeSpace() >= freeSpaceMin.getSize();
   }
 
   /**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 6513efd30..56972c3f7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -34,17 +35,16 @@ import java.util.Optional;
 
 /** The storage of a {@link org.apache.ratis.server.RaftServer}. */
 public class RaftStorageImpl implements RaftStorage {
-
-  // TODO support multiple storage directories
   private final RaftStorageDirectoryImpl storageDir;
   private final StartupOption startupOption;
   private final CorruptionPolicy logCorruptionPolicy;
   private volatile StorageState state = StorageState.UNINITIALIZED;
   private volatile RaftStorageMetadataFileImpl metaFile;
 
-  RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option,
-      long storageFeeSpaceMin) {
-    this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
+  RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) {
+    LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}",
+        dir, freeSpaceMin, option, logCorruptionPolicy);
+    this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin);
     this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
     this.startupOption = option;
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index aaa62a783..17294b572 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,21 +54,17 @@ public class SnapshotManager {
   private static final String CORRUPT = ".corrupt";
   private static final String TMP = ".tmp";
 
-  private final RaftStorage storage;
   private final RaftPeerId selfId;
   private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);
 
-  public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
-    this.storage = storage;
+  SnapshotManager(RaftPeerId selfId) {
     this.selfId = selfId;
   }
 
-  public void installSnapshot(StateMachine stateMachine,
-      InstallSnapshotRequestProto request) throws IOException {
-    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
-        request.getSnapshotChunk();
+  public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
+      throws IOException {
+    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
     final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
-    final RaftStorageDirectory dir = storage.getStorageDir();
 
     // create a unique temporary directory
     final File tmpDir =  new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index aeff60148..865e2b2b1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -17,38 +17,158 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.server.RaftServerConfigKeys.Log;
+import org.apache.ratis.server.storage.RaftStorage.StartupOption;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.server.RaftServer.Division.LOG;
 
 public final class StorageImplUtils {
+  private static final File[] EMPTY_FILE_ARRAY = {};
 
   private StorageImplUtils() {
     //Never constructed
   }
 
+  public static SnapshotManager newSnapshotManager(RaftPeerId id) {
+    return new SnapshotManager(id);
+  }
+
   /** Create a {@link RaftStorageImpl}. */
-  public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy,
-      RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException {
-    RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin);
-
-    final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
-    final RaftStorageImpl raftStorage;
-    try {
-      // attempt multiple times to avoid temporary bind exception
-      raftStorage = JavaUtils.attemptRepeatedly(
-          () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin),
-          5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw IOUtils.toInterruptedIOException(
-          "Interrupted when creating RaftStorage " + dir, e);
+  public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin,
+      RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) {
+    return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
+  }
+
+  /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */
+  static List<File> getExistingStorageSubs(List<File> volumes, String targetSubDir, Map<File, Integer> dirsPerVol) {
+    return volumes.stream().flatMap(volume -> {
+          final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY);
+          Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length));
+          return Arrays.stream(dirs);
+        }).filter(dir -> targetSubDir.equals(dir.getName()))
+        .collect(Collectors.toList());
+  }
+
+  /** @return a volume with the min dirs. */
+  static File chooseMin(Map<File, Integer> dirsPerVol) throws IOException {
+    return dirsPerVol.entrySet().stream()
+        .min(Map.Entry.comparingByValue())
+        .map(Map.Entry::getKey)
+        .orElseThrow(() -> new IOException("No storage directory found."));
+  }
+
+  /**
+   * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties
+   * and then try to call {@link RaftStorage#initialize()}.
+   * <p>
+   * {@link StartupOption#FORMAT}:
+   * - When there is more than one existing directory, throw an exception.
+   * - When there is an existing directory, throw an exception.
+   * - When there is no existing directory, try to initialize a new directory from the list specified
+   *   in the configuration properties until a directory succeeds or all directories fail.
+   * <p>
+   * {@link StartupOption#RECOVER}:
+   * - When there is more than one existing directory, throw an exception.
+   * - When there is an existing directory, if it fails to initialize, throw an exception; do not try a new directory.
+   * - When there is no existing directory, throw an exception.
+   *
+   * @param storageDirName the storage directory name
+   * @param option the startup option
+   * @param properties the configuration properties
+   * @return the chosen storage, which is initialized successfully.
+   */
+  public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option,
+      RaftProperties properties) throws IOException {
+    return new Op(storageDirName, option, properties).run();
+  }
+
+  private static class Op {
+    private final String storageDirName;
+    private final StartupOption option;
+
+    private final SizeInBytes freeSpaceMin;
+    private final Log.CorruptionPolicy logCorruptionPolicy;
+    private final List<File> dirsInConf;
+
+    private final List<File> existingSubs;
+    private final Map<File, Integer> dirsPerVol = new HashMap<>();
+
+    Op(String storageDirName, StartupOption option, RaftProperties properties) {
+      this.storageDirName = storageDirName;
+      this.option = option;
+
+      this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties);
+      this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties);
+      this.dirsInConf = RaftServerConfigKeys.storageDir(properties);
+
+      this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol);
+    }
+
+    RaftStorageImpl run() throws IOException {
+      if (option == StartupOption.FORMAT) {
+        return format();
+      } else if (option == StartupOption.RECOVER) {
+        return recover();
+      } else {
+        throw new IllegalArgumentException("Illegal option: " + option);
+      }
+    }
+
+    private RaftStorageImpl format() throws IOException {
+      if (!existingSubs.isEmpty()) {
+        throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs
+            + " for " + storageDirName);
+      }
+
+      for (; !dirsPerVol.isEmpty(); ) {
+        final File vol = chooseMin(dirsPerVol);
+        final File dir = new File(vol, storageDirName);
+        try {
+          final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy);
+          storage.initialize();
+          return storage;
+        } catch (Throwable e) {
+          LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e);
+          dirsPerVol.remove(vol);
+        }
+      }
+      throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf);
+    }
+
+    private RaftStorageImpl recover() throws IOException {
+      final int size = existingSubs.size();
+      if (size > 1) {
+        throw new IOException("Failed to " + option + ": More than one existing directories found "
+            + existingSubs + " for " + storageDirName);
+      } else if (size == 0) {
+        throw new IOException("Failed to " + option + ": Storage directory not found for "
+            + storageDirName + " from " + dirsInConf);
+      }
+
+      final File dir = existingSubs.get(0);
+      try {
+        final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy);
+        storage.initialize();
+        return storage;
+      } catch (Throwable e) {
+        if (e instanceof IOException) {
+          throw e;
+        }
+        throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e);
+      }
     }
-    return raftStorage;
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 1f4047524..dfed96952 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.CollectionUtils;
@@ -385,8 +386,9 @@ public abstract class MiniRaftCluster implements Closeable {
       }
       final RaftProperties prop = new RaftProperties(properties);
       RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
-      return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
-          setPropertiesAndInitParameters(id, group, prop));
+      return ServerImplUtils.newRaftServer(id, group,
+          format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER,
+          getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e09ca19d1..5e6353f92 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -365,7 +365,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
       // start the two new peers
       LOG.info("Start new peers");
       for (RaftPeer np : c1.newPeers) {
-        cluster.restartServer(np.getId(), false);
+        cluster.restartServer(np.getId(), true);
       }
       Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
     }
@@ -504,7 +504,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
 
       LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
       for (RaftPeer np : c1.newPeers) {
-        cluster.restartServer(np.getId(), false);
+        cluster.restartServer(np.getId(), true);
       }
 
       try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index ffc8de597..7027bd8ea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -114,7 +114,7 @@ public class TestRaftStorage extends BaseTest {
    */
   @Test
   public void testStorage() throws Exception {
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
     try {
       StorageState state = sd.analyzeStorage(true);
       Assert.assertEquals(StorageState.NOT_FORMATTED, state);
@@ -171,7 +171,7 @@ public class TestRaftStorage extends BaseTest {
     Assert.assertEquals(StorageState.NORMAL, storage.getState());
     storage.close();
 
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
     File metaFile = sd.getMetaFile();
     FileUtils.move(metaFile, sd.getMetaTmpFile());
 
@@ -286,7 +286,7 @@ public class TestRaftStorage extends BaseTest {
     File mockStorageDir = Mockito.spy(storageDir);
     Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L);  // 100B
 
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M"));
     StorageState state = sd.analyzeStorage(false);
     Assert.assertEquals(StorageState.NO_SPACE, state);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
similarity index 82%
rename from ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
rename to ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
index 75aef53a1..ff38a6bd9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.storage;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.util.FileUtils;
@@ -28,7 +28,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
@@ -37,13 +39,20 @@ import java.util.stream.IntStream;
 /**
  * Test cases to verify ServerState.
  */
-public class TestServerState {
+public class TestStorageImplUtils {
 
   private static final Supplier<File> rootTestDir = JavaUtils.memoize(
       () -> new File(BaseTest.getRootTestDir(),
-          JavaUtils.getClassSimpleName(TestServerState.class) +
+          JavaUtils.getClassSimpleName(TestStorageImplUtils.class) +
               Integer.toHexString(ThreadLocalRandom.current().nextInt())));
 
+  static File chooseNewStorageDir(List<File> volumes, String sub) throws IOException {
+    final Map<File, Integer> numDirPerVolume = new HashMap<>();
+    StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume);
+    final File vol = StorageImplUtils.chooseMin(numDirPerVolume);
+    return new File(vol, sub);
+  }
+
   @AfterClass
   public static void tearDown() throws IOException {
     FileUtils.deleteFully(rootTestDir.get());
@@ -60,8 +69,8 @@ public class TestServerState {
     List<File> directories = Collections.singletonList(testDir);
     String subDirOne = UUID.randomUUID().toString();
     String subDirTwo = UUID.randomUUID().toString();
-    File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne);
-    File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo);
+    final File storageDirOne = chooseNewStorageDir(directories, subDirOne);
+    final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo);
     File expectedOne = new File(testDir, subDirOne);
     File expectedTwo = new File(testDir, subDirTwo);
     Assert.assertEquals(expectedOne.getCanonicalPath(),
@@ -100,7 +109,7 @@ public class TestServerState {
               }
             });
     String subDir = UUID.randomUUID().toString();
-    File storageDirectory = ServerState.chooseStorageDir(directories, subDir);
+    final File storageDirectory = chooseNewStorageDir(directories, subDir);
     File expected = new File(directories.get(6), subDir);
     Assert.assertEquals(expected.getCanonicalPath(),
         storageDirectory.getCanonicalPath());
@@ -108,19 +117,15 @@ public class TestServerState {
 
   /**
    * Tests choosing of storage directory when only no volume is configured.
-   *
-   * @throws IOException in case of exception.
    */
   @Test
   public void testChooseStorageDirWithNoVolume() {
     try {
-      ServerState.chooseStorageDir(
-          Collections.emptyList(), UUID.randomUUID().toString());
+      chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString());
       Assert.fail();
     } catch (IOException ex) {
       String expectedErrMsg = "No storage directory found.";
       Assert.assertEquals(expectedErrMsg, ex.getMessage());
     }
   }
-
 }
\ No newline at end of file