You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/30 06:45:18 UTC

[ratis] 01/03: Revert "RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)"

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 4332b0fe20234915765b5c16ed13883d6d651370
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Wed Sep 21 19:13:03 2022 +0800

    Revert "RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)"
    
    This reverts commit 7a4543b1cedfb6b9b391ed6c6094b88fecacd546.
---
 .../ratis/client/api/GroupManagementApi.java       |  12 +-
 .../apache/ratis/client/impl/ClientProtoUtils.java |   9 +-
 .../ratis/client/impl/GroupManagementImpl.java     |   4 +-
 .../ratis/protocol/GroupManagementRequest.java     |  13 +-
 .../apache/ratis/util/MemoizedCheckedSupplier.java |  90 ------------
 .../java/org/apache/ratis/util/Preconditions.java  |   5 -
 .../java/org/apache/ratis/util/SizeInBytes.java    |   1 -
 ratis-proto/src/main/proto/Raft.proto              |   1 -
 .../java/org/apache/ratis/server/RaftServer.java   |  16 +--
 .../apache/ratis/server/storage/RaftStorage.java   |   5 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |   9 +-
 .../apache/ratis/server/impl/RaftServerProxy.java  |  26 ++--
 .../apache/ratis/server/impl/ServerImplUtils.java  |   5 +-
 .../org/apache/ratis/server/impl/ServerState.java  | 128 +++++++++++------
 .../server/storage/RaftStorageDirectoryImpl.java   |   7 +-
 .../ratis/server/storage/RaftStorageImpl.java      |  10 +-
 .../ratis/server/storage/SnapshotManager.java      |  12 +-
 .../ratis/server/storage/StorageImplUtils.java     | 160 +++------------------
 .../apache/ratis/server/impl/MiniRaftCluster.java  |   6 +-
 .../server/impl/RaftReconfigurationBaseTest.java   |   4 +-
 .../TestServerState.java}                          |  27 ++--
 .../ratis/server/storage/TestRaftStorage.java      |   6 +-
 22 files changed, 174 insertions(+), 382 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
