You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/16 23:38:33 UTC
[incubator-ratis] branch master updated: RATIS-1244. Move MetaFile
and RaftLog.Metadata to ratis-server-api. (#358)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 12c4303 RATIS-1244. Move MetaFile and RaftLog.Metadata to ratis-server-api. (#358)
12c4303 is described below
commit 12c43030c98dfac15d114f5da7abc51335ec0085
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 17 07:36:59 2020 +0800
RATIS-1244. Move MetaFile and RaftLog.Metadata to ratis-server-api. (#358)
---
.../java/org/apache/ratis/protocol/RaftPeerId.java | 5 +-
.../apache/ratis/util/AtomicFileOutputStream.java | 89 ++++++++------
.../java/org/apache/ratis/util/AtomicUtils.java | 29 ++++-
.../ratis/server/storage/RaftStorageMetadata.java | 82 +++++++++++++
.../server/storage/RaftStorageMetadataFile.java | 32 +++++
.../org/apache/ratis/server/impl/ServerState.java | 4 +-
.../org/apache/ratis/server/raftlog/RaftLog.java | 25 +---
.../ratis/server/raftlog/memory/MemoryRaftLog.java | 10 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 12 +-
.../org/apache/ratis/server/storage/MetaFile.java | 130 ---------------------
.../apache/ratis/server/storage/RaftStorage.java | 33 +++---
.../storage/RaftStorageMetadataFileImpl.java | 121 +++++++++++++++++++
.../ratis/server/storage/TestRaftStorage.java | 57 ++++-----
13 files changed, 371 insertions(+), 258 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
index a24c435..1328e1a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeerId.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Id of Raft Peer which is globally unique.
+ *
+ * This is a value-based class.
*/
public final class RaftPeerId {
private static final Map<ByteString, RaftPeerId> BYTE_STRING_MAP = new ConcurrentHashMap<>();
@@ -51,7 +53,6 @@ public final class RaftPeerId {
private RaftPeerId(String id) {
this.idString = Objects.requireNonNull(id, "id == null");
- Preconditions.assertTrue(!id.isEmpty(), "id is an empty string.");
this.id = ByteString.copyFrom(idString, StandardCharsets.UTF_8);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
index c2d626f..72608a6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,60 +25,72 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
-
+import java.util.concurrent.atomic.AtomicBoolean;
/**
- * A FileOutputStream that has the property that it will only show
- * up at its destination once it has been entirely written and flushed
- * to disk. While being written, it will use a .tmp suffix.
+ * A {@link FilterOutputStream} that writes to a file atomically.
+ * The output file will not show up until it has been entirely written and sync'ed to disk.
+ * It uses a temporary file when it is being written.
+ * The default temporary file has a .tmp suffix.
*
- * When the output stream is closed, it is flushed, fsynced, and
- * will be moved into place, overwriting any file that already
- * exists at that location.
+ * When the output stream is closed, it is
+ * (1) flushed
+ * (2) sync'ed, and
+ * (3) renamed/moved from the temporary file to the output file.
+ * If the output file already exists, it will be overwritten.
*
- * <b>NOTE</b>: on Windows platforms, it will not atomically
- * replace the target file - instead the target file is deleted
- * before this one is moved into place.
+ * NOTE that on Windows platforms, the output file, if it exists, is deleted
+ * before the temporary file is moved.
*/
public class AtomicFileOutputStream extends FilterOutputStream {
-
public static final String TMP_EXTENSION = ".tmp";
public static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class);
- private final File origFile;
+ private final File outFile;
private final File tmpFile;
+ private final AtomicBoolean isClosed = new AtomicBoolean();
+
+ public AtomicFileOutputStream(File outFile) throws FileNotFoundException {
+ this(outFile, new File(outFile.getParentFile(), outFile.getName() + TMP_EXTENSION));
+ }
- public AtomicFileOutputStream(File f) throws FileNotFoundException {
- // Code unfortunately must be duplicated below since we can't assign anything
- // before calling super
- super(new FileOutputStream(new File(f.getParentFile(), f.getName() + TMP_EXTENSION)));
- origFile = f.getAbsoluteFile();
- tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION).getAbsoluteFile();
+ public AtomicFileOutputStream(File outFile, File tmpFile) throws FileNotFoundException {
+ super(new FileOutputStream(tmpFile));
+ this.outFile = outFile.getAbsoluteFile();
+ this.tmpFile = tmpFile.getAbsoluteFile();
+ }
+
+ public boolean isClosed() {
+ return isClosed.get();
}
@Override
public void close() throws IOException {
- boolean triedToClose = false, success = false;
+ if (!isClosed.compareAndSet(false, true)) {
+ return;
+ }
+ boolean forced = false;
+ boolean closed = false;
try {
flush();
((FileOutputStream)out).getChannel().force(true);
+ forced = true;
- triedToClose = true;
super.close();
- success = true;
- } finally {
- if (success) {
- boolean renamed = tmpFile.renameTo(origFile);
- if (!renamed) {
- // On windows, renameTo does not replace.
- if (origFile.exists() && !origFile.delete()) {
- throw new IOException("Could not delete original file " + origFile);
- }
- FileUtils.move(tmpFile, origFile);
+ closed = true;
+
+ final boolean renamed = tmpFile.renameTo(outFile);
+ if (!renamed) {
+ // On windows, renameTo does not replace.
+ if (outFile.exists() && !outFile.delete()) {
+ throw new IOException("Could not delete original file " + outFile);
}
- } else {
- if (!triedToClose) {
+ FileUtils.move(tmpFile, outFile);
+ }
+ } finally {
+ if (!closed) {
+ if (!forced) {
// If we failed when flushing, try to close it to not leak an FD
IOUtils.cleanup(LOG, out);
}
@@ -96,14 +108,17 @@ public class AtomicFileOutputStream extends FilterOutputStream {
* in writing.
*/
public void abort() {
+ if (isClosed.get()) {
+ return;
+ }
try {
super.close();
} catch (IOException ioe) {
LOG.warn("Unable to abort file " + tmpFile, ioe);
- }
- if (!tmpFile.delete()) {
- LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+ } finally {
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+ }
}
}
-
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
index df71c35..8222702 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,10 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.util.function.CheckedFunction;
+
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Utilities related to atomic operations.
@@ -58,4 +61,28 @@ public interface AtomicUtils {
}
}
+ /**
+ * Similar to {@link AtomicReference#updateAndGet(java.util.function.UnaryOperator)}
+ * except that the update function is checked.
+ */
+ static <E, THROWABLE extends Throwable> E updateAndGet(AtomicReference<E> reference,
+ CheckedFunction<E, E, THROWABLE> update) throws THROWABLE {
+ final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
+ final E updated = reference.updateAndGet(value -> {
+ try {
+ return update.apply(value);
+ } catch (Error | RuntimeException e) {
+ throw e;
+ } catch (Throwable t) {
+ throwableRef.set(t);
+ return value;
+ }
+ });
+ @SuppressWarnings("unchecked")
+ final THROWABLE t = (THROWABLE) throwableRef.get();
+ if (t != null) {
+ throw t;
+ }
+ return updated;
+ }
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java
new file mode 100644
index 0000000..911cc41
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadata.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * The metadata for a raft storage.
+ *
+ * This is a value-based class.
+ */
+public final class RaftStorageMetadata {
+ private static final RaftStorageMetadata DEFAULT = valueOf(
+ TermIndexProto.getDefaultInstance().getTerm(), RaftPeerId.valueOf(""));
+
+ public static RaftStorageMetadata getDefault() {
+ return DEFAULT;
+ }
+
+ public static RaftStorageMetadata valueOf(long term, RaftPeerId votedFor) {
+ return new RaftStorageMetadata(term, votedFor);
+ }
+
+ private final long term;
+ private final RaftPeerId votedFor;
+
+ private RaftStorageMetadata(long term, RaftPeerId votedFor) {
+ this.term = term;
+ this.votedFor = Optional.ofNullable(votedFor).orElseGet(() -> getDefault().getVotedFor());
+ }
+
+ /** @return the term. */
+ public long getTerm() {
+ return term;
+ }
+
+ /** @return the server it voted for. */
+ public RaftPeerId getVotedFor() {
+ return votedFor;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final RaftStorageMetadata that = (RaftStorageMetadata) obj;
+ return this.term == that.term && Objects.equals(this.votedFor, that.votedFor);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(term, votedFor);
+ }
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + "{term=" + term + ", votedFor=" + votedFor + '}';
+ }
+}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java
new file mode 100644
index 0000000..c986620
--- /dev/null
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFile.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.IOException;
+
+/**
+ * Represent a file on disk which persistently stores the metadata of a raft storage.
+ * The file is updated atomically.
+ */
+public interface RaftStorageMetadataFile {
+ /** @return the metadata persisted in this file. */
+ RaftStorageMetadata getMetadata() throws IOException;
+
+ /** Persist the given metadata. */
+ void persist(RaftStorageMetadata newMetadata) throws IOException;
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 0fd65a8..806300e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -125,7 +125,7 @@ class ServerState implements Closeable {
// do not know whether the local log entries have been committed.
this.log = initRaftLog(getMemberId(), server, storage, this::setRaftConf, getSnapshotIndexFromStateMachine, prop);
- RaftLog.Metadata metadata = log.loadMetadata();
+ final RaftStorageMetadata metadata = log.loadMetadata();
currentTerm.set(metadata.getTerm());
votedFor = metadata.getVotedFor();
@@ -231,7 +231,7 @@ class ServerState implements Closeable {
}
void persistMetadata() throws IOException {
- this.log.writeMetadata(currentTerm.get(), votedFor);
+ log.writeMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
}
/**
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index a401107..1e864a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -20,12 +20,12 @@ package org.apache.ratis.server.raftlog;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.AutoCloseableLock;
@@ -437,10 +437,9 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
* in the RaftPeer's lock. Later we can use an IO task queue to enforce the
* order.
*/
- public abstract void writeMetadata(long term, RaftPeerId votedFor)
- throws IOException;
+ public abstract void writeMetadata(RaftStorageMetadata metadata) throws IOException;
- public abstract Metadata loadMetadata() throws IOException;
+ public abstract RaftStorageMetadata loadMetadata() throws IOException;
public abstract CompletableFuture<Long> syncWithSnapshot(long lastSnapshotIndex);
@@ -451,24 +450,6 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
return getName() + ":" + state + ":c" + getLastCommittedIndex();
}
- public static class Metadata {
- private final RaftPeerId votedFor;
- private final long term;
-
- public Metadata(RaftPeerId votedFor, long term) {
- this.votedFor = votedFor;
- this.term = term;
- }
-
- public RaftPeerId getVotedFor() {
- return votedFor;
- }
-
- public long getTerm() {
- return term;
- }
- }
-
public AutoCloseableLock readLock() {
return AutoCloseableLock.acquire(lock.readLock());
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index 3f19f5b..e80f16c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -19,10 +19,10 @@ package org.apache.ratis.server.raftlog.memory;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;
@@ -70,7 +70,7 @@ public class MemoryRaftLog extends RaftLog {
}
private final EntryList entries = new EntryList();
- private final AtomicReference<Metadata> metadata = new AtomicReference<>(new Metadata(null, 0));
+ private final AtomicReference<RaftStorageMetadata> metadata = new AtomicReference<>(RaftStorageMetadata.getDefault());
public MemoryRaftLog(RaftGroupMemberId memberId,
LongSupplier commitIndexSupplier,
@@ -209,12 +209,12 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public void writeMetadata(long term, RaftPeerId votedFor) {
- metadata.set(new Metadata(votedFor, term));
+ public void writeMetadata(RaftStorageMetadata newMetadata) {
+ metadata.set(newMetadata);
}
@Override
- public Metadata loadMetadata() {
+ public RaftStorageMetadata loadMetadata() {
return metadata.get();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index df3fe61..604eae5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -19,13 +19,13 @@ package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices;
@@ -462,15 +462,13 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public void writeMetadata(long term, RaftPeerId votedFor) throws IOException {
- storage.getMetaFile().set(term, votedFor != null ? votedFor.toString() : null);
+ public void writeMetadata(RaftStorageMetadata metadata) throws IOException {
+ storage.getMetaFile().persist(metadata);
}
@Override
- public Metadata loadMetadata() throws IOException {
- return new Metadata(
- RaftPeerId.getRaftPeerId(storage.getMetaFile().getVotedFor()),
- storage.getMetaFile().getTerm());
+ public RaftStorageMetadata loadMetadata() throws IOException {
+ return storage.getMetaFile().getMetadata();
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
deleted file mode 100644
index 119ccbb..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MetaFile.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.storage;
-
-import org.apache.ratis.util.AtomicFileOutputStream;
-import org.apache.ratis.util.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-/**
- * Class that represents a file on disk which persistently stores
- * a single <code>long</code> value. The file is updated atomically
- * and durably (i.e fsynced).
- */
-public class MetaFile {
- private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
- private static final String TERM_KEY = "term";
- private static final String VOTEDFOR_KEY = "votedFor";
- static final long DEFAULT_TERM = 0;
- static final String EMPTY_VOTEFOR = "";
-
- private final File file;
- private boolean loaded = false;
- private long term;
- private String votedFor;
-
- MetaFile(File file) {
- this.file = file;
- term = DEFAULT_TERM;
- votedFor = EMPTY_VOTEFOR;
- }
-
- boolean exists() {
- return this.file.exists();
- }
-
- public long getTerm() throws IOException {
- if (!loaded) {
- readFile();
- loaded = true;
- }
- return term;
- }
-
- public String getVotedFor() throws IOException {
- if (!loaded) {
- readFile();
- loaded = true;
- }
- return votedFor;
- }
-
- public void set(long newTerm, String newVotedFor) throws IOException {
- newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
- if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
- writeFile(newTerm, newVotedFor);
- }
- term = newTerm;
- votedFor = newVotedFor;
- loaded = true;
- }
-
- /**
- * Atomically write the given term and votedFor information to the given file,
- * including fsyncing.
- *
- * @throws IOException if the file cannot be written
- */
- void writeFile(long givenTerm, String votedForInfo) throws IOException {
- AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
- Properties properties = new Properties();
- properties.setProperty(TERM_KEY, Long.toString(givenTerm));
- properties.setProperty(VOTEDFOR_KEY, votedForInfo);
- try {
- properties.store(
- new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8)), "");
- fos.close();
- fos = null;
- } finally {
- if (fos != null) {
- fos.abort();
- }
- }
- }
-
- void readFile() throws IOException {
- term = DEFAULT_TERM;
- votedFor = EMPTY_VOTEFOR;
- if (file.exists()) {
- BufferedReader br = new BufferedReader(
- new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
- try {
- Properties properties = new Properties();
- properties.load(br);
- if (properties.containsKey(TERM_KEY) &&
- properties.containsKey(VOTEDFOR_KEY)) {
- term = Long.parseLong((String) properties.get(TERM_KEY));
- votedFor = (String) properties.get(VOTEDFOR_KEY);
- } else {
- throw new IOException("Corrupted term/votedFor properties: "
- + properties);
- }
- } catch(IOException e) {
- LOG.warn("Cannot load term/votedFor properties from {}", file, e);
- throw e;
- } finally {
- IOUtils.cleanup(LOG, br);
- }
- }
- }
-}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index ea2d8d9..9a5eb08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -41,14 +42,14 @@ public class RaftStorage implements Closeable {
public enum StartupOption {
/** Format the storage. */
- FORMAT;
+ FORMAT
}
// TODO support multiple storage directories
private final RaftStorageDirectory storageDir;
private final StorageState state;
private final CorruptionPolicy logCorruptionPolicy;
- private volatile MetaFile metaFile;
+ private volatile RaftStorageMetadataFileImpl metaFile;
public RaftStorage(File dir, CorruptionPolicy logCorruptionPolicy) throws IOException {
this(dir, logCorruptionPolicy, null);
@@ -85,29 +86,25 @@ public class RaftStorage implements Closeable {
private void format() throws IOException {
storageDir.clearDirectory();
- metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
- LOG.info("Storage directory " + storageDir.getRoot()
- + " has been successfully formatted.");
- }
-
- private MetaFile writeMetaFile(long term, String votedFor) throws IOException {
- MetaFile mFile = new MetaFile(storageDir.getMetaFile());
- mFile.set(term, votedFor);
- return mFile;
+ metaFile = new RaftStorageMetadataFileImpl(storageDir.getMetaFile());
+ metaFile.persist(RaftStorageMetadata.getDefault());
+ LOG.info("Storage directory " + storageDir.getRoot() + " has been successfully formatted.");
}
private void cleanMetaTmpFile() throws IOException {
Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
}
- private StorageState analyzeAndRecoverStorage(boolean toLock)
- throws IOException {
+ private StorageState analyzeAndRecoverStorage(boolean toLock) throws IOException {
StorageState storageState = storageDir.analyzeStorage(toLock);
if (storageState == StorageState.NORMAL) {
- metaFile = new MetaFile(storageDir.getMetaFile());
- Preconditions.assertTrue(metaFile.exists(),
- () -> "Meta file " + metaFile + " does not exists.");
- metaFile.readFile();
+ final File f = storageDir.getMetaFile();
+ if (!f.exists()) {
+ throw new FileNotFoundException("Metadata file " + f + " does not exists.");
+ }
+ metaFile = new RaftStorageMetadataFileImpl(f);
+ final RaftStorageMetadata metadata = metaFile.getMetadata();
+ LOG.info("Read {} from {}", metadata, f);
// Existence of raft-meta.tmp means the change of votedFor/term has not
// been committed. Thus we should delete the tmp file.
cleanMetaTmpFile();
@@ -130,7 +127,7 @@ public class RaftStorage implements Closeable {
storageDir.unlock();
}
- public MetaFile getMetaFile() {
+ public RaftStorageMetadataFile getMetaFile() {
return metaFile;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
new file mode 100644
index 0000000..7c01da7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageMetadataFileImpl.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.AtomicFileOutputStream;
+import org.apache.ratis.util.AtomicUtils;
+import org.apache.ratis.util.JavaUtils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Represent a file on disk which persistently stores the metadata of a raft storage.
+ * The file is updated atomically.
+ */
+class RaftStorageMetadataFileImpl implements RaftStorageMetadataFile {
+ private static final String TERM_KEY = "term";
+ private static final String VOTED_FOR_KEY = "votedFor";
+
+ private final File file;
+ private final AtomicReference<RaftStorageMetadata> metadata = new AtomicReference<>();
+
+ RaftStorageMetadataFileImpl(File file) {
+ this.file = file;
+ }
+
+ @Override
+ public RaftStorageMetadata getMetadata() throws IOException {
+ return AtomicUtils.updateAndGet(metadata, value -> value != null? value: load(file));
+ }
+
+ @Override
+ public void persist(RaftStorageMetadata newMetadata) throws IOException {
+ AtomicUtils.updateAndGet(metadata,
+ old -> Objects.equals(old, newMetadata)? old: atomicWrite(newMetadata, file));
+ }
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(RaftStorageMetadataFile.class) + ":" + file;
+ }
+
+ /**
+ * Atomically write the given term and votedFor information to the given file,
+ * including fsyncing.
+ *
+ * @throws IOException if the file cannot be written
+ */
+ static RaftStorageMetadata atomicWrite(RaftStorageMetadata metadata, File file) throws IOException {
+ final Properties properties = new Properties();
+ properties.setProperty(TERM_KEY, Long.toString(metadata.getTerm()));
+ properties.setProperty(VOTED_FOR_KEY, metadata.getVotedFor().toString());
+
+ try(BufferedWriter out = new BufferedWriter(
+ new OutputStreamWriter(new AtomicFileOutputStream(file), StandardCharsets.UTF_8))) {
+ properties.store(out, "");
+ }
+ return metadata;
+ }
+
+ static Object getValue(String key, Properties properties) throws IOException {
+ return Optional.ofNullable(properties.getProperty(key)).orElseThrow(
+ () -> new IOException("'" + key + "' not found in properties: " + properties));
+ }
+
+ static long getTerm(Properties properties) throws IOException {
+ try {
+ return Long.parseLong((String) getValue(TERM_KEY, properties));
+ } catch (Exception e) {
+ throw new IOException("Failed to parse '" + TERM_KEY + "' from properties: " + properties, e);
+ }
+ }
+
+ static RaftPeerId getVotedFor(Properties properties) throws IOException {
+ try {
+ return RaftPeerId.valueOf((String) getValue(VOTED_FOR_KEY, properties));
+ } catch (Exception e) {
+ throw new IOException("Failed to parse '" + VOTED_FOR_KEY + "' from properties: " + properties, e);
+ }
+ }
+
+ static RaftStorageMetadata load(File file) throws IOException {
+ if (!file.exists()) {
+ return RaftStorageMetadata.getDefault();
+ }
+ try(BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(file), StandardCharsets.UTF_8))) {
+ Properties properties = new Properties();
+ properties.load(br);
+ return RaftStorageMetadata.valueOf(getTerm(properties), getVotedFor(properties));
+ } catch (IOException e) {
+ throw new IOException("Failed to load " + file, e);
+ }
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 2816d3b..ce9e0b7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
import static org.apache.ratis.statemachine.impl.SimpleStateMachineStorage.SNAPSHOT_REGEX;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
/**
@@ -107,50 +109,37 @@ public class TestRaftStorage extends BaseTest {
storage.close();
Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false));
- File m = sd.getMetaFile();
- Assert.assertTrue(m.exists());
- MetaFile metaFile = new MetaFile(m);
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
- metaFile.set(123, "peer1");
- metaFile.readFile();
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- MetaFile metaFile2 = new MetaFile(m);
- Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
+ assertMetadataFile(sd.getMetaFile());
// test format
storage = formatRaftStorage(storageDir);
Assert.assertEquals(StorageState.NORMAL, storage.getState());
- metaFile = new MetaFile(sd.getMetaFile());
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+ final RaftStorageMetadataFile metaFile = new RaftStorageMetadataFileImpl(sd.getMetaFile());
+ Assert.assertEquals(RaftStorageMetadata.getDefault(), metaFile.getMetadata());
storage.close();
}
+ /**
+  * Assert the behavior of a metadata file:
+  * (1) the given file exists and initially yields the default metadata,
+  * (2) metadata persisted through one instance is returned by that instance, and
+  * (3) a second instance created for the same file returns the same metadata.
+  *
+  * @param m the metadata file under test; it must already exist.
+  * @throws Exception if reading or persisting the metadata fails.
+  */
+ static void assertMetadataFile(File m) throws Exception {
+   Assert.assertTrue(m.exists());
+   final RaftStorageMetadataFile metaFile = new RaftStorageMetadataFileImpl(m);
+   Assert.assertEquals(RaftStorageMetadata.getDefault(), metaFile.getMetadata());
+
+   final RaftPeerId peer1 = RaftPeerId.valueOf("peer1");
+   final RaftStorageMetadata metadata = RaftStorageMetadata.valueOf(123, peer1);
+   metaFile.persist(metadata);
+   // assertEquals takes (expected, actual); keep this order so that failure messages are correct.
+   Assert.assertEquals(123, metadata.getTerm());
+   Assert.assertEquals(peer1, metadata.getVotedFor());
+   Assert.assertEquals(metadata, metaFile.getMetadata());
+
+   final RaftStorageMetadataFile metaFile2 = new RaftStorageMetadataFileImpl(m);
+   // The internal "metadata" reference of a new instance must still be empty before getMetadata() is called.
+   Assert.assertNull(((AtomicReference<?>) Whitebox.getInternalState(metaFile2, "metadata")).get());
+   Assert.assertEquals(metadata, metaFile2.getMetadata());
+ }
+
@Test
public void testMetaFile() throws Exception {
RaftStorage storage = formatRaftStorage(storageDir);
- File m = storage.getStorageDir().getMetaFile();
- Assert.assertTrue(m.exists());
- MetaFile metaFile = new MetaFile(m);
- Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
- Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
-
- metaFile.set(123, "peer1");
- metaFile.readFile();
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
- MetaFile metaFile2 = new MetaFile(m);
- Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, "loaded"));
- Assert.assertEquals(123, metaFile.getTerm());
- Assert.assertEquals("peer1", metaFile.getVotedFor());
-
+ assertMetadataFile(storage.getStorageDir().getMetaFile()); // shared checks: default metadata, persist (123, peer1), re-read via a second instance
storage.close();
}