You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/09/30 06:45:18 UTC
[ratis] 01/03: Revert "RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)"
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 4332b0fe20234915765b5c16ed13883d6d651370
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Wed Sep 21 19:13:03 2022 +0800
Revert "RATIS-1677. Do not auto format RaftStorage in RECOVER. (#718)"
This reverts commit 7a4543b1cedfb6b9b391ed6c6094b88fecacd546.
---
.../ratis/client/api/GroupManagementApi.java | 12 +-
.../apache/ratis/client/impl/ClientProtoUtils.java | 9 +-
.../ratis/client/impl/GroupManagementImpl.java | 4 +-
.../ratis/protocol/GroupManagementRequest.java | 13 +-
.../apache/ratis/util/MemoizedCheckedSupplier.java | 90 ------------
.../java/org/apache/ratis/util/Preconditions.java | 5 -
.../java/org/apache/ratis/util/SizeInBytes.java | 1 -
ratis-proto/src/main/proto/Raft.proto | 1 -
.../java/org/apache/ratis/server/RaftServer.java | 16 +--
.../apache/ratis/server/storage/RaftStorage.java | 5 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 9 +-
.../apache/ratis/server/impl/RaftServerProxy.java | 26 ++--
.../apache/ratis/server/impl/ServerImplUtils.java | 5 +-
.../org/apache/ratis/server/impl/ServerState.java | 128 +++++++++++------
.../server/storage/RaftStorageDirectoryImpl.java | 7 +-
.../ratis/server/storage/RaftStorageImpl.java | 10 +-
.../ratis/server/storage/SnapshotManager.java | 12 +-
.../ratis/server/storage/StorageImplUtils.java | 160 +++------------------
.../apache/ratis/server/impl/MiniRaftCluster.java | 6 +-
.../server/impl/RaftReconfigurationBaseTest.java | 4 +-
.../TestServerState.java} | 27 ++--
.../ratis/server/storage/TestRaftStorage.java | 6 +-
22 files changed, 174 insertions(+), 382 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
index 558747048..1d3bc00b1 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/GroupManagementApi.java
@@ -29,16 +29,8 @@ import java.io.IOException;
* APIs to support group management operations such as add, remove, list and info to a particular server.
*/
public interface GroupManagementApi {
- /**
- * Add a new group.
- * @param format Should it format the storage?
- */
- RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException;
-
- /** The same as add(newGroup, true). */
- default RaftClientReply add(RaftGroup newGroup) throws IOException {
- return add(newGroup, true);
- }
+ /** Add a new group. */
+ RaftClientReply add(RaftGroup newGroup) throws IOException;
/** Remove a group. */
RaftClientReply remove(RaftGroupId groupId, boolean deleteDirectory, boolean renameDirectory) throws IOException;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 1ac825850..859e1d4f0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -571,9 +571,8 @@ public interface ClientProtoUtils {
final RaftPeerId serverId = RaftPeerId.valueOf(m.getReplyId());
switch(p.getOpCase()) {
case GROUPADD:
- final GroupAddRequestProto add = p.getGroupAdd();
return GroupManagementRequest.newAdd(clientId, serverId, m.getCallId(),
- ProtoUtils.toRaftGroup(add.getGroup()), add.getFormat());
+ ProtoUtils.toRaftGroup(p.getGroupAdd().getGroup()));
case GROUPREMOVE:
final GroupRemoveRequestProto remove = p.getGroupRemove();
return GroupManagementRequest.newRemove(clientId, serverId, m.getCallId(),
@@ -610,10 +609,8 @@ public interface ClientProtoUtils {
.setRpcRequest(toRaftRpcRequestProtoBuilder(request));
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
- b.setGroupAdd(GroupAddRequestProto.newBuilder()
- .setGroup(ProtoUtils.toRaftGroupProtoBuilder(add.getGroup()))
- .setFormat(add.isFormat())
- .build());
+ b.setGroupAdd(GroupAddRequestProto.newBuilder().setGroup(
+ ProtoUtils.toRaftGroupProtoBuilder(add.getGroup())).build());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index 9501bc2ea..27e0bbffc 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -43,13 +43,13 @@ class GroupManagementImpl implements GroupManagementApi {
}
@Override
- public RaftClientReply add(RaftGroup newGroup, boolean format) throws IOException {
+ public RaftClientReply add(RaftGroup newGroup) throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");
final long callId = CallId.getAndIncrement();
client.getClientRpc().addRaftPeers(newGroup.getPeers());
return client.io().sendRequestWithRetry(
- () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup, format));
+ () -> GroupManagementRequest.newAdd(client.getId(), server, callId, newGroup));
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index 2783d2c65..d370dfc4c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -26,11 +26,9 @@ public final class GroupManagementRequest extends RaftClientRequest {
public static class Add extends Op {
private final RaftGroup group;
- private final boolean format;
- public Add(RaftGroup group, boolean format) {
+ public Add(RaftGroup group) {
this.group = group;
- this.format = format;
}
@Override
@@ -42,10 +40,6 @@ public final class GroupManagementRequest extends RaftClientRequest {
return group;
}
- public boolean isFormat() {
- return format;
- }
-
@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + getGroup();
@@ -85,9 +79,8 @@ public final class GroupManagementRequest extends RaftClientRequest {
}
}
- public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId,
- RaftGroup group, boolean format) {
- return new GroupManagementRequest(clientId, serverId, callId, new Add(group, format));
+ public static GroupManagementRequest newAdd(ClientId clientId, RaftPeerId serverId, long callId, RaftGroup group) {
+ return new GroupManagementRequest(clientId, serverId, callId, new Add(group));
}
public static GroupManagementRequest newRemove(ClientId clientId, RaftPeerId serverId, long callId,
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
deleted file mode 100644
index cf2d06023..000000000
--- a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.util;
-
-import org.apache.ratis.util.function.CheckedSupplier;
-
-import java.util.Objects;
-
-/**
- * A memoized supplier is a {@link CheckedSupplier}
- * which gets a value by invoking its initializer once.
- * and then keeps returning the same value as its supplied results.
- *
- * This class is thread safe.
- *
- * @param <RETURN> The return type of the supplier.
- * @param <THROW> The throwable type of the supplier.
- */
-public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
- implements CheckedSupplier<RETURN, THROW> {
- /**
- * @param supplier to supply at most one non-null value.
- * @return a {@link MemoizedCheckedSupplier} with the given supplier.
- */
- public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN, THROW> valueOf(
- CheckedSupplier<RETURN, THROW> supplier) {
- return supplier instanceof MemoizedCheckedSupplier ?
- (MemoizedCheckedSupplier<RETURN, THROW>) supplier : new MemoizedCheckedSupplier<>(supplier);
- }
-
- private final CheckedSupplier<RETURN, THROW> initializer;
- private volatile RETURN value = null;
-
- /**
- * Create a memoized supplier.
- * @param initializer to supply at most one non-null value.
- */
- private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
- Objects.requireNonNull(initializer, "initializer == null");
- this.initializer = initializer;
- }
-
- /** @return the lazily initialized object. */
- @Override
- public RETURN get() throws THROW {
- RETURN v = value;
- if (v == null) {
- synchronized (this) {
- v = value;
- if (v == null) {
- v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
- }
- }
- }
- return v;
- }
-
- /**
- * @return the already initialized object.
- * @throws NullPointerException if the object is uninitialized.
- */
- public RETURN getUnchecked() {
- return Objects.requireNonNull(value, "value == null");
- }
-
- /** @return is the object initialized? */
- public boolean isInitialized() {
- return value != null;
- }
-
- @Override
- public String toString() {
- return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
- }
-}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index 902c1f5e6..e9a002612 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -19,7 +19,6 @@ package org.apache.ratis.util;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
@@ -114,10 +113,6 @@ public interface Preconditions {
return clazz.cast(object);
}
- static <K, V> void assertEmpty(Map<K, V> map, Object name) {
- assertTrue(map.isEmpty(), () -> "The " + name + " map is non-empty: " + map);
- }
-
static <T> void assertUnique(Iterable<T> first) {
assertUnique(first, Collections.emptyList());
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 683f0da62..25667a378 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -23,7 +23,6 @@ import java.util.Objects;
* Size which may be constructed with a {@link TraditionalBinaryPrefix}.
*/
public final class SizeInBytes {
- public static final SizeInBytes ZERO = valueOf(0);
public static final SizeInBytes ONE_KB = valueOf("1k");
public static final SizeInBytes ONE_MB = valueOf("1m");
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 9fe2494bf..c571f0c73 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -469,7 +469,6 @@ message StartLeaderElectionReplyProto {
// A request to add a new group
message GroupAddRequestProto {
RaftGroupProto group = 1; // the group to be added.
- bool format = 2; // Should it format the storage?
}
message GroupRemoveRequestProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index 8d00d29db..e9719b96c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -167,8 +167,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
private static Method initNewRaftServerMethod() {
final String className = RaftServer.class.getPackage().getName() + ".impl.ServerImplUtils";
- final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, RaftStorage.StartupOption.class,
- StateMachine.Registry.class, RaftProperties.class, Parameters.class};
+ final Class<?>[] argClasses = {RaftPeerId.class, RaftGroup.class, StateMachine.Registry.class,
+ RaftProperties.class, Parameters.class};
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftServer", argClasses);
@@ -177,12 +177,12 @@ public interface RaftServer extends Closeable, RpcType.Get,
}
}
- private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group, RaftStorage.StartupOption option,
+ private static RaftServer newRaftServer(RaftPeerId serverId, RaftGroup group,
StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
throws IOException {
try {
return (RaftServer) NEW_RAFT_SERVER_METHOD.invoke(null,
- serverId, group, option, stateMachineRegistry, properties, parameters);
+ serverId, group, stateMachineRegistry, properties, parameters);
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + serverId, e);
} catch (InvocationTargetException e) {
@@ -193,7 +193,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
private RaftPeerId serverId;
private StateMachine.Registry stateMachineRegistry ;
private RaftGroup group = null;
- private RaftStorage.StartupOption option = RaftStorage.StartupOption.RECOVER;
private RaftProperties properties;
private Parameters parameters;
@@ -202,7 +201,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
return newRaftServer(
serverId,
group,
- option,
Objects.requireNonNull(stateMachineRegistry , "Neither 'stateMachine' nor 'setStateMachineRegistry' " +
"is initialized."),
Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
@@ -232,12 +230,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
return this;
}
- /** Set the startup option for the group. */
- public Builder setOption(RaftStorage.StartupOption option) {
- this.option = option;
- return this;
- }
-
/** Set {@link RaftProperties}. */
public Builder setProperties(RaftProperties properties) {
this.properties = properties;
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 59d87c37a..dde3c31bc 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -62,7 +62,8 @@ public interface RaftStorage extends Closeable {
private static Method initNewRaftStorageMethod() {
final String className = RaftStorage.class.getPackage().getName() + ".StorageImplUtils";
- final Class<?>[] argClasses = {File.class, SizeInBytes.class, StartupOption.class, CorruptionPolicy.class};
+ //final String className = "org.apache.ratis.server.storage.RaftStorageImpl";
+ final Class<?>[] argClasses = { File.class, CorruptionPolicy.class, StartupOption.class, long.class };
try {
final Class<?> clazz = ReflectionUtils.getClassByName(className);
return clazz.getMethod("newRaftStorage", argClasses);
@@ -75,7 +76,7 @@ public interface RaftStorage extends Closeable {
StartupOption option, SizeInBytes storageFreeSpaceMin) throws IOException {
try {
return (RaftStorage) NEW_RAFT_STORAGE_METHOD.invoke(null,
- dir, storageFreeSpaceMin, option, logCorruptionPolicy);
+ dir, logCorruptionPolicy, option, storageFreeSpaceMin.getSize());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Failed to build " + dir, e);
} catch (InvocationTargetException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ee7b749dd..1994fa8de 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -189,8 +189,7 @@ class RaftServerImpl implements RaftServer.Division,
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
- RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
- throws IOException {
+ RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
this.lifeCycle = new LifeCycle(id);
@@ -203,7 +202,7 @@ class RaftServerImpl implements RaftServer.Division,
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
this.proxy = proxy;
- this.state = new ServerState(id, group, stateMachine, this, option, properties);
+ this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
this.dataStreamMap = new DataStreamMapImpl(id);
@@ -576,8 +575,8 @@ class RaftServerImpl implements RaftServer.Division,
}
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
- final RaftStorageDirectory dir = state.getStorage().getStorageDir();
- return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
+ return new GroupInfoReply(request, getCommitInfos(),
+ getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
}
RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 0825e7d12..b8cee7f53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -38,7 +38,6 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.ServerFactory;
-import org.apache.ratis.server.storage.RaftStorage.StartupOption;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.JvmPauseMonitor;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -81,7 +80,7 @@ class RaftServerProxy implements RaftServer {
private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
private boolean isClosed = false;
- synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group, StartupOption option) {
+ synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
if (isClosed) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
getId() + ": Failed to add " + group + " since the server is already closed"));
@@ -91,7 +90,7 @@ class RaftServerProxy implements RaftServer {
getId() + ": Failed to add " + group + " since the group already exists in the map."));
}
final RaftGroupId groupId = group.getGroupId();
- final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group, option);
+ final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
Preconditions.assertNull(previous, "previous");
LOG.info("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
@@ -231,7 +230,7 @@ class RaftServerProxy implements RaftServer {
}
/** Check the storage dir and add groups*/
- void initGroups(RaftGroup group, StartupOption option) {
+ void initGroups(RaftGroup group) {
final Optional<RaftGroup> raftGroup = Optional.ofNullable(group);
final RaftGroupId raftGroupId = raftGroup.map(RaftGroup::getGroupId).orElse(null);
final Predicate<RaftGroupId> shouldAdd = gid -> gid != null && !gid.equals(raftGroupId);
@@ -242,7 +241,7 @@ class RaftServerProxy implements RaftServer {
.filter(File::isDirectory)
.forEach(sub -> initGroupDir(sub, shouldAdd)),
executor).join();
- raftGroup.ifPresent(g -> addGroup(g, option));
+ raftGroup.ifPresent(this::addGroup);
}
private void initGroupDir(File sub, Predicate<RaftGroupId> shouldAdd) {
@@ -256,7 +255,7 @@ class RaftServerProxy implements RaftServer {
" ignoring it. ", getId(), sub.getAbsolutePath());
}
if (shouldAdd.test(groupId)) {
- addGroup(RaftGroup.valueOf(groupId), StartupOption.RECOVER);
+ addGroup(RaftGroup.valueOf(groupId));
}
} catch (Exception e) {
LOG.warn(getId() + ": Failed to initialize the group directory "
@@ -270,11 +269,11 @@ class RaftServerProxy implements RaftServer {
getDataStreamServerRpc().addRaftPeers(others);
}
- private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group, StartupOption option) {
+ private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
return CompletableFuture.supplyAsync(() -> {
try {
addRaftPeers(group.getPeers());
- return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this, option);
+ return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
@@ -342,8 +341,8 @@ class RaftServerProxy implements RaftServer {
return dataStreamServerRpc;
}
- private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group, StartupOption option) {
- return impls.addNew(group, option);
+ private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+ return impls.addNew(group);
}
private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
@@ -467,7 +466,7 @@ class RaftServerProxy implements RaftServer {
}
final GroupManagementRequest.Add add = request.getAdd();
if (add != null) {
- return groupAddAsync(request, add.getGroup(), add.isFormat());
+ return groupAddAsync(request, add.getGroup());
}
final GroupManagementRequest.Remove remove = request.getRemove();
if (remove != null) {
@@ -478,13 +477,12 @@ class RaftServerProxy implements RaftServer {
getId() + ": Request not supported " + request));
}
- private CompletableFuture<RaftClientReply> groupAddAsync(
- GroupManagementRequest request, RaftGroup newGroup, boolean format) {
+ private CompletableFuture<RaftClientReply> groupAddAsync(GroupManagementRequest request, RaftGroup newGroup) {
if (!request.getRaftGroupId().equals(newGroup.getGroupId())) {
return JavaUtils.completeExceptionally(new GroupMismatchException(
getId() + ": Request group id (" + request.getRaftGroupId() + ") does not match the new group " + newGroup));
}
- return impls.addNew(newGroup, format? StartupOption.FORMAT: StartupOption.RECOVER)
+ return impls.addNew(newGroup)
.thenApplyAsync(newImpl -> {
LOG.debug("{}: newImpl = {}", getId(), newImpl);
try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 6e1ddd548..6777b9093 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -26,7 +26,6 @@ import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -46,7 +45,7 @@ public final class ServerImplUtils {
/** Create a {@link RaftServerProxy}. */
public static RaftServerProxy newRaftServer(
- RaftPeerId id, RaftGroup group, RaftStorage.StartupOption option, StateMachine.Registry stateMachineRegistry,
+ RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
RaftProperties properties, Parameters parameters) throws IOException {
RaftServer.LOG.debug("newRaftServer: {}, {}", id, group);
if (group != null && !group.getPeers().isEmpty()) {
@@ -54,7 +53,7 @@ public final class ServerImplUtils {
Preconditions.assertNotNull(group.getPeer(id), "RaftPeerId %s is not in RaftGroup %s", id, group);
}
final RaftServerProxy proxy = newRaftServer(id, stateMachineRegistry, properties, parameters);
- proxy.initGroups(group, option);
+ proxy.initGroups(group);
return proxy;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e92f9b911..1ee2cab5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -36,14 +36,20 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedCheckedSupplier;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
+import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
@@ -57,7 +63,7 @@ import static org.apache.ratis.server.RaftServer.Division.LOG;
/**
* Common states of a raft peer. Protected by RaftServer's lock.
*/
-class ServerState {
+class ServerState implements Closeable {
private final RaftGroupMemberId memberId;
private final RaftServerImpl server;
/** Raft log */
@@ -67,7 +73,7 @@ class ServerState {
/** The thread that applies committed log entries to the state machine */
private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
/** local storage for log and snapshot */
- private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
+ private RaftStorageImpl storage;
private final SnapshotManager snapshotManager;
private volatile Timestamp lastNoLeaderTime;
private final TimeDuration noLeaderTimeout;
@@ -94,8 +100,9 @@ class ServerState {
*/
private final AtomicReference<TermIndex> latestInstalledSnapshot = new AtomicReference<>();
- ServerState(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftServerImpl server,
- RaftStorage.StartupOption option, RaftProperties prop) {
+ ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
+ RaftServerImpl server, StateMachine stateMachine)
+ throws IOException {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
Collection<RaftPeer> followerPeers = group.getPeers().stream()
@@ -110,11 +117,36 @@ class ServerState {
configurationManager = new ConfigurationManager(initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
- final String storageDirName = group.getGroupId().getUuid().toString();
- this.raftStorage = MemoizedCheckedSupplier.valueOf(
- () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
+ boolean storageFound = false;
+ List<File> directories = RaftServerConfigKeys.storageDir(prop);
+ while (!directories.isEmpty()) {
+ // use full uuid string to create a subdirectory
+ File dir = chooseStorageDir(directories, group.getGroupId().getUuid().toString());
+ try {
+ storage = (RaftStorageImpl) RaftStorage.newBuilder()
+ .setDirectory(dir)
+ .setOption(RaftStorage.StartupOption.RECOVER)
+ .setLogCorruptionPolicy(RaftServerConfigKeys.Log.corruptionPolicy(prop))
+ .setStorageFreeSpaceMin(RaftServerConfigKeys.storageFreeSpaceMin(prop))
+ .build();
+ storageFound = true;
+ break;
+ } catch (IOException e) {
+ if (e.getCause() instanceof OverlappingFileLockException) {
+ throw e;
+ }
+ LOG.warn("Failed to init RaftStorage under {} for {}: {}",
+ dir.getParent(), group.getGroupId().getUuid().toString(), e);
+ directories.removeIf(d -> d.getAbsolutePath().equals(dir.getParent()));
+ }
+ }
+
+ if (!storageFound) {
+ throw new IOException("No healthy directories found for RaftStorage among: " +
+ RaftServerConfigKeys.storageDir(prop));
+ }
- this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
+ snapshotManager = new SnapshotManager(storage, id);
// On start the leader is null, start the clock now
this.leaderId = null;
@@ -131,8 +163,7 @@ class ServerState {
}
void initialize(StateMachine stateMachine) throws IOException {
- // initialize raft storage
- final RaftStorageImpl storage = raftStorage.get();
+ storage.initialize();
// read configuration from the storage
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
@@ -149,8 +180,32 @@ class ServerState {
return memberId;
}
+ static File chooseStorageDir(List<File> volumes, String targetSubDir) throws IOException {
+ final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
+ final File[] empty = {};
+ final List<File> resultList = new ArrayList<>();
+ volumes.stream().flatMap(volume -> {
+ final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(empty);
+ numberOfStorageDirPerVolume.put(volume, dirs.length);
+ return Arrays.stream(dirs);
+ }).filter(dir -> targetSubDir.equals(dir.getName()))
+ .forEach(resultList::add);
+
+ if (resultList.size() > 1) {
+ throw new IOException("More than one directories found for " + targetSubDir + ": " + resultList);
+ }
+ if (resultList.size() == 1) {
+ return resultList.get(0);
+ }
+ return numberOfStorageDirPerVolume.entrySet().stream()
+ .min(Map.Entry.comparingByValue())
+ .map(Map.Entry::getKey)
+ .map(v -> new File(v, targetSubDir))
+ .orElseThrow(() -> new IOException("No storage directory found."));
+ }
+
void writeRaftConfiguration(LogEntryProto conf) {
- getStorage().writeRaftConfiguration(conf);
+ storage.writeRaftConfiguration(conf);
}
void start() {
@@ -159,8 +214,7 @@ class ServerState {
private RaftLog initRaftLog(LongSupplier getSnapshotIndexFromStateMachine, RaftProperties prop) {
try {
- return initRaftLog(getMemberId(), server, getStorage(), this::setRaftConf,
- getSnapshotIndexFromStateMachine, prop);
+ return initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
} catch (IOException e) {
throw new IllegalStateException(getMemberId() + ": Failed to initRaftLog.", e);
}
@@ -205,6 +259,10 @@ class ServerState {
return leaderId;
}
+ boolean hasLeader() {
+ return leaderId != null;
+ }
+
/**
* Become a candidate and start leader election
*/
@@ -376,7 +434,7 @@ class ServerState {
void updateConfiguration(List<LogEntryProto> entries) {
if (entries != null && !entries.isEmpty()) {
configurationManager.removeConfigurations(entries.get(0).getIndex());
- entries.forEach(this::setRaftConf);
+ entries.stream().forEach(this::setRaftConf);
}
}
@@ -397,45 +455,29 @@ class ServerState {
getStateMachineUpdater().reloadStateMachine();
}
- void close() {
- try {
- if (stateMachineUpdater.isInitialized()) {
- getStateMachineUpdater().stopAndJoin();
- }
- } catch (Throwable e) {
- LOG.warn(getMemberId() + ": Failed to join " + getStateMachineUpdater(), e);
- }
- LOG.info("{}: applyIndex: {}", getMemberId(), getLastAppliedIndex());
-
+ @Override
+ public void close() throws IOException {
try {
- if (log.isInitialized()) {
- getLog().close();
- }
- } catch (Throwable e) {
- LOG.warn(getMemberId() + ": Failed to close raft log " + getLog(), e);
+ getStateMachineUpdater().stopAndJoin();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("{}: Interrupted when joining stateMachineUpdater", getMemberId(), e);
}
+ LOG.info("{}: closes. applyIndex: {}", getMemberId(), getLastAppliedIndex());
- try {
- if (raftStorage.isInitialized()) {
- getStorage().close();
- }
- } catch (Throwable e) {
- LOG.warn(getMemberId() + ": Failed to close raft storage " + getStorage(), e);
- }
+ getLog().close();
+ storage.close();
}
- RaftStorageImpl getStorage() {
- if (!raftStorage.isInitialized()) {
- throw new IllegalStateException(getMemberId() + ": raftStorage is uninitialized.");
- }
- return raftStorage.getUnchecked();
+ RaftStorage getStorage() {
+ return storage;
}
void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
// TODO: verify that we need to install the snapshot
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
- snapshotManager.installSnapshot(request, sm, getStorage().getStorageDir());
+ snapshotManager.installSnapshot(sm, request);
updateInstalledSnapshotIndex(TermIndex.valueOf(request.getSnapshotChunk().getTermIndex()));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
index a86cdf56b..a7fa13b53 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectoryImpl.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.util.AtomicFileOutputStream;
import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.IOException;
@@ -52,13 +51,13 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
private final File root; // root directory
private FileLock lock; // storage lock
- private final SizeInBytes freeSpaceMin;
+ private long freeSpaceMin;
/**
* Constructor
* @param dir directory corresponding to the storage
*/
- RaftStorageDirectoryImpl(File dir, SizeInBytes freeSpaceMin) {
+ RaftStorageDirectoryImpl(File dir, long freeSpaceMin) {
this.root = dir;
this.lock = null;
this.freeSpaceMin = freeSpaceMin;
@@ -178,7 +177,7 @@ class RaftStorageDirectoryImpl implements RaftStorageDirectory {
}
private boolean hasEnoughSpace() {
- return root.getFreeSpace() >= freeSpaceMin.getSize();
+ return root.getFreeSpace() > freeSpaceMin;
}
/**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
index 56972c3f7..6513efd30 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageImpl.java
@@ -23,7 +23,6 @@ import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.SizeInBytes;
import java.io.File;
import java.io.FileNotFoundException;
@@ -35,16 +34,17 @@ import java.util.Optional;
/** The storage of a {@link org.apache.ratis.server.RaftServer}. */
public class RaftStorageImpl implements RaftStorage {
+
+ // TODO support multiple storage directories
private final RaftStorageDirectoryImpl storageDir;
private final StartupOption startupOption;
private final CorruptionPolicy logCorruptionPolicy;
private volatile StorageState state = StorageState.UNINITIALIZED;
private volatile RaftStorageMetadataFileImpl metaFile;
- RaftStorageImpl(File dir, SizeInBytes freeSpaceMin, StartupOption option, CorruptionPolicy logCorruptionPolicy) {
- LOG.debug("newRaftStorage: {}, freeSpaceMin={}, option={}, logCorruptionPolicy={}",
- dir, freeSpaceMin, option, logCorruptionPolicy);
- this.storageDir = new RaftStorageDirectoryImpl(dir, freeSpaceMin);
+ RaftStorageImpl(File dir, CorruptionPolicy logCorruptionPolicy, StartupOption option,
+ long storageFeeSpaceMin) {
+ this.storageDir = new RaftStorageDirectoryImpl(dir, storageFeeSpaceMin);
this.logCorruptionPolicy = Optional.ofNullable(logCorruptionPolicy).orElseGet(CorruptionPolicy::getDefault);
this.startupOption = option;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 17294b572..aaa62a783 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -54,17 +54,21 @@ public class SnapshotManager {
private static final String CORRUPT = ".corrupt";
private static final String TMP = ".tmp";
+ private final RaftStorage storage;
private final RaftPeerId selfId;
private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);
- SnapshotManager(RaftPeerId selfId) {
+ public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
+ this.storage = storage;
this.selfId = selfId;
}
- public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
- throws IOException {
- final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
+ public void installSnapshot(StateMachine stateMachine,
+ InstallSnapshotRequestProto request) throws IOException {
+ final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest =
+ request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
+ final RaftStorageDirectory dir = storage.getStorageDir();
// create a unique temporary directory
final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index 865e2b2b1..aeff60148 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -17,158 +17,38 @@
*/
package org.apache.ratis.server.storage;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerConfigKeys.Log;
-import org.apache.ratis.server.storage.RaftStorage.StartupOption;
-import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.apache.ratis.server.RaftServer.Division.LOG;
+import java.util.concurrent.TimeUnit;
public final class StorageImplUtils {
- private static final File[] EMPTY_FILE_ARRAY = {};
private StorageImplUtils() {
//Never constructed
}
- public static SnapshotManager newSnapshotManager(RaftPeerId id) {
- return new SnapshotManager(id);
- }
-
/** Create a {@link RaftStorageImpl}. */
- public static RaftStorageImpl newRaftStorage(File dir, SizeInBytes freeSpaceMin,
- RaftStorage.StartupOption option, Log.CorruptionPolicy logCorruptionPolicy) {
- return new RaftStorageImpl(dir, freeSpaceMin, option, logCorruptionPolicy);
- }
-
- /** @return a list of existing subdirectories matching the given storage directory name from the given volumes. */
- static List<File> getExistingStorageSubs(List<File> volumes, String targetSubDir, Map<File, Integer> dirsPerVol) {
- return volumes.stream().flatMap(volume -> {
- final File[] dirs = Optional.ofNullable(volume.listFiles()).orElse(EMPTY_FILE_ARRAY);
- Optional.ofNullable(dirsPerVol).ifPresent(map -> map.put(volume, dirs.length));
- return Arrays.stream(dirs);
- }).filter(dir -> targetSubDir.equals(dir.getName()))
- .collect(Collectors.toList());
- }
-
- /** @return a volume with the min dirs. */
- static File chooseMin(Map<File, Integer> dirsPerVol) throws IOException {
- return dirsPerVol.entrySet().stream()
- .min(Map.Entry.comparingByValue())
- .map(Map.Entry::getKey)
- .orElseThrow(() -> new IOException("No storage directory found."));
- }
-
- /**
- * Choose a {@link RaftStorage} for the given storage directory name from the given configuration properties
- * and then try to call {@link RaftStorage#initialize()}.
- * <p />
- * {@link StartupOption#FORMAT}:
- * - When there are more than one existing directories, throw an exception.
- * - When there is an existing directory, throw an exception.
- * - When there is no existing directory, try to initialize a new directory from the list specified
- * in the configuration properties until a directory succeeded or all directories failed.
- * <p />
- * {@link StartupOption#RECOVER}:
- * - When there are more than one existing directories, throw an exception.
- * - When there is an existing directory, if it fails to initialize, throw an exception but not try a new directory.
- * - When there is no existing directory, throw an exception.
- *
- * @param storageDirName the storage directory name
- * @param option the startup option
- * @param properties the configuration properties
- * @return the chosen storage, which is initialized successfully.
- */
- public static RaftStorageImpl initRaftStorage(String storageDirName, StartupOption option,
- RaftProperties properties) throws IOException {
- return new Op(storageDirName, option, properties).run();
- }
-
- private static class Op {
- private final String storageDirName;
- private final StartupOption option;
-
- private final SizeInBytes freeSpaceMin;
- private final Log.CorruptionPolicy logCorruptionPolicy;
- private final List<File> dirsInConf;
-
- private final List<File> existingSubs;
- private final Map<File, Integer> dirsPerVol = new HashMap<>();
-
- Op(String storageDirName, StartupOption option, RaftProperties properties) {
- this.storageDirName = storageDirName;
- this.option = option;
-
- this.freeSpaceMin = RaftServerConfigKeys.storageFreeSpaceMin(properties);
- this.logCorruptionPolicy = RaftServerConfigKeys.Log.corruptionPolicy(properties);
- this.dirsInConf = RaftServerConfigKeys.storageDir(properties);
-
- this.existingSubs = getExistingStorageSubs(dirsInConf, this.storageDirName, dirsPerVol);
- }
-
- RaftStorageImpl run() throws IOException {
- if (option == StartupOption.FORMAT) {
- return format();
- } else if (option == StartupOption.RECOVER) {
- return recover();
- } else {
- throw new IllegalArgumentException("Illegal option: " + option);
- }
- }
-
- private RaftStorageImpl format() throws IOException {
- if (!existingSubs.isEmpty()) {
- throw new IOException("Failed to " + option + ": One or more existing directories found " + existingSubs
- + " for " + storageDirName);
- }
-
- for (; !dirsPerVol.isEmpty(); ) {
- final File vol = chooseMin(dirsPerVol);
- final File dir = new File(vol, storageDirName);
- try {
- final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.FORMAT, logCorruptionPolicy);
- storage.initialize();
- return storage;
- } catch (Throwable e) {
- LOG.warn("Failed to initialize a new directory " + dir.getAbsolutePath(), e);
- dirsPerVol.remove(vol);
- }
- }
- throw new IOException("Failed to FORMAT a new storage dir for " + storageDirName + " from " + dirsInConf);
- }
-
- private RaftStorageImpl recover() throws IOException {
- final int size = existingSubs.size();
- if (size > 1) {
- throw new IOException("Failed to " + option + ": More than one existing directories found "
- + existingSubs + " for " + storageDirName);
- } else if (size == 0) {
- throw new IOException("Failed to " + option + ": Storage directory not found for "
- + storageDirName + " from " + dirsInConf);
- }
-
- final File dir = existingSubs.get(0);
- try {
- final RaftStorageImpl storage = newRaftStorage(dir, freeSpaceMin, StartupOption.RECOVER, logCorruptionPolicy);
- storage.initialize();
- return storage;
- } catch (Throwable e) {
- if (e instanceof IOException) {
- throw e;
- }
- throw new IOException("Failed to initialize the existing directory " + dir.getAbsolutePath(), e);
- }
+ public static RaftStorageImpl newRaftStorage(File dir, RaftServerConfigKeys.Log.CorruptionPolicy logCorruptionPolicy,
+ RaftStorage.StartupOption option, long storageFeeSpaceMin) throws IOException {
+ RaftStorage.LOG.debug("newRaftStorage: {}, {}, {}, {}",dir, logCorruptionPolicy, option, storageFeeSpaceMin);
+
+ final TimeDuration sleepTime = TimeDuration.valueOf(500, TimeUnit.MILLISECONDS);
+ final RaftStorageImpl raftStorage;
+ try {
+ // retry the construction (up to 5 attempts, 500ms apart) to ride out a transient failure
+ raftStorage = JavaUtils.attemptRepeatedly(
+ () -> new RaftStorageImpl(dir, logCorruptionPolicy, option, storageFeeSpaceMin),
+ 5, sleepTime, "new RaftStorageImpl", RaftStorage.LOG);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw IOUtils.toInterruptedIOException(
+ "Interrupted when creating RaftStorage " + dir, e);
}
+ return raftStorage;
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index dfed96952..1f4047524 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -40,7 +40,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CollectionUtils;
@@ -386,9 +385,8 @@ public abstract class MiniRaftCluster implements Closeable {
}
final RaftProperties prop = new RaftProperties(properties);
RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
- return ServerImplUtils.newRaftServer(id, group,
- format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER,
- getStateMachineRegistry(prop), prop, setPropertiesAndInitParameters(id, group, prop));
+ return ServerImplUtils.newRaftServer(id, group, getStateMachineRegistry(prop), prop,
+ setPropertiesAndInitParameters(id, group, prop));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 5e6353f92..e09ca19d1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -365,7 +365,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
// start the two new peers
LOG.info("Start new peers");
for (RaftPeer np : c1.newPeers) {
- cluster.restartServer(np.getId(), true);
+ cluster.restartServer(np.getId(), false);
}
Assert.assertTrue(client.admin().setConfiguration(c1.allPeersInNewConf).isSuccess());
}
@@ -504,7 +504,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("start new peers: {}", Arrays.asList(c1.newPeers));
for (RaftPeer np : c1.newPeers) {
- cluster.restartServer(np.getId(), true);
+ cluster.restartServer(np.getId(), false);
}
try {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
similarity index 82%
rename from ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
rename to ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
index ff38a6bd9..75aef53a1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestStorageImplUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestServerState.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.storage;
+package org.apache.ratis.server.impl;
import org.apache.ratis.BaseTest;
import org.apache.ratis.util.FileUtils;
@@ -28,9 +28,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
@@ -39,20 +37,13 @@ import java.util.stream.IntStream;
/**
* Test cases to verify ServerState.
*/
-public class TestStorageImplUtils {
+public class TestServerState {
private static final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
- JavaUtils.getClassSimpleName(TestStorageImplUtils.class) +
+ JavaUtils.getClassSimpleName(TestServerState.class) +
Integer.toHexString(ThreadLocalRandom.current().nextInt())));
- static File chooseNewStorageDir(List<File> volumes, String sub) throws IOException {
- final Map<File, Integer> numDirPerVolume = new HashMap<>();
- StorageImplUtils.getExistingStorageSubs(volumes, sub, numDirPerVolume);
- final File vol = StorageImplUtils.chooseMin(numDirPerVolume);
- return new File(vol, sub);
- }
-
@AfterClass
public static void tearDown() throws IOException {
FileUtils.deleteFully(rootTestDir.get());
@@ -69,8 +60,8 @@ public class TestStorageImplUtils {
List<File> directories = Collections.singletonList(testDir);
String subDirOne = UUID.randomUUID().toString();
String subDirTwo = UUID.randomUUID().toString();
- final File storageDirOne = chooseNewStorageDir(directories, subDirOne);
- final File storageDirTwo = chooseNewStorageDir(directories, subDirTwo);
+ File storageDirOne = ServerState.chooseStorageDir(directories, subDirOne);
+ File storageDirTwo = ServerState.chooseStorageDir(directories, subDirTwo);
File expectedOne = new File(testDir, subDirOne);
File expectedTwo = new File(testDir, subDirTwo);
Assert.assertEquals(expectedOne.getCanonicalPath(),
@@ -109,7 +100,7 @@ public class TestStorageImplUtils {
}
});
String subDir = UUID.randomUUID().toString();
- final File storageDirectory = chooseNewStorageDir(directories, subDir);
+ File storageDirectory = ServerState.chooseStorageDir(directories, subDir);
File expected = new File(directories.get(6), subDir);
Assert.assertEquals(expected.getCanonicalPath(),
storageDirectory.getCanonicalPath());
@@ -117,15 +108,19 @@ public class TestStorageImplUtils {
/**
* Tests choosing of storage directory when only no volume is configured.
+ *
+ * Expects an {@link IOException} with the message "No storage directory found."
*/
@Test
public void testChooseStorageDirWithNoVolume() {
try {
- chooseNewStorageDir(Collections.emptyList(), UUID.randomUUID().toString());
+ ServerState.chooseStorageDir(
+ Collections.emptyList(), UUID.randomUUID().toString());
Assert.fail();
} catch (IOException ex) {
String expectedErrMsg = "No storage directory found.";
Assert.assertEquals(expectedErrMsg, ex.getMessage());
}
}
+
}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 7027bd8ea..ffc8de597 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -114,7 +114,7 @@ public class TestRaftStorage extends BaseTest {
*/
@Test
public void testStorage() throws Exception {
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
try {
StorageState state = sd.analyzeStorage(true);
Assert.assertEquals(StorageState.NOT_FORMATTED, state);
@@ -171,7 +171,7 @@ public class TestRaftStorage extends BaseTest {
Assert.assertEquals(StorageState.NORMAL, storage.getState());
storage.close();
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, SizeInBytes.ZERO);
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(storageDir, 0);
File metaFile = sd.getMetaFile();
FileUtils.move(metaFile, sd.getMetaTmpFile());
@@ -286,7 +286,7 @@ public class TestRaftStorage extends BaseTest {
File mockStorageDir = Mockito.spy(storageDir);
Mockito.when(mockStorageDir.getFreeSpace()).thenReturn(100L); // 100B
- final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, SizeInBytes.valueOf("100M"));
+ final RaftStorageDirectoryImpl sd = new RaftStorageDirectoryImpl(mockStorageDir, 104857600); // 100MB
StorageState state = sd.analyzeStorage(false);
Assert.assertEquals(StorageState.NO_SPACE, state);
}