index 558747048..1d3bc00b1 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
@@ -29,16 +29,8 @@ import java.io.IOException;
  * APIs to support group management operations such as add, remove, list and info to a particular server.
  */
 public interface GroupManagementApi {
-  /**
-   * Add a new group.
-   * @param format Should it format the storage?
-   */
-  RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException;
-
-  /** The same as add(newGroup, true). */
-  default RaftClientReply add(RaftGroup newGroup) throws IOException {
-    return add(newGroup, true);
-  }
+  /** Add a new group. */
+  RaftClientReply add(RaftGroup newGroup) throws IOException;
 
   /** Remove a group. */
   RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 1ac825850..859e1d4f0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -571,9 +571,8 @@ public interface ClientProtoUtils {
     final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
     switch(p.getOpCase()) {
       case GROUPADD:
-        final GroupAddRequestProto add = p.getGroupAdd();
         return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
-            ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat());
+            ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
       case GROUPREMOVE:
         final GroupRemoveRequestProto remove = p.getGroupRemove();
         return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
@@ -610,10 +609,8 @@ public interface ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
     final GroupManagementRequest.Add add = request.getAdd();
     if (add != null) {
-      b.setGroupAdd(GroupAddRequestProto.newBuilder()
-          .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup()))
-          .setFormat(add.isFormat())
-          .build());
+      b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
+          ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
     }
     final GroupManagementRequest.Remove remove = request.getRemove();
     if (remove != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 9501bc2ea..27e0bbffc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi {
   }
 
   @Override
-  public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException {
+  public RaftClientReply add(RaftGroup newGroup) throws IOException {
     Objects.requireNonNull(newGroup, "newGroup == null");
 
     final long callId = CallId.getAndIncrement();
     client.getClientRpc().addRaftPeers(newGroup.getPeers());
     return client.io().sendRequestWithRetry(
-        () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format));
+        () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index 2783d2c65..d370dfc4c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -26,11 +26,9 @@ public final class GroupManagementRequest extends RaftClientRequest {
 
   public static class Add extends Op {
     private final RaftGroup group;
-    private final boolean format;
 
-    public Add(RaftGroup group, boolean format) {
+    public Add(RaftGroup group) {
       this.group = group;
-      this.format = format;
     }
 
     @Override
@@ -42,10 +40,6 @@ public final class GroupManagementRequest extends RaftClientRequest {
       return group;
     }
 
-    public boolean isFormat() {
-      return format;
-    }
-
     @Override
     public String toString() {
       return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup();
@@ -85,9 +79,8 @@ public final class GroupManagementRequest extends RaftClientRequest {
     }
   }
 
-  public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId,
-      RaftGroup group, boolean format) {
-    return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format));
+  public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
+    return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
   }
 
   public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
deleted file mode 100644
index cf2d06023..000000000
--- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.util;
-
-import org.apache.ratis.util.function.CheckedSupplier;
-
-import java.util.Objects;
-
-/**
- * A memoized supplier is a {@link CheckedSupplier}
- * which gets a value by invoking its initializer once.
- * and then keeps returning the same value as its supplied results.
- *
- * This class is thread safe.
- *
- * @param <RETURN> The return type of the supplier.
- * @param <THROW> The throwable type of the supplier.
- */
-public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
-    implements CheckedSupplier<RETURN, THROW> {
-  /**
-   * @param supplier to supply at most one non-null value.
-   * @return a {@link MemoizedCheckedSupplier} with the given supplier.
-   */
-  public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf(
-      CheckedSupplier<RETURN, THROW> supplier) {
-    return supplier instanceof MemoizedCheckedSupplier ?
-        (MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier);
-  }
-
-  private final CheckedSupplier<RETURN, THROW> initializer;
-  private volatile RETURN value = null;
-
-  /**
-   * Create a memoized supplier.
-   * @param initializer to supply at most one non-null value.
-   */
-  private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
-    Objects.requireNonNull(initializer, "initializer == null");
-    this.initializer = initializer;
-  }
-
-  /** @return the lazily initialized object. */
-  @Override
-  public RETURN get() throws THROW {
-    RETURN v = value;
-    if (v == null) {
-      synchronized (this) {
-        v = value;
-        if (v == null) {
-          v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
-        }
-      }
-    }
-    return v;
-  }
-
-  /**
-   * @return the already initialized object.
-   * @throws NullPointerException if the object is uninitialized.
-   */
-  public RETURN getUnchecked() {
-    return Objects.requireNonNull(value, "value == null");
-  }
-
-  /** @return is the object initialized? */
-  public boolean isInitialized() {
-    return value != null;
-  }
-
-  @Override
-  public String toString() {
-    return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
-  }
-}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index 902c1f5e6..e9a002612 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -19,7 +19,6 @@ package org.apache.ratis.util;
 
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
 
@@ -114,10 +113,6 @@ public interface Preconditions {
     return clazz.cast(object);
   }
 
-  static <K, V> void assertEmpty(Map<K, V> map, Object name) {
-    assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map);
-  }
-
   static <T> void assertUnique(Iterable<T> first) {
     assertUnique(first, Collections.emptyList());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 683f0da62..25667a378 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -23,7 +23,6 @@ import java.util.Objects;
  * Size which may be constructed with a {@link TraditionalBinaryPrefix}.
  */
 public final class SizeInBytes {
-  public static final SizeInBytes ZERO = valueOf(0);
   public static final SizeInBytes ONE_KB = valueOf("1k");
   public static final SizeInBytes ONE_MB = valueOf("1m");
 
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 9fe2494bf..c571f0c73 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -469,7 +469,6 @@ message StartLeaderElectionReplyProto {
 // A request to add a new group
 message GroupAddRequestProto {
   RaftGroupProto group = 1; // the group to be added.
-  bool format = 2; // Should it format the storage?
 }
 
 message GroupRemoveRequestProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 8d00d29db..e9719b96c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -167,8 +167,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
 
     private static Method initNewRaftServerMethod() {
       final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
-      final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class,
-          StateMachine.Registry.class, RaftProperties.class, Parameters.class};
+      final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
+          RaftProperties.class, Parameters.class};
       try {
         final Class<?> clazz = ReflectionUtils.getClassByName(className);
         return clazz.getMethod("newRaftServer", argClasses);
@@ -177,12 +177,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
       }
     }
 
-    private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option,
+    private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
         StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
         throws IOException {
       try {
         return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
-            serverId, group, option, stateMachineRegistry, properties, parameters);
+            serverId, group, stateMachineRegistry, properties, parameters);
       } catch (IllegalAccessException e) {
         throw new IllegalStateException("Failed to build " + serverId, e);
       } catch (InvocationTargetException e) {
@@ -193,7 +193,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
     private RaftPeerId serverId;
     private StateMachine.Registry stateMachineRegistry ;
     private RaftGroup group = null;
-    private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER;
     private RaftProperties properties;
     private Parameters parameters;
 
@@ -202,7 +201,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
       return newRaftServer(
           serverId,
           group,
-          option,
           Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
               "is initialized."),
           Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
@@ -232,12 +230,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
       return this;
     }
 
