You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/08/26 01:50:19 UTC
[ratis] branch master updated: RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ebb39e840 RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)
ebb39e840 is described below
commit ebb39e840e6d13245f76a044c1d60bafc7ca44cf
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Aug 25 18:50:14 2022 -0700
RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)
---
.../ratis/client/api/GroupManagementApi.java | 12 +-
.../apache/ratis/client/impl/ClientProtoUtils.java | 9 +-
.../ratis/client/impl/GroupManagementImpl.java | 4 +-
.../ratis/protocol/GroupManagementRequest.java | 13 +-
.../apache/ratis/util/MemoizedCheckedSupplier.java | 90 ++++++++++++
.../java/org/apache/ratis/util/Preconditions.java | 5 +
.../java/org/apache/ratis/util/SizeInBytes.java | 1 +
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/server/RaftServer.java | 16 ++-
.../apache/ratis/server/storage/RaftStorage.java | 5 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 9 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 26 ++--
.../apache/ratis/server/impl/ServerImplUtils.java | 5 +-
.../org/apache/ratis/server/impl/ServerState.java | 128 ++++++-----------
.../server/storage/RaftStorageDirectoryImpl.java | 7 +-
.../ratis/server/storage/RaftStorageImpl.java | 10 +-
.../ratis/server/storage/SnapshotManager.java | 12 +-
.../ratis/server/storage/StorageImplUtils.java | 160 ++++++++++++++++++---
.../apache/ratis/server/impl/MiniRaftCluster.java | 6 +-
.../server/impl/RaftReconfigurationBaseTest.java | 4 +-
.../ratis/server/storage/TestRaftStorage.java | 6 +-
.../TestStorageImplUtils.java} | 27 ++--
22 files changed, 382 insertions(+), 174 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
index 1d3bc00b1..558747048 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
@@ -29,8 +29,16 @@ import java.io.IOException;
* APIs to support group management operations such as add, remove, list and info to a particular server.
*/
public interface GroupManagementApi {
- /** Add a new group. */
- RaftClientReply add(RaftGroup newGroup) throws IOException;
+ /**
+ * Add a new group.
+ * @param format Should it format the storage?
+ */
+ RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException;
+
+ /** The same as add(newGroup, true). */
+ default RaftClientReply add(RaftGroup newGroup) throws IOException {
+ return add(newGroup, true);
+ }
/** Remove a group. */
RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 859e1d4f0..1ac825850 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -571,8 +571,9 @@ public interface ClientProtoUtils {
final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
switch(p.getOpCase()) {
case GROUPADD:
+ final GroupAddRequestProto add = p.getGroupAdd();
return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
- ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
+ ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat());
case GROUPREMOVE:
final GroupRemoveRequestProto remove = p.getGroupRemove();
return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
@@ -609,8 +610,10 @@ public interface ClientProtoUtils {
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
- b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
- ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
+ b.setGroupAdd(GroupAddRequestProto.newBuilder()
+ .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup()))
+ .setFormat(add.isFormat())
+ .build());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 27e0bbffc..9501bc2ea 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi {
}
@Override
- public RaftClientReply add(RaftGroup newGroup) throws IOException {
+ public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");
final long callId = CallId.getAndIncrement();
client.getClientRpc().addRaftPeers(newGroup.getPeers());
return client.io().sendRequestWithRetry(
- () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
+ () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format));
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index d370dfc4c..2783d2c65 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -26,9 +26,11 @@ public final class GroupManagementRequest extends RaftClientRequest {
public static class Add extends Op {
private final RaftGroup group;
+ private final boolean format;
- public Add(RaftGroup group) {
+ public Add(RaftGroup group, boolean format) {
this.group = group;
+ this.format = format;
}
@Override
@@ -40,6 +42,10 @@ public final class GroupManagementRequest extends RaftClientRequest {
return group;
}
+ public boolean isFormat() {
+ return format;
+ }
+
@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup();
@@ -79,8 +85,9 @@ public final class GroupManagementRequest extends RaftClientRequest {
}
}
- public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
- return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
+ public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId,
+ RaftGroup group, boolean format) {
+ return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format));
}
public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
new file mode 100644
index 000000000..cf2d06023
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.util.Objects;
+
+/**
+ * A memoized supplier is a {@link CheckedSupplier}
+ * which gets a value by invoking its initializer once
+ * and then keeps returning the same value as its supplied results.
+ *
+ * This class is thread safe.
+ *
+ * @param <RETURN> The return type of the supplier.
+ * @param <THROW> The throwable type of the supplier.
+ */
+public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
+ implements CheckedSupplier<RETURN, THROW> {
+ /**
+ * @param supplier to supply at most one non-null value.
+ * @return a {@link MemoizedCheckedSupplier} with the given supplier.
+ */
+ public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf(
+ CheckedSupplier<RETURN, THROW> supplier) {
+ return supplier instanceof MemoizedCheckedSupplier ?
+ (MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier);
+ }
+
+ private final CheckedSupplier<RETURN, THROW> initializer;
+ private volatile RETURN value = null;
+
+ /**
+ * Create a memoized supplier.
+ * @param initializer to supply at most one non-null value.
+ */
+ private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
+ Objects.requireNonNull(initializer, "initializer == null");
+ this.initializer = initializer;
+ }
+
+ /** @return the lazily initialized object. */
+ @Override
+ public RETURN get() throws THROW {
+ RETURN v = value;
+ if (v == null) {
+ synchronized (this) {
+ v = value;
+ if (v == null) {
+ v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
+ }
+ }
+ }
+ return v;
+ }
+
+ /**
+ * @return the already initialized object.
+ * @throws NullPointerException if the object is uninitialized.
+ */
+ public RETURN getUnchecked() {
+ return Objects.requireNonNull(value, "value == null");
+ }
+
+ /** @return is the object initialized? */
+ public boolean isInitialized() {
+ return value != null;
+ }
+
+ @Override
+ public String toString() {
+ return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index e9a002612..902c1f5e6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@@ -113,6 +114,10 @@ public interface Preconditions {
return clazz.cast(object);
}
+ static <K, V> void assertEmpty(Map<K, V> map, Object name) {
+ assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map);
+ }
+
static <T> void assertUnique(Iterable<T> first) {
assertUnique(first, Collections.emptyList());
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 25667a378..683f0da62 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -23,6 +23,7 @@ import java.util.Objects;
* Size which may be constructed with a {@link TraditionalBinaryPrefix}.
*/
public final class SizeInBytes {
+ public static final SizeInBytes ZERO = valueOf(0);
public static final SizeInBytes ONE_KB = valueOf("1k");
public static final SizeInBytes ONE_MB = valueOf("1m");
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c571f0c73..9fe2494bf 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -469,6 +469,7 @@ message StartLeaderElectionReplyProto {
// A request to add a new group
message GroupAddRequestProto {
RaftGroupProto group = 1; // the group to be added.
+ bool format = 2; // Should it format the storage?
}
message GroupRemoveRequestProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index e9719b96c..8d00d29db 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -167,8 +167,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
private static Method initNewRaftServerMethod() {
final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
- final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
- RaftProperties.class, Parameters.class};
+ final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class,
+ StateMachine.Registry.class, RaftProperties.class, Parameters.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftServer", argClasses);
@@ -177,12 +177,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
}
}
- private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
+ private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option,
StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
throws IOException {
try {
return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
- serverId, group, stateMachineRegistry, properties, parameters);
+ serverId, group, option, stateMachineRegistry, properties, parameters);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + serverId, e);
} catch (InvocationTargetException e) {
@@ -193,6 +193,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
private RaftPeerId serverId;
private StateMachine.Registry stateMachineRegistry ;
private RaftGroup group = null;
+ private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER;
private RaftProperties properties;
private Parameters parameters;
@@ -201,6 +202,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
return newRaftServer(
serverId,
group,
+ option,
Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
"is initialized."),
Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
@@ -230,6 +232,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
return this;
}
+ /** Set the startup option for the group. */
+ public Builder setOption(RaftStorage.StartupOption option) {
+ this.option = option;
+ return this;
+ }
+
/** Set {@link RaftProperties}. */
public Builder setProperties(RaftProperties properties) {
this.properties = properties;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index dde3c31bc..59d87c37a 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -62,8 +62,7 @@ public interface RaftStorage extends Closeable {
private static Method initNewRaftStorageMethod() {
final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
- //final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
- final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
+ final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftStorage", argClasses);
@@ -76,7 +75,7 @@ public interface RaftStorage extends Closeable {
StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
try {
return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
- dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
+ dir, storageFreeSpaceMin, option, logCorruptionPolicy);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + dir, e);
} catch (InvocationTargetException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2ee191c56..258e94d9f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -189,7 +189,8 @@ class RaftServerImpl implements RaftServer.Division,
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
- RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
+ RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
+ throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
this.lifeCycle = new LifeCycle(id);
@@ -202,7 +203,7 @@ class RaftServerImpl implements RaftServer.Division,
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
this.proxy = proxy;
- this.state = new ServerState(id, group, properties, this, stateMachine);
+ this.state = new ServerState(id, group, stateMachine, this, option, properties);
this.retryCache = new RetryCacheImpl(properties);
this.dataStreamMap = new DataStreamMapImpl(id);
@@ -575,8 +576,8 @@ class RaftServerImpl implements RaftServer.Division,
}
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
- return new GroupInfoReply(request, getCommitInfos(),
- getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
+ final RaftStorageDirectory dir = state.getStorage().getStorageDir();
+ return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
}
RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index ad4d988ab..ad488f53b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,6 +38,7 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.ServerFactory;
+import org.apache.ratis.server.storage.RaftStorage.StartupOption;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JvmPauseMonitor;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -80,7 +81,7 @@ class RaftServerProxy implements RaftServer {
private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
private boolean isClosed = false;
- synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
+ synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group, StartupOption option) {
if (isClosed) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
getId() + ": Failed to add " + group + " since the server is already closed"));
@@ -90,7 +91,7 @@ class RaftServerProxy implements RaftServer {
getId() + ": Failed to add " + group + " since the group already exists in the map."));
}
final RaftGroupId groupId = group.getGroupId();
- final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
+ final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group, option);
final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
Preconditions.assertNull(previous, "previous");
LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
@@ -230,7 +231,7 @@ class RaftServerProxy implements RaftServer {
}
/** Check the storage dir and add groups*/
- void initGroups(RaftGroup group) {
+ void initGroups(RaftGroup group, StartupOption option) {
final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
@@ -241,7 +242,7 @@ class RaftServerProxy implements RaftServer {
.filter(File::isDirectory)
.forEach(sub -> initGroupDir(sub, shouldAdd)),
executor).join();
- raftGroup.ifPresent(this::addGroup);
+ raftGroup.ifPresent(g -> addGroup(g, option));
}
private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
@@ -255,7 +256,7 @@ class RaftServerProxy implements RaftServer {
" ignoring it. ", getId(), sub.getAbsolutePath());
}
if (shouldAdd.test(groupId)) {
- addGroup(RaftGroup.valueOf(groupId));
+ addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER);
}
} catch (Exception e) {
LOG.warn(getId() + ": Failed to initialize the group directory "
@@ -269,11 +270,11 @@ class RaftServerProxy implements RaftServer {
getDataStreamServerRpc().addRaftPeers(others);
}
- private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
+ private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, StartupOption option) {
return CompletableFuture.supplyAsync(() -> {
try {
addRaftPeers(group.getPeers());
- return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+ return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option);
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
@@ -341,8 +342,8 @@ class RaftServerProxy implements RaftServer {
return dataStreamServerRpc;
}
- private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
- return impls.addNew(group);
+ private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group, StartupOption option) {
+ return impls.addNew(group, option);
}
private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
@@ -466,7 +467,7 @@ class RaftServerProxy implements RaftServer {
}
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
- return groupAddAsync(request, add.getGroup());
+ return groupAddAsync(request, add.getGroup(), add.isFormat());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
@@ -477,12 +478,13 @@ class RaftServerProxy implements RaftServer {
getId() + ": Request not supported " + request));
}
- private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
+ private CompletableFuture<RaftClientReply> groupAddAsync(
+ GroupManagementRequest request, RaftGroup newGroup, boolean format) {
if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
return JavaUtils.completeExceptionally(new GroupMismatchException(
getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
}
- return impls.addNew(newGroup)
+ return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER)
.thenApplyAsync(newImpl -> {
LOG.debug("{}: newImpl = {}", getId(), newImpl);
try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6777b9093..6e1ddd548 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -45,7 +46,7 @@ public final class ServerImplUtils {
/** Create a {@link RaftServerProxy}. */
public static RaftServerProxy newRaftServer(
- RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
+ RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry,
RaftProperties properties, Parameters parameters) throws IOException {
RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
if (group != null && !group.getPeers().isEmpty()) {
@@ -53,7 +54,7 @@ public final class ServerImplUtils {
Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
}
final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
- proxy.initGroups(group);
+ proxy.initGroups(group, option);
return proxy;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 1ee2cab5e..e92f9b911 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -36,20 +36,14 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedCheckedSupplier;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
-import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@@ -63,7 +57,7 @@ import static org.apache.ratis.server.RaftServer.Division.LOG;
/**
* Common states of a raft peer. Protected by RaftServer's lock.
*/
-class ServerState implements Closeable {
+class ServerState {
private final RaftGroupMemberId memberId;
private final RaftServerImpl server;
/** Raft log */
@@ -73,7 +67,7 @@ class ServerState implements Closeable {
/** The thread that applies committed log entries to the state machine */
private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
/** local storage for log and snapshot */
- private RaftStorageImpl storage;
+ private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
private final SnapshotManager snapshotManager;
private volatile Timestamp lastNoLeaderTime;
private final TimeDuration noLeaderTimeout;
@@ -100,9 +94,8 @@ class ServerState implements Closeable {
*/
private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
- ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
- RaftServerImpl server, StateMachine stateMachine)
- throws IOException {
+ ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
+ RaftStorage.StartupOption option, RaftProperties prop) {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
Collection<RaftPeer> followerPeers = group.getPeers().stream()
@@ -117,36 +110,11 @@ class ServerState implements Closeable {
configurationManager = new ConfigurationManager(initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
- boolean storageFound = false;
- List<File> directories = RaftServerConfigKeys.storageDir(prop);
- while (!directories.isEmpty()) {
- // use full uuid string to create a subdirectory
- File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString());
- try {
- storage = (RaftStorageImpl) RaftStorage.newBuilder()
- .setDirectory(dir)
- .setOption(RaftStorage.StartupOption.RECOVER)
- .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop))
- .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop))
- .build();
- storageFound = true;
- break;
- } catch (IOException e) {
- if (e.getCause() instanceof OverlappingFileLockException) {
- throw e;
- }
- LOG.warn("Failed to init RaftStorage under {} for {}: {}",
- dir.getParent(), group.getGroupId().getUuid().toString(), e);
- directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent()));
- }
- }
-
- if (!storageFound) {
- throw new IOException("No healthy directories found for RaftStorage among: " +
- RaftServerConfigKeys.storageDir(prop));
- }
+ final String storageDirName = group.getGroupId().getUuid().toString();
+ this.raftStorage = MemoizedCheckedSupplier.valueOf(
+ () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
- snapshotManager = new SnapshotManager(storage, id);
+ this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
// On start the leader is null, start the clock now
this.leaderId = null;
@@ -163,7 +131,8 @@ class ServerState implements Closeable {
}
void initialize(StateMachine stateMachine) throws IOException {
- storage.initialize();
+ // initialize raft storage
+ final RaftStorageImpl storage = raftStorage.get();
// read configuration from the storage
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
@@ -180,32 +149,8 @@ class ServerState implements Closeable {
return memberId;
}
- static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException {
- final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
- final File[] empty = {};
- final List<File> resultList = new ArrayList<>();
- volumes.stream().flatMap(volume -> {
- final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty);
- numberOfStorageDirPerVolume.put(volume, dirs.length);
- return Arrays.stream(dirs);
- }).filter(dir -> targetSubDir.equals(dir.getName()))
- .forEach(resultList::add);
-
- if (resultList.size() > 1) {
- throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList);
- }
- if (resultList.size() == 1) {
- return resultList.get(0);
- }
- return numberOfStorageDirPerVolume.entrySet().stream()
- .min(Map.Entry.comparingByValue())
- .map(Map.Entry::getKey)
- .map(v -> new File(v, targetSubDir))
- .orElseThrow(() -> new IOException("No storage directory found."));
- }
-
void writeRaftConfiguration(LogEntryProto conf) {
- storage.writeRaftConfiguration(conf);
+ getStorage().writeRaftConfiguration(conf);
}
void start() {
@@ -214,7 +159,8 @@ class ServerState implements Closeable {
private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) {
try {
- return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
+ return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf,
+ getSnapshotIndexFromStateMachine, prop);
} catch (IOException e) {
throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e);
}
@@ -259,10 +205,6 @@ class ServerState implements Closeable {
return leaderId;
}
- boolean hasLeader() {
- return leaderId != null;
- }
-
/**
* Become a candidate and start leader election
*/
@@ -434,7 +376,7 @@ class ServerState implements Closeable {
void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
- entries.stream().forEach(this::setRaftConf);
+ entries.forEach(this::setRaftConf);
}
}
@@ -455,29 +397,45 @@ class ServerState implements Closeable {
getStateMachineUpdater().reloadStateMachine();
}
- @Override
- public void close() throws IOException {
+ void close() {
+ try {
+ if (stateMachineUpdater.isInitialized()) {
+ getStateMachineUpdater().stopAndJoin();
+ }
+ } catch (Throwable e) {
+ LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e);
+ }
+ LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex());
+
try {
- getStateMachineUpdater().stopAndJoin();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
+ if (log.isInitialized()) {
+ getLog().close();
+ }
+ } catch (Throwable e) {
+ LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e);
}
- LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
- getLog().close();
- storage.close();
+ try {
+ if (raftStorage.isInitialized()) {
+ getStorage().close();
+ }
+ } catch (Throwable e) {
+ LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e);
+ }
}
- RaftStorage getStorage() {
- return storage;
+ RaftStorageImpl getStorage() {
+ if (!raftStorage.isInitialized()) {
+ throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized.");
+ }
+ return raftStorage.getUnchecked();
}
void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
// TODO: verify that we need to install the snapshot
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
- snapshotManager.installSnapshot(sm, request);
+ snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir());
updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex()));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index a7fa13b53..a86cdf56b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.IOException;
@@ -51,13 +52,13 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
private final File root; // root directory
private FileLock lock; // storage lock
- private long freeSpaceMin;
+ private final SizeInBytes freeSpaceMin;
/**
* Constructor
* @param dir directory corresponding to the storage
*/
- RaftStorageDirectoryImpl(File dir, long freeSpaceMin) {
+ RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) {
this.root = dir;
this.lock = null;
this.freeSpaceMin = freeSpaceMin;
@@ -177,7 +178,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
}
private boolean hasEnoughSpace() {
- return root.getFreeSpace() > freeSpaceMin;
+ return root.getFreeSpace() >= freeSpaceMin.getSize();
}
/**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 6513efd30..56972c3f7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.FileNotFoundException;
@@ -34,17 +35,16 @@ import java.util.Optional;
/** The storage of a {@link org.apache.ratis.server.RaftServer}. */
public class RaftStorageImpl implements RaftStorage {
-
- // TODO support multiple storage directories
private final RaftStorageDirectoryImpl storageDir;
private final StartupOption startupOption;
private final CorruptionPolicy logCorruptionPolicy;
private volatile StorageState state = StorageState.UNINITIALIZED;
private volatile RaftStorageMetadataFileImpl metaFile;
- RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option,
- long storageFeeSpaceMin) {
- this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
+ RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) {
+ LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}",
+ dir, freeSpaceMin, option, logCorruptionPolicy);
+ this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin);
this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
this.startupOption = option;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index aaa62a783..17294b572 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,21 +54,17 @@ public class SnapshotManager {
private static final String CORRUPT = ".corrupt";
private static final String TMP = ".tmp";
- private final RaftStorage storage;
private final RaftPeerId selfId;
private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);
- public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
- this.storage = storage;
+ SnapshotManager(RaftPeerId selfId) {
this.selfId = selfId;
}
- public void installSnapshot(StateMachine stateMachine,
- InstallSnapshotRequestProto request) throws IOException {
- final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
- request.getSnapshotChunk();
+ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
+ throws IOException {
+ final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
- final RaftStorageDirectory dir = storage.getStorageDir();
// create a unique temporary directory
final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index aeff60148..865e2b2b1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -17,38 +17,158 @@
*/
package org.apache.ratis.server.storage;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.server.RaftServerConfigKeys.Log;
+import org.apache.ratis.server.storage.RaftStorage.StartupOption;
+import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.server.RaftServer.Division.LOG;
public final class StorageImplUtils {
+ private static final File[] EMPTY_FILE_ARRAY = {};
private StorageImplUtils() {
//Never constructed
}
+ public static SnapshotManager newSnapshotManager(RaftPeerId id) {
+ return new SnapshotManager(id);
+ }
+
/** Create a {@link RaftStorageImpl}. */
- public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy,
- RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException {
- RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin);
-
- final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
- final RaftStorageImpl raftStorage;
- try {
- // attempt multiple times to avoid temporary bind exception
- raftStorage = JavaUtils.attemptRepeatedly(
- () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin),
- 5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw IOUtils.toInterruptedIOException(
- "Interrupted when creating RaftStorage " + dir, e);
+ public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin,
+ RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) {
+ return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
+ }
+
+ /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */
+ static List<File> getExistingStorageSubs(List<File> volumes, String targetSubDir, Map<File, Integer> dirsPerVol) {
+ return volumes.stream().flatMap(volume -> {
+ final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY);
+ Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length));
+ return Arrays.stream(dirs);
+ }).filter(dir -> targetSubDir.equals(dir.getName()))
+ .collect(Collectors.toList());
+ }
+
+ /** @return a volume with the min dirs. */
+ static File chooseMin(Map<File, Integer> dirsPerVol) throws IOException {
+ return dirsPerVol.entrySet().stream()
+ .min(Map.Entry.comparingByValue())
+ .map(Map.Entry::getKey)
+ .orElseThrow(() -> new IOException("No storage directory found."));
+ }
+
+ /**
+ * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties
+ * and then try to call {@link RaftStorage#initialize()}.
+ * <p>
+ * {@link StartupOption#FORMAT}:
+ * - When there is more than one existing directory, throw an exception.
+ * - When there is an existing directory, throw an exception.
+ * - When there is no existing directory, try to initialize a new directory from the list specified
+ * in the configuration properties until one directory succeeds or all directories have failed.
+ * <p>
+ * {@link StartupOption#RECOVER}:
+ * - When there is more than one existing directory, throw an exception.
+ * - When there is an existing directory, if it fails to initialize, throw an exception and do not try a new directory.
+ * - When there is no existing directory, throw an exception.
+ *
+ * @param storageDirName the storage directory name
+ * @param option the startup option
+ * @param properties the configuration properties
+ * @return the chosen storage, which is initialized successfully.
+ */
+ public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option,
+ RaftProperties properties) throws IOException {
+ return new Op(storageDirName, option, properties).run();
+ }
+
+ private static class Op {
+ private final String storageDirName;
+ private final StartupOption option;
+
+ private final SizeInBytes freeSpaceMin;
+ private final Log.CorruptionPolicy logCorruptionPolicy;
+ private final List<File> dirsInConf;
+
+ private final List<File> existingSubs;
+ private final Map<File, Integer> dirsPerVol = new HashMap<>();
+
+ Op(String storageDirName, StartupOption option, RaftProperties properties) {
+ this.storageDirName = storageDirName;
+ this.option = option;
+
+ this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties);
+ this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties);
+ this.dirsInConf = RaftServerConfigKeys.storageDir(properties);
+
+ this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol);
+ }
+
+ RaftStorageImpl run() throws IOException {
+ if (option == StartupOption.FORMAT) {
+ return format();
+ } else if (option == StartupOption.RECOVER) {
+ return recover();
+ } else {
+ throw new IllegalArgumentException("Illegal option: " + option);
+ }
+ }
+
+ private RaftStorageImpl format() throws IOException {
+ if (!existingSubs.isEmpty()) {
+ throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs
+ + " for " + storageDirName);
+ }
+
+ for (; !dirsPerVol.isEmpty(); ) {
+ final File vol = chooseMin(dirsPerVol);
+ final File dir = new File(vol, storageDirName);
+ try {
+ final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy);
+ storage.initialize();
+ return storage;
+ } catch (Throwable e) {
+ LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e);
+ dirsPerVol.remove(vol);
+ }
+ }
+ throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf);
+ }
+
+ private RaftStorageImpl recover() throws IOException {
+ final int size = existingSubs.size();
+ if (size > 1) {
+ throw new IOException("Failed to " + option + ": More than one existing directories found "
+ + existingSubs + " for " + storageDirName);
+ } else if (size == 0) {
+ throw new IOException("Failed to " + option + ": Storage directory not found for "
+ + storageDirName + " from " + dirsInConf);
+ }
+
+ final File dir = existingSubs.get(0);
+ try {
+ final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy);
+ storage.initialize();
+ return storage;
+ } catch (Throwable e) {
+ if (e instanceof IOException) {
+ throw e;
+ }
+ throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e);
+ }
}
- return raftStorage;
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 1f4047524..dfed96952 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CollectionUtils;
@@ -385,8 +386,9 @@ public abstract class MiniRaftCluster implements Closeable {
}
final RaftProperties prop = new RaftProperties(properties);
RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
- return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
- setPropertiesAndInitParameters(id, group, prop));
+ return ServerImplUtils.newRaftServer(id, group,
+ format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER,
+ getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e09ca19d1..5e6353f92 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -365,7 +365,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
// start the two new peers
LOG.info("Start new peers");
for (RaftPeer np : c1.newPeers) {
- cluster.restartServer(np.getId(), false);
+ cluster.restartServer(np.getId(), true);
}
Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
}
@@ -504,7 +504,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
for (RaftPeer np : c1.newPeers) {
- cluster.restartServer(np.getId(), false);
+ cluster.restartServer(np.getId(), true);
}
try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index ffc8de597..7027bd8ea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -114,7 +114,7 @@ public class TestRaftStorage extends BaseTest {
*/
@Test
public void testStorage() throws Exception {
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
try {
StorageState state = sd.analyzeStorage(true);
Assert.assertEquals(StorageState.NOT_FORMATTED, state);
@@ -171,7 +171,7 @@ public class TestRaftStorage extends BaseTest {
Assert.assertEquals(StorageState.NORMAL, storage.getState());
storage.close();
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
File metaFile = sd.getMetaFile();
FileUtils.move(metaFile, sd.getMetaTmpFile());
@@ -286,7 +286,7 @@ public class TestRaftStorage extends BaseTest {
File mockStorageDir = Mockito.spy(storageDir);
Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L); // 100B
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M"));
StorageState state = sd.analyzeStorage(false);
Assert.assertEquals(StorageState.NO_SPACE, state);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
similarity index 82%
rename from ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
rename to ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
index 75aef53a1..ff38a6bd9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.storage;
import org.apache.ratis.BaseTest;
import org.apache.ratis.util.FileUtils;
@@ -28,7 +28,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
@@ -37,13 +39,20 @@ import java.util.stream.IntStream;
/**
* Test cases to verify the storage directory selection in StorageImplUtils.
*/
-public class TestServerState {
+public class TestStorageImplUtils {
private static final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
- JavaUtils.getClassSimpleName(TestServerState.class) +
+ JavaUtils.getClassSimpleName(TestStorageImplUtils.class) +
Integer.toHexString(ThreadLocalRandom.current().nextInt())));
+ static File chooseNewStorageDir(List<File> volumes, String sub) throws IOException {
+ final Map<File, Integer> numDirPerVolume = new HashMap<>();
+ StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume);
+ final File vol = StorageImplUtils.chooseMin(numDirPerVolume);
+ return new File(vol, sub);
+ }
+
@AfterClass
public static void tearDown() throws IOException {
FileUtils.deleteFully(rootTestDir.get());
@@ -60,8 +69,8 @@ public class TestServerState {
List<File> directories = Collections.singletonList(testDir);
String subDirOne = UUID.randomUUID().toString();
String subDirTwo = UUID.randomUUID().toString();
- File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne);
- File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo);
+ final File storageDirOne = chooseNewStorageDir(directories, subDirOne);
+ final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo);
File expectedOne = new File(testDir, subDirOne);
File expectedTwo = new File(testDir, subDirTwo);
Assert.assertEquals(expectedOne.getCanonicalPath(),
@@ -100,7 +109,7 @@ public class TestServerState {
}
});
String subDir = UUID.randomUUID().toString();
- File storageDirectory = ServerState.chooseStorageDir(directories, subDir);
+ final File storageDirectory = chooseNewStorageDir(directories, subDir);
File expected = new File(directories.get(6), subDir);
Assert.assertEquals(expected.getCanonicalPath(),
storageDirectory.getCanonicalPath());
@@ -108,19 +117,15 @@ public class TestServerState {
/**
* Tests choosing of storage directory when no volume is configured.
- *
- * @throws IOException in case of exception.
*/
@Test
public void testChooseStorageDirWithNoVolume() {
try {
- ServerState.chooseStorageDir(
- Collections.emptyList(), UUID.randomUUID().toString());
+ chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString());
Assert.fail();
} catch (IOException ex) {
String expectedErrMsg = "No storage directory found.";
Assert.assertEquals(expectedErrMsg, ex.getMessage());
}
}
-
}
\ No newline at end of file