-    /** Set the startup option for the group. */
-    public Builder setOption(RaftStorage.StartupOption option) {
-      this.option = option;
-      return this;
-    }
-
     /** Set {@link RaftProperties}. */
     public Builder setProperties(RaftProperties properties) {
       this.properties = properties;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 59d87c37a..dde3c31bc 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -62,7 +62,8 @@ public interface RaftStorage extends Closeable {
 
     private static Method initNewRaftStorageMethod() {
       final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
-      final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class};
+      //final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
+      final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
       try {
         final Class<?> clazz = ReflectionUtils.getClassByName(className);
         return clazz.getMethod("newRaftStorage", argClasses);
@@ -75,7 +76,7 @@ public interface RaftStorage extends Closeable {
         StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
       try {
         return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
-            dir, storageFreeSpaceMin, option, logCorruptionPolicy);
+            dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
       } catch (IllegalAccessException e) {
         throw new IllegalStateException("Failed to build " + dir, e);
       } catch (InvocationTargetException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ee7b749dd..1994fa8de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -189,8 +189,7 @@ class RaftServerImpl implements RaftServer.Division,
 
   private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
 
-  RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
-      throws IOException {
+  RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
     this.lifeCycle = new LifeCycle(id);
@@ -203,7 +202,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
     this.proxy = proxy;
 
-    this.state = new ServerState(id, group, stateMachine, this, option, properties);
+    this.state = new ServerState(id, group, properties, this, stateMachine);
     this.retryCache = new RetryCacheImpl(properties);
     this.dataStreamMap = new DataStreamMapImpl(id);
 
@@ -576,8 +575,8 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   GroupInfoReply getGroupInfo(GroupInfoRequest request) {
-    final RaftStorageDirectory dir = state.getStorage().getStorageDir();
-    return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
+    return new GroupInfoReply(request, getCommitInfos(),
+        getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
   }
 
   RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0825e7d12..b8cee7f53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,7 +38,6 @@ import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.ServerFactory;
-import org.apache.ratis.server.storage.RaftStorage.StartupOption;
 import org.apache.ratis.util.ConcurrentUtils;
 import org.apache.ratis.util.JvmPauseMonitor;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -81,7 +80,7 @@ class RaftServerProxy implements RaftServer {
     private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
     private boolean isClosed = false;
 
-    synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group, StartupOption option) {
+    synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
       if (isClosed) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             getId() + ": Failed to add " + group + " since the server is already closed"));
@@ -91,7 +90,7 @@ class RaftServerProxy implements RaftServer {
             getId() + ": Failed to add " + group + " since the group already exists in the map."));
       }
       final RaftGroupId groupId = group.getGroupId();
-      final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group, option);
+      final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
       final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
       Preconditions.assertNull(previous, "previous");
       LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
@@ -231,7 +230,7 @@ class RaftServerProxy implements RaftServer {
   }
 
   /** Check the storage dir and add groups*/
-  void initGroups(RaftGroup group, StartupOption option) {
+  void initGroups(RaftGroup group) {
     final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
     final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
     final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
@@ -242,7 +241,7 @@ class RaftServerProxy implements RaftServer {
             .filter(File::isDirectory)
             .forEach(sub -> initGroupDir(sub, shouldAdd)),
         executor).join();
-    raftGroup.ifPresent(g -> addGroup(g, option));
+    raftGroup.ifPresent(this::addGroup);
   }
 
   private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
@@ -256,7 +255,7 @@ class RaftServerProxy implements RaftServer {
             " ignoring it. ", getId(), sub.getAbsolutePath());
       }
       if (shouldAdd.test(groupId)) {
-        addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER);
+        addGroup(RaftGroup.valueOf(groupId));
       }
     } catch (Exception e) {
       LOG.warn(getId() + ": Failed to initialize the group directory "
@@ -270,11 +269,11 @@ class RaftServerProxy implements RaftServer {
     getDataStreamServerRpc().addRaftPeers(others);
   }
 
-  private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, StartupOption option) {
+  private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
     return CompletableFuture.supplyAsync(() -> {
       try {
         addRaftPeers(group.getPeers());
-        return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option);
+        return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
       }
@@ -342,8 +341,8 @@ class RaftServerProxy implements RaftServer {
     return dataStreamServerRpc;
   }
 
-  private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group, StartupOption option) {
-    return impls.addNew(group, option);
+  private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+    return impls.addNew(group);
   }
 
   private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
@@ -467,7 +466,7 @@ class RaftServerProxy implements RaftServer {
     }
     final GroupManagementRequest.Add add = request.getAdd();
     if (add != null) {
-      return groupAddAsync(request, add.getGroup(), add.isFormat());
+      return groupAddAsync(request, add.getGroup());
     }
     final GroupManagementRequest.Remove remove = request.getRemove();
     if (remove != null) {
@@ -478,13 +477,12 @@ class RaftServerProxy implements RaftServer {
         getId() + ": Request not supported " + request));
   }
 
-  private CompletableFuture<RaftClientReply> groupAddAsync(
-      GroupManagementRequest request, RaftGroup newGroup, boolean format) {
+  private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
     if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
       return JavaUtils.completeExceptionally(new GroupMismatchException(
           getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
     }
-    return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER)
+    return impls.addNew(newGroup)
         .thenApplyAsync(newImpl -> {
           LOG.debug("{}: newImpl = {}", getId(), newImpl);
           try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6e1ddd548..6777b9093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -26,7 +26,6 @@ import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -46,7 +45,7 @@ public final class ServerImplUtils {
 
   /** Create a {@link RaftServerProxy}. */
   public static RaftServerProxy newRaftServer(
-      RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry,
+      RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
       RaftProperties properties, Parameters parameters) throws IOException {
     RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
     if (group != null && !group.getPeers().isEmpty()) {
@@ -54,7 +53,7 @@ public final class ServerImplUtils {
       Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
     }
     final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
-    proxy.initGroups(group, option);
+    proxy.initGroups(group);
     return proxy;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e92f9b911..1ee2cab5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -36,14 +36,20 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedCheckedSupplier;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
+import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
@@ -57,7 +63,7 @@ import static org.apache.ratis.server.RaftServer.Division.LOG;
 /**
  * Common states of a raft peer. Protected by RaftServer's lock.
  */
-class ServerState {
+class ServerState implements Closeable {
   private final RaftGroupMemberId memberId;
   private final RaftServerImpl server;
   /** Raft log */
@@ -67,7 +73,7 @@ class ServerState {
   /** The thread that applies committed log entries to the state machine */
   private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
   /** local storage for log and snapshot */
-  private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
+  private RaftStorageImpl storage;
   private final SnapshotManager snapshotManager;
   private volatile Timestamp lastNoLeaderTime;
   private final TimeDuration noLeaderTimeout;
@@ -94,8 +100,9 @@ class ServerState {
    */
   private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
 
-  ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
-      RaftStorage.StartupOption option, RaftProperties prop) {
+  ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
+              RaftServerImpl server, StateMachine stateMachine)
+      throws IOException {
     this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
     this.server = server;
     Collection<RaftPeer> followerPeers = group.getPeers().stream()
@@ -110,11 +117,36 @@ class ServerState {
     configurationManager = new ConfigurationManager(initialConf);
     LOG.info("{}: {}", getMemberId(), configurationManager);
 
-    final String storageDirName = group.getGroupId().getUuid().toString();
-    this.raftStorage = MemoizedCheckedSupplier.valueOf(
-        () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
+    boolean storageFound = false;
+    List<File> directories = RaftServerConfigKeys.storageDir(prop);
+    while (!directories.isEmpty()) {
+      // use full uuid string to create a subdirectory
+      File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString());
+      try {
+        storage = (RaftStorageImpl) RaftStorage.newBuilder()
+            .setDirectory(dir)
+            .setOption(RaftStorage.StartupOption.RECOVER)
+            .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop))
+            .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop))
+            .build();
+        storageFound = true;
+        break;
+      } catch (IOException e) {
+        if (e.getCause() instanceof OverlappingFileLockException) {
+          throw e;
+        }
+        LOG.warn("Failed to init RaftStorage under {} for {}: {}",
+            dir.getParent(), group.getGroupId().getUuid().toString(), e);
+        directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent()));
+      }
+    }
+
+    if (!storageFound) {
+      throw new IOException("No healthy directories found for RaftStorage among: " +
+          RaftServerConfigKeys.storageDir(prop));
+    }
 
-    this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
+    snapshotManager = new SnapshotManager(storage, id);
 
     // On start the leader is null, start the clock now
     this.leaderId = null;
@@ -131,8 +163,7 @@ class ServerState {
   }
 
   void initialize(StateMachine stateMachine) throws IOException {
-    // initialize raft storage
-    final RaftStorageImpl storage = raftStorage.get();
+    storage.initialize();
     // read configuration from the storage
     Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
 
@@ -149,8 +180,32 @@ class ServerState {
     return memberId;
   }
 
+  static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException {
+    final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
+    final File[] empty = {};
+    final List<File> resultList = new ArrayList<>();
+    volumes.stream().flatMap(volume -> {
+      final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty);
+      numberOfStorageDirPerVolume.put(volume, dirs.length);
+      return Arrays.stream(dirs);
+    }).filter(dir -> targetSubDir.equals(dir.getName()))
+        .forEach(resultList::add);
+
+    if (resultList.size() > 1) {
+      throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList);
+    }
+    if (resultList.size() == 1) {
+      return resultList.get(0);
+    }
+    return numberOfStorageDirPerVolume.entrySet().stream()
+        .min(Map.Entry.comparingByValue())
+        .map(Map.Entry::getKey)
+        .map(v -> new File(v, targetSubDir))
+        .orElseThrow(() -> new IOException("No storage directory found."));
+  }
+
   void writeRaftConfiguration(LogEntryProto conf) {
-    getStorage().writeRaftConfiguration(conf);
+    storage.writeRaftConfiguration(conf);
   }
 
   void start() {
@@ -159,8 +214,7 @@ class ServerState {
 
   private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) {
     try {
-      return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf,
-          getSnapshotIndexFromStateMachine, prop);
+      return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
     } catch (IOException e) {
       throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e);
     }
@@ -205,6 +259,10 @@ class ServerState {
     return leaderId;
   }
 
+  boolean hasLeader() {
+    return leaderId != null;
+  }
+
   /**
    * Become a candidate and start leader election
    */
@@ -376,7 +434,7 @@ class ServerState {
   void updateConfiguration(List<LogEntryProto> entries) {
     if (entries != null && !entries.isEmpty()) {
       configurationManager.removeConfigurations(entries.get(0).getIndex());
-      entries.forEach(this::setRaftConf);
+      entries.stream().forEach(this::setRaftConf);
     }
   }
 
@@ -397,45 +455,29 @@ class ServerState {
     getStateMachineUpdater().reloadStateMachine();
   }
 
-  void close() {
-    try {
-      if (stateMachineUpdater.isInitialized()) {
-        getStateMachineUpdater().stopAndJoin();
-      }
-    } catch (Throwable e) {
-      LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e);
-    }
-    LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex());
-
+  @Override
+  public void close() throws IOException {
     try {
-      if (log.isInitialized()) {
-        getLog().close();
-      }
-    } catch (Throwable e) {
-      LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e);
+      getStateMachineUpdater().stopAndJoin();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
     }
+    LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
 
-    try {
-      if (raftStorage.isInitialized()) {
-        getStorage().close();
-      }
-    } catch (Throwable e) {
-      LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e);
-    }
+    getLog().close();
+    storage.close();
   }
 
-  RaftStorageImpl getStorage() {
-    if (!raftStorage.isInitialized()) {
-      throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized.");
-    }
-    return raftStorage.getUnchecked();
+  RaftStorage getStorage() {
+    return storage;
   }
 
   void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
     // TODO: verify that we need to install the snapshot
     StateMachine sm = server.getStateMachine();
     sm.pause(); // pause the SM to prepare for install snapshot
-    snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir());
+    snapshotManager.installSnapshot(sm, request);
     updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex()));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index a86cdf56b..a7fa13b53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.storage;
 
 import org.apache.ratis.util.AtomicFileOutputStream;
 import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.IOException;
@@ -52,13 +51,13 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
 
   private final File root; // root directory
   private FileLock lock;   // storage lock
-  private final SizeInBytes freeSpaceMin;
+  private long freeSpaceMin;
 
   /**
    * Constructor
    * @param dir directory corresponding to the storage
    */
-  RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) {
+  RaftStorageDirectoryImpl(File dir, long freeSpaceMin) {
     this.root = dir;
     this.lock = null;
     this.freeSpaceMin = freeSpaceMin;
@@ -178,7 +177,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
   }
 
   private boolean hasEnoughSpace() {
-    return root.getFreeSpace() >= freeSpaceMin.getSize();
+    return root.getFreeSpace() > freeSpaceMin;
   }
 
   /**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 56972c3f7..6513efd30 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,7 +23,6 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
 import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.SizeInBytes;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -35,16 +34,17 @@ import java.util.Optional;
 
 /** The storage of a {@link org.apache.ratis.server.RaftServer}. */
 public class RaftStorageImpl implements RaftStorage {
+
+  // TODO support multiple storage directories
   private final RaftStorageDirectoryImpl storageDir;
   private final StartupOption startupOption;
   private final CorruptionPolicy logCorruptionPolicy;
   private volatile StorageState state = StorageState.UNINITIALIZED;
   private volatile RaftStorageMetadataFileImpl metaFile;
 
-  RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) {
-    LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}",
-        dir, freeSpaceMin, option, logCorruptionPolicy);
-    this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin);
+  RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option,
+      long storageFeeSpaceMin) {
+    this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
     this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
     this.startupOption = option;
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 17294b572..aaa62a783 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,17 +54,21 @@ public class SnapshotManager {
   private static final String CORRUPT = ".corrupt";
   private static final String TMP = ".tmp";
 
+  private final RaftStorage storage;
   private final RaftPeerId selfId;
   private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);
 
-  SnapshotManager(RaftPeerId selfId) {
+  public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
+    this.storage = storage;
     this.selfId = selfId;
   }
 
-  public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
-      throws IOException {
-    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
+  public void installSnapshot(StateMachine stateMachine,
+      InstallSnapshotRequestProto request) throws IOException {
+    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
+        request.getSnapshotChunk();
     final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
+    final RaftStorageDirectory dir = storage.getStorageDir();
 
     // create a unique temporary directory
     final File tmpDir =  new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index 865e2b2b1..aeff60148 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -17,158 +17,38 @@
  */
 package org.apache.ratis.server.storage;
 
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerConfigKeys.Log;
-import org.apache.ratis.server.storage.RaftStorage.StartupOption;
-import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.apache.ratis.server.RaftServer.Division.LOG;
+import java.util.concurrent.TimeUnit;
 
 public final class StorageImplUtils {
-  private static final File[] EMPTY_FILE_ARRAY = {};
 
   private StorageImplUtils() {
     //Never constructed
   }
 
-  public static SnapshotManager newSnapshotManager(RaftPeerId id) {
-    return new SnapshotManager(id);
-  }
-
   /** Create a {@link RaftStorageImpl}. */
-  public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin,
-      RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) {
-    return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
-  }
-
-  /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */
-  static List<File> getExistingStorageSubs(List<File> volumes, String targetSubDir, Map<File, Integer> dirsPerVol) {
-    return volumes.stream().flatMap(volume -> {
-          final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY);
-          Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length));
-          return Arrays.stream(dirs);
-        }).filter(dir -> targetSubDir.equals(dir.getName()))
-        .collect(Collectors.toList());
-  }
-
-  /** @return a volume with the min dirs. */
-  static File chooseMin(Map<File, Integer> dirsPerVol) throws IOException {
-    return dirsPerVol.entrySet().stream()
-        .min(Map.Entry.comparingByValue())
-        .map(Map.Entry::getKey)
-        .orElseThrow(() -> new IOException("No storage directory found."));
-  }
-
-  /**
-   * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties
-   * and then try to call {@link RaftStorage#initialize()}.
-   * <p />
-   * {@link StartupOption#FORMAT}:
-   * - When there are more than one existing directories, throw an exception.
-   * - When there is an existing directory, throw an exception.
-   * - When there is no existing directory, try to initialize a new directory from the list specified
-   *   in the configuration properties until a directory succeeded or all directories failed.
-   * <p />
-   * {@link StartupOption#RECOVER}:
-   * - When there are more than one existing directories, throw an exception.
-   * - When there is an existing directory, if it fails to initialize, throw an exception but not try a new directory.
-   * - When there is no existing directory, throw an exception.
-   *
-   * @param storageDirName the storage directory name
-   * @param option the startup option
-   * @param properties the configuration properties
-   * @return the chosen storage, which is initialized successfully.
-   */
-  public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option,
-      RaftProperties properties) throws IOException {
-    return new Op(storageDirName, option, properties).run();
-  }
-
-  private static class Op {
-    private final String storageDirName;
-    private final StartupOption option;
-
-    private final SizeInBytes freeSpaceMin;
-    private final Log.CorruptionPolicy logCorruptionPolicy;
-    private final List<File> dirsInConf;
-
-    private final List<File> existingSubs;
-    private final Map<File, Integer> dirsPerVol = new HashMap<>();
-
-    Op(String storageDirName, StartupOption option, RaftProperties properties) {
-      this.storageDirName = storageDirName;
-      this.option = option;
-
-      this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties);
-      this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties);
-      this.dirsInConf = RaftServerConfigKeys.storageDir(properties);
-
-      this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol);
-    }
-
-    RaftStorageImpl run() throws IOException {
-      if (option == StartupOption.FORMAT) {
-        return format();
-      } else if (option == StartupOption.RECOVER) {
-        return recover();
-      } else {
-        throw new IllegalArgumentException("Illegal option: " + option);
-      }
-    }
-
-    private RaftStorageImpl format() throws IOException {
-      if (!existingSubs.isEmpty()) {
-        throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs
-            + " for " + storageDirName);
-      }
-
-      for (; !dirsPerVol.isEmpty(); ) {
-        final File vol = chooseMin(dirsPerVol);
-        final File dir = new File(vol, storageDirName);
-        try {
-          final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy);
-          storage.initialize();
-          return storage;
-        } catch (Throwable e) {
-          LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e);
-          dirsPerVol.remove(vol);
-        }
-      }
-      throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf);
-    }
-
-    private RaftStorageImpl recover() throws IOException {
-      final int size = existingSubs.size();
-      if (size > 1) {
-        throw new IOException("Failed to " + option + ": More than one existing directories found "
-            + existingSubs + " for " + storageDirName);
-      } else if (size == 0) {
-        throw new IOException("Failed to " + option + ": Storage directory not found for "
-            + storageDirName + " from " + dirsInConf);
-      }
-
-      final File dir = existingSubs.get(0);
-      try {
-        final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy);
-        storage.initialize();
-        return storage;
-      } catch (Throwable e) {
-        if (e instanceof IOException) {
-          throw e;
-        }
-        throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e);
-      }
+  public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy,
+      RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException {
+    RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin);
+
+    final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+    final RaftStorageImpl raftStorage;
+    try {
+      // attempt multiple times to avoid temporary bind exception
+      raftStorage = JavaUtils.attemptRepeatedly(
+          () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin),
+          5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException(
+          "Interrupted when creating RaftStorage " + dir, e);
     }
+    return raftStorage;
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index dfed96952..1f4047524 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -40,7 +40,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.ServerFactory;
 import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.CollectionUtils;
@@ -386,9 +385,8 @@ public abstract class MiniRaftCluster implements Closeable {
       }
       final RaftProperties prop = new RaftProperties(properties);
       RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
-      return ServerImplUtils.newRaftServer(id, group,
-          format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER,
-          getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop));
+      return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
+          setPropertiesAndInitParameters(id, group, prop));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 5e6353f92..e09ca19d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -365,7 +365,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
       // start the two new peers
       LOG.info("Start new peers");
       for (RaftPeer np : c1.newPeers) {
-        cluster.restartServer(np.getId(), true);
+        cluster.restartServer(np.getId(), false);
       }
       Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
     }
@@ -504,7 +504,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
 
       LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
       for (RaftPeer np : c1.newPeers) {
-        cluster.restartServer(np.getId(), true);
+        cluster.restartServer(np.getId(), false);
       }
 
       try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
similarity index 82%
rename from ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
rename to ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
index ff38a6bd9..75aef53a1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.impl;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.util.FileUtils;
@@ -28,9 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
@@ -39,20 +37,13 @@ import java.util.stream.IntStream;
 /**
  * Test cases to verify ServerState.
  */
-public class TestStorageImplUtils {
+public class TestServerState {
 
   private static final Supplier<File> rootTestDir = JavaUtils.memoize(
       () -> new File(BaseTest.getRootTestDir(),
-          JavaUtils.getClassSimpleName(TestStorageImplUtils.class) +
+          JavaUtils.getClassSimpleName(TestServerState.class) +
               Integer.toHexString(ThreadLocalRandom.current().nextInt())));
 
-  static File chooseNewStorageDir(List<File> volumes, String sub) throws IOException {
-    final Map<File, Integer> numDirPerVolume = new HashMap<>();
-    StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume);
-    final File vol = StorageImplUtils.chooseMin(numDirPerVolume);
-    return new File(vol, sub);
-  }
-
   @AfterClass
   public static void tearDown() throws IOException {
     FileUtils.deleteFully(rootTestDir.get());
@@ -69,8 +60,8 @@ public class TestStorageImplUtils {
     List<File> directories = Collections.singletonList(testDir);
     String subDirOne = UUID.randomUUID().toString();
     String subDirTwo = UUID.randomUUID().toString();
-    final File storageDirOne = chooseNewStorageDir(directories, subDirOne);
-    final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo);
+    File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne);
+    File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo);
     File expectedOne = new File(testDir, subDirOne);
     File expectedTwo = new File(testDir, subDirTwo);
     Assert.assertEquals(expectedOne.getCanonicalPath(),
@@ -109,7 +100,7 @@ public class TestStorageImplUtils {
               }
             });
     String subDir = UUID.randomUUID().toString();
-    final File storageDirectory = chooseNewStorageDir(directories, subDir);
+    File storageDirectory = ServerState.chooseStorageDir(directories, subDir);
     File expected = new File(directories.get(6), subDir);
     Assert.assertEquals(expected.getCanonicalPath(),
         storageDirectory.getCanonicalPath());
@@ -117,15 +108,19 @@ public class TestStorageImplUtils {
 
   /**
    * Tests choosing of storage directory when only no volume is configured.
+   *
+   * @throws IOException in case of exception.
    */
   @Test
   public void testChooseStorageDirWithNoVolume() {
     try {
-      chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString());
+      ServerState.chooseStorageDir(
+          Collections.emptyList(), UUID.randomUUID().toString());
       Assert.fail();
     } catch (IOException ex) {
       String expectedErrMsg = "No storage directory found.";
       Assert.assertEquals(expectedErrMsg, ex.getMessage());
     }
   }
+
 }
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 7027bd8ea..ffc8de597 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -114,7 +114,7 @@ public class TestRaftStorage extends BaseTest {
    */
   @Test
   public void testStorage() throws Exception {
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
     try {
       StorageState state = sd.analyzeStorage(true);
       Assert.assertEquals(StorageState.NOT_FORMATTED, state);
@@ -171,7 +171,7 @@ public class TestRaftStorage extends BaseTest {
     Assert.assertEquals(StorageState.NORMAL, storage.getState());
     storage.close();
 
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
     File metaFile = sd.getMetaFile();
     FileUtils.move(metaFile, sd.getMetaTmpFile());
 
@@ -286,7 +286,7 @@ public class TestRaftStorage extends BaseTest {
     File mockStorageDir = Mockito.spy(storageDir);
     Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L);  // 100B
 
-    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M"));
+    final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB
     StorageState state = sd.analyzeStorage(false);
     Assert.assertEquals(StorageState.NO_SPACE, state);
   }