You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:22 UTC
[37/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preperation for Apache Incubation - Moved all java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
deleted file mode 100644
index 4b8c442..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java
+++ /dev/null
@@ -1,813 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.RaftServer;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.RaftServerRpc;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import static org.apache.raft.util.LifeCycle.State.*;
-
-public class RaftServerImpl implements RaftServer {
- public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class);
-
- private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName();
- static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
- static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
- static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
-
-
- /** Role of raft peer */
- enum Role {
- LEADER, CANDIDATE, FOLLOWER
- }
-
- private final int minTimeoutMs;
- private final int maxTimeoutMs;
-
- private final LifeCycle lifeCycle;
- private final ServerState state;
- private final StateMachine stateMachine;
- private final RaftProperties properties;
- private volatile Role role;
-
- /** used when the peer is follower, to monitor election timeout */
- private volatile FollowerState heartbeatMonitor;
-
- /** used when the peer is candidate, to request votes from other peers */
- private volatile LeaderElection electionDaemon;
-
- /** used when the peer is leader */
- private volatile LeaderState leaderState;
-
- private RaftServerRpc serverRpc;
-
- private final LogAppenderFactory appenderFactory;
-
- public RaftServerImpl(String id, RaftConfiguration raftConf,
- RaftProperties properties, StateMachine stateMachine) throws IOException {
- this.lifeCycle = new LifeCycle(id);
- minTimeoutMs = properties.getInt(
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
- maxTimeoutMs = properties.getInt(
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
- Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
- "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
- this.properties = properties;
- this.stateMachine = stateMachine;
- this.state = new ServerState(id, raftConf, properties, this, stateMachine);
- appenderFactory = initAppenderFactory();
- }
-
- int getMinTimeoutMs() {
- return minTimeoutMs;
- }
-
- int getMaxTimeoutMs() {
- return maxTimeoutMs;
- }
-
- int getRandomTimeoutMs() {
- return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
- }
-
- @Override
- public StateMachine getStateMachine() {
- return this.stateMachine;
- }
-
- public LogAppenderFactory getLogAppenderFactory() {
- return appenderFactory;
- }
-
- private LogAppenderFactory initAppenderFactory() {
- Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
- LogAppenderFactory.class);
- return RaftUtils.newInstance(factoryClass);
- }
-
- /**
- * Used by tests to set initial raft configuration with correct port bindings.
- */
- @VisibleForTesting
- public void setInitialConf(RaftConfiguration conf) {
- this.state.setInitialConf(conf);
- }
-
- @Override
- public void setServerRpc(RaftServerRpc serverRpc) {
- this.serverRpc = serverRpc;
- // add peers into rpc service
- RaftConfiguration conf = getRaftConf();
- if (conf != null) {
- serverRpc.addPeers(conf.getPeers());
- }
- }
-
- public RaftServerRpc getServerRpc() {
- return serverRpc;
- }
-
- @Override
- public void start() {
- lifeCycle.transition(STARTING);
- state.start();
- RaftConfiguration conf = getRaftConf();
- if (conf != null && conf.contains(getId())) {
- LOG.debug("{} starts as a follower", getId());
- startAsFollower();
- } else {
- LOG.debug("{} starts with initializing state", getId());
- startInitializing();
- }
- }
-
- /**
- * The peer belongs to the current configuration, should start as a follower
- */
- private void startAsFollower() {
- role = Role.FOLLOWER;
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
-
- serverRpc.start();
- lifeCycle.transition(RUNNING);
- }
-
- /**
- * The peer does not have any configuration (maybe it will later be included
- * in some configuration). Start still as a follower but will not vote or
- * start election.
- */
- private void startInitializing() {
- role = Role.FOLLOWER;
- // do not start heartbeatMonitoring
- serverRpc.start();
- }
-
- public ServerState getState() {
- return this.state;
- }
-
- @Override
- public String getId() {
- return getState().getSelfId();
- }
-
- RaftConfiguration getRaftConf() {
- return getState().getRaftConf();
- }
-
- @Override
- public void close() {
- lifeCycle.checkStateAndClose(() -> {
- try {
- shutdownHeartbeatMonitor();
- shutdownElectionDaemon();
- shutdownLeaderState();
-
- serverRpc.close();
- state.close();
- } catch (Exception ignored) {
- LOG.warn("Failed to kill " + state.getSelfId(), ignored);
- }
- });
- }
-
- @VisibleForTesting
- public boolean isAlive() {
- return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
- }
-
- public boolean isFollower() {
- return role == Role.FOLLOWER;
- }
-
- public boolean isCandidate() {
- return role == Role.CANDIDATE;
- }
-
- public boolean isLeader() {
- return role == Role.LEADER;
- }
-
- /**
- * Change the server state to Follower if necessary
- * @param newTerm The new term.
- * @param sync We will call {@link ServerState#persistMetadata()} if this is
- * set to true and term/votedFor get updated.
- * @return if the term/votedFor should be updated to the new term
- * @throws IOException if term/votedFor persistence failed.
- */
- synchronized boolean changeToFollower(long newTerm, boolean sync)
- throws IOException {
- final Role old = role;
- role = Role.FOLLOWER;
-
- boolean metadataUpdated = false;
- if (newTerm > state.getCurrentTerm()) {
- state.setCurrentTerm(newTerm);
- state.resetLeaderAndVotedFor();
- metadataUpdated = true;
- }
-
- if (old == Role.LEADER) {
- assert leaderState != null;
- shutdownLeaderState();
- } else if (old == Role.CANDIDATE) {
- shutdownElectionDaemon();
- }
-
- if (old != Role.FOLLOWER) {
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
- }
-
- if (metadataUpdated && sync) {
- state.persistMetadata();
- }
- return metadataUpdated;
- }
-
- private synchronized void shutdownLeaderState() {
- final LeaderState leader = leaderState;
- if (leader != null) {
- leader.stop();
- }
- leaderState = null;
- // TODO: make sure that StateMachineUpdater has applied all transactions that have context
- }
-
- private void shutdownElectionDaemon() {
- final LeaderElection election = electionDaemon;
- if (election != null) {
- election.stopRunning();
- // no need to interrupt the election thread
- }
- electionDaemon = null;
- }
-
- synchronized void changeToLeader() {
- Preconditions.checkState(isCandidate());
- shutdownElectionDaemon();
- role = Role.LEADER;
- state.becomeLeader();
- // start sending AppendEntries RPC to followers
- leaderState = new LeaderState(this, properties);
- leaderState.start();
- }
-
- private void shutdownHeartbeatMonitor() {
- final FollowerState hm = heartbeatMonitor;
- if (hm != null) {
- hm.stopRunning();
- hm.interrupt();
- }
- heartbeatMonitor = null;
- }
-
- synchronized void changeToCandidate() {
- Preconditions.checkState(isFollower());
- shutdownHeartbeatMonitor();
- role = Role.CANDIDATE;
- // start election
- electionDaemon = new LeaderElection(this);
- electionDaemon.start();
- }
-
- @Override
- public String toString() {
- return role + " " + state + " " + lifeCycle.getCurrentState();
- }
-
- /**
- * @return null if the server is in leader state.
- */
- private CompletableFuture<RaftClientReply> checkLeaderState(
- RaftClientRequest request) {
- if (!isLeader()) {
- NotLeaderException exception = generateNotLeaderException();
- CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
- future.complete(new RaftClientReply(request, exception));
- return future;
- }
- return null;
- }
-
- NotLeaderException generateNotLeaderException() {
- if (lifeCycle.getCurrentState() != RUNNING) {
- return new NotLeaderException(getId(), null, null);
- }
- String leaderId = state.getLeaderId();
- if (leaderId == null || leaderId.equals(state.getSelfId())) {
- // No idea about who is the current leader. Or the peer is the current
- // leader, but it is about to step down
- RaftPeer suggestedLeader = state.getRaftConf()
- .getRandomPeer(state.getSelfId());
- leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
- }
- RaftConfiguration conf = getRaftConf();
- Collection<RaftPeer> peers = conf.getPeers();
- return new NotLeaderException(getId(), conf.getPeer(leaderId),
- peers.toArray(new RaftPeer[peers.size()]));
- }
-
- /**
- * Handle a normal update request from client.
- */
- private CompletableFuture<RaftClientReply> appendTransaction(
- RaftClientRequest request, TransactionContext entry)
- throws RaftException {
- LOG.debug("{}: receive client request({})", getId(), request);
- lifeCycle.assertCurrentState(RUNNING);
- CompletableFuture<RaftClientReply> reply;
-
- final PendingRequest pending;
- synchronized (this) {
- reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- // append the message to its local log
- final long entryIndex;
- try {
- entryIndex = state.applyLog(entry);
- } catch (IOException e) {
- throw new RaftException(e);
- }
-
- // put the request into the pending queue
- pending = leaderState.addPendingRequest(entryIndex, request, entry);
- leaderState.notifySenders();
- }
- return pending.getFuture();
- }
-
- @Override
- public CompletableFuture<RaftClientReply> submitClientRequestAsync(
- RaftClientRequest request) throws IOException {
- // first check the server's leader state
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- // let the state machine handle read-only request from client
- if (request.isReadOnly()) {
- // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
- // section 8 (last part)
- return stateMachine.query(request);
- }
-
- // TODO: this client request will not be added to pending requests
- // until later which means that any failure in between will leave partial state in the
- // state machine. We should call cancelTransaction() for failed requests
- TransactionContext entry = stateMachine.startTransaction(request);
- if (entry.getException().isPresent()) {
- throw RaftUtils.asIOException(entry.getException().get());
- }
-
- return appendTransaction(request, entry);
- }
-
- @Override
- public RaftClientReply submitClientRequest(RaftClientRequest request)
- throws IOException {
- return waitForReply(getId(), request, submitClientRequestAsync(request));
- }
-
- private static RaftClientReply waitForReply(String id, RaftClientRequest request,
- CompletableFuture<RaftClientReply> future) throws IOException {
- try {
- return future.get();
- } catch (InterruptedException e) {
- final String s = id + ": Interrupted when waiting for reply, request=" + request;
- LOG.info(s, e);
- throw RaftUtils.toInterruptedIOException(s, e);
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- if (cause == null) {
- throw new IOException(e);
- }
- if (cause instanceof NotLeaderException) {
- return new RaftClientReply(request, (NotLeaderException)cause);
- } else {
- throw RaftUtils.asIOException(cause);
- }
- }
- }
-
- @Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request)
- throws IOException {
- return waitForReply(getId(), request, setConfigurationAsync(request));
- }
-
- /**
- * Handle a raft configuration change request from client.
- */
- @Override
- public CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException {
- LOG.debug("{}: receive setConfiguration({})", getId(), request);
- lifeCycle.assertCurrentState(RUNNING);
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
- final PendingRequest pending;
- synchronized (this) {
- reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- final RaftConfiguration current = getRaftConf();
- // make sure there is no other raft reconfiguration in progress
- if (!current.isStable() || leaderState.inStagingState() ||
- !state.isCurrentConfCommitted()) {
- throw new ReconfigurationInProgressException(
- "Reconfiguration is already in progress: " + current);
- }
-
- // return true if the new configuration is the same with the current one
- if (current.hasNoChange(peersInNewConf)) {
- pending = leaderState.returnNoConfChange(request);
- return pending.getFuture();
- }
-
- // add new peers into the rpc service
- getServerRpc().addPeers(Arrays.asList(peersInNewConf));
- // add staging state into the leaderState
- pending = leaderState.startSetConfiguration(request);
- }
- return pending.getFuture();
- }
-
- private boolean shouldWithholdVotes() {
- return isLeader() || (isFollower() && state.hasLeader()
- && heartbeatMonitor.shouldWithholdVotes());
- }
-
- /**
- * check if the remote peer is not included in the current conf
- * and should shutdown. should shutdown if all the following stands:
- * 1. this is a leader
- * 2. current conf is stable and has been committed
- * 3. candidate id is not included in conf
- * 4. candidate's last entry's index < conf's index
- */
- private boolean shouldSendShutdown(String candidateId,
- TermIndex candidateLastEntry) {
- return isLeader()
- && getRaftConf().isStable()
- && getState().isConfCommitted()
- && !getRaftConf().containsInConf(candidateId)
- && candidateLastEntry.getIndex() < getRaftConf().getLogEntryIndex()
- && !leaderState.isBootStrappingPeer(candidateId);
- }
-
- @Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
- throws IOException {
- final String candidateId = r.getServerRequest().getRequestorId();
- return requestVote(candidateId, r.getCandidateTerm(),
- ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
- }
-
- private RequestVoteReplyProto requestVote(String candidateId,
- long candidateTerm, TermIndex candidateLastEntry) throws IOException {
- CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
- candidateId, candidateTerm, candidateLastEntry);
- LOG.debug("{}: receive requestVote({}, {}, {})",
- getId(), candidateId, candidateTerm, candidateLastEntry);
- lifeCycle.assertCurrentState(RUNNING);
-
- boolean voteGranted = false;
- boolean shouldShutdown = false;
- final RequestVoteReplyProto reply;
- synchronized (this) {
- if (shouldWithholdVotes()) {
- LOG.info("{} Withhold vote from server {} with term {}. " +
- "This server:{}, last rpc time from leader {} is {}", getId(),
- candidateId, candidateTerm, this, this.getState().getLeaderId(),
- (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
- } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
- boolean termUpdated = changeToFollower(candidateTerm, false);
- // see Section 5.4.1 Election restriction
- if (state.isLogUpToDate(candidateLastEntry)) {
- heartbeatMonitor.updateLastRpcTime(false);
- state.grantVote(candidateId);
- voteGranted = true;
- }
- if (termUpdated || voteGranted) {
- state.persistMetadata(); // sync metafile
- }
- }
- if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
- shouldShutdown = true;
- }
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
- voteGranted, state.getCurrentTerm(), shouldShutdown);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} replies to vote request: {}. Peer's state: {}",
- getId(), ProtoUtils.toString(reply), state);
- }
- }
- return reply;
- }
-
- private void validateEntries(long expectedTerm, TermIndex previous,
- LogEntryProto... entries) {
- if (entries != null && entries.length > 0) {
- final long index0 = entries[0].getIndex();
-
- if (previous == null || previous.getTerm() == 0) {
- Preconditions.checkArgument(index0 == 0,
- "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
- 0, index0);
- } else {
- Preconditions.checkArgument(previous.getIndex() == index0 - 1,
- "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
- previous, 0, index0);
- }
-
- for (int i = 0; i < entries.length; i++) {
- final long t = entries[i].getTerm();
- Preconditions.checkArgument(expectedTerm >= t,
- "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
- i, t, expectedTerm);
-
- final long indexi = entries[i].getIndex();
- Preconditions.checkArgument(indexi == index0 + i,
- "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
- i, indexi, index0);
- }
- }
- }
-
- @Override
- public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
- throws IOException {
- // TODO avoid converting list to array
- final LogEntryProto[] entries = r.getEntriesList()
- .toArray(new LogEntryProto[r.getEntriesCount()]);
- final TermIndex previous = r.hasPreviousLog() ?
- ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
- return appendEntries(r.getServerRequest().getRequestorId(),
- r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
- entries);
- }
-
- private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm,
- TermIndex previous, long leaderCommit, boolean initializing,
- LogEntryProto... entries) throws IOException {
- CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
- leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
- leaderId, leaderTerm, previous, leaderCommit, initializing,
- ServerProtoUtils.toString(entries));
- }
- lifeCycle.assertCurrentState(STARTING, RUNNING);
-
- try {
- validateEntries(leaderTerm, previous, entries);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- }
-
- final long currentTerm;
- long nextIndex = state.getLog().getNextIndex();
- synchronized (this) {
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
- currentTerm = state.getCurrentTerm();
- if (!recognized) {
- final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: do not recognize leader. Reply: {}",
- getId(), ProtoUtils.toString(reply));
- }
- return reply;
- }
- changeToFollower(leaderTerm, true);
- state.setLeader(leaderId);
-
- if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
- }
- if (lifeCycle.getCurrentState() == RUNNING) {
- heartbeatMonitor.updateLastRpcTime(true);
- }
-
- // We need to check if "previous" is in the local peer. Note that it is
- // possible that "previous" is covered by the latest snapshot: e.g.,
- // it's possible there's no log entries outside of the latest snapshot.
- // However, it is not possible that "previous" index is smaller than the
- // last index included in snapshot. This is because indices <= snapshot's
- // last index should have been committed.
- if (previous != null && !containPrevious(previous)) {
- final AppendEntriesReplyProto reply =
- ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
- currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY);
- LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
- getId(), previous, ServerProtoUtils.toString(reply));
- return reply;
- }
-
- state.getLog().append(entries);
- state.updateConfiguration(entries);
- state.updateStatemachine(leaderCommit, currentTerm);
- }
- if (entries != null && entries.length > 0) {
- try {
- state.getLog().logSync();
- } catch (InterruptedException e) {
- throw new InterruptedIOException("logSync got interrupted");
- }
- nextIndex = entries[entries.length - 1].getIndex() + 1;
- }
- synchronized (this) {
- if (lifeCycle.getCurrentState() == RUNNING && isFollower()
- && getState().getCurrentTerm() == currentTerm) {
- // reset election timer to avoid punishing the leader for our own
- // long disk writes
- heartbeatMonitor.updateLastRpcTime(false);
- }
- }
- final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), currentTerm, nextIndex, SUCCESS);
- LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
- ServerProtoUtils.toString(reply));
- return reply;
- }
-
- private boolean containPrevious(TermIndex previous) {
- LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
- getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot());
- return state.getLog().contains(previous)
- || (state.getLatestSnapshot() != null
- && state.getLatestSnapshot().getTermIndex().equals(previous))
- || (state.getLatestInstalledSnapshot() != null)
- && state.getLatestInstalledSnapshot().equals(previous);
- }
-
- @Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- final String leaderId = request.getServerRequest().getRequestorId();
- CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request);
- LOG.debug("{}: receive installSnapshot({})", getId(), request);
-
- lifeCycle.assertCurrentState(STARTING, RUNNING);
-
- final long currentTerm;
- final long leaderTerm = request.getLeaderTerm();
- final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
- request.getTermIndex());
- final long lastIncludedIndex = lastTermIndex.getIndex();
- synchronized (this) {
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
- currentTerm = state.getCurrentTerm();
- if (!recognized) {
- final InstallSnapshotReplyProto reply = ServerProtoUtils
- .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
- request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
- LOG.debug("{}: do not recognize leader for installing snapshot." +
- " Reply: {}", getId(), reply);
- return reply;
- }
- changeToFollower(leaderTerm, true);
- state.setLeader(leaderId);
-
- if (lifeCycle.getCurrentState() == RUNNING) {
- heartbeatMonitor.updateLastRpcTime(true);
- }
-
- // Check and append the snapshot chunk. We simply put this in lock
- // considering a follower peer requiring a snapshot installation does not
- // have a lot of requests
- Preconditions.checkState(
- state.getLog().getNextIndex() <= lastIncludedIndex,
- "%s log's next id is %s, last included index in snapshot is %s",
- getId(), state.getLog().getNextIndex(), lastIncludedIndex);
-
- //TODO: We should only update State with installed snapshot once the request is done.
- state.installSnapshot(request);
-
- // update the committed index
- // re-load the state machine if this is the last chunk
- if (request.getDone()) {
- state.reloadStateMachine(lastIncludedIndex, leaderTerm);
- }
- if (lifeCycle.getCurrentState() == RUNNING) {
- heartbeatMonitor.updateLastRpcTime(false);
- }
- }
- if (request.getDone()) {
- LOG.info("{}: successfully install the whole snapshot-{}", getId(),
- lastIncludedIndex);
- }
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
- currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
- }
-
- AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
- String targetId, TermIndex previous, List<LogEntryProto> entries,
- boolean initializing) {
- return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
- leaderTerm, entries, state.getLog().getLastCommittedIndex(),
- initializing, previous);
- }
-
- synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
- String targetId, String requestId, int requestIndex, SnapshotInfo snapshot,
- List<FileChunkProto> chunks, boolean done) {
- OptionalLong totalSize = snapshot.getFiles().stream()
- .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
- assert totalSize.isPresent();
- return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
- requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
- chunks, totalSize.getAsLong(), done);
- }
-
- synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId,
- long term, TermIndex lastEntry) {
- return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
- lastEntry);
- }
-
- public synchronized void submitLocalSyncEvent() {
- if (isLeader() && leaderState != null) {
- leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
- }
- }
-
- synchronized void replyPendingRequest(long logIndex,
- CompletableFuture<Message> message) {
- if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logIndex, message);
- }
- }
-
- TransactionContext getTransactionContext(long index) {
- if (leaderState != null) { // is leader and is running
- return leaderState.getTransactionContext(index);
- }
- return null;
- }
-
- public RaftProperties getProperties() {
- return this.properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
deleted file mode 100644
index e30b979..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerProtoUtils.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import org.apache.raft.client.impl.ClientProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.util.ProtoUtils;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.raft.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
-
-/** Server proto utilities for internal use. */
-public class ServerProtoUtils {
- public static TermIndex toTermIndex(TermIndexProto p) {
- return p == null? null: TermIndex.newTermIndex(p.getTerm(), p.getIndex());
- }
-
- public static TermIndexProto toTermIndexProto(TermIndex ti) {
- return ti == null? null: TermIndexProto.newBuilder()
- .setTerm(ti.getTerm())
- .setIndex(ti.getIndex())
- .build();
- }
-
- public static TermIndex toTermIndex(LogEntryProto entry) {
- return entry == null ? null :
- TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
- }
-
- public static String toString(LogEntryProto... entries) {
- return entries == null? "null"
- : entries.length == 0 ? "[]"
- : entries.length == 1? "" + toTermIndex(entries[0])
- : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex)
- .collect(Collectors.toList());
- }
-
- public static String toString(AppendEntriesReplyProto reply) {
- return toString(reply.getServerReply()) + "," + reply.getResult()
- + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm();
- }
-
- private static String toString(RaftRpcReplyProto reply) {
- return reply.getRequestorId() + "->" + reply.getReplyId() + ","
- + reply.getSuccess();
- }
-
- public static RaftConfigurationProto toRaftConfigurationProto(
- RaftConfiguration conf) {
- return RaftConfigurationProto.newBuilder()
- .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf()))
- .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf()))
- .build();
- }
-
- public static RaftConfiguration toRaftConfiguration(
- long index, RaftConfigurationProto proto) {
- final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
- .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
- .setLogEntryIndex(index);
- if (proto.getOldPeersCount() > 0) {
- b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
- }
- return b.build();
- }
-
- public static LogEntryProto toLogEntryProto(
- RaftConfiguration conf, long term, long index) {
- return LogEntryProto.newBuilder()
- .setTerm(term)
- .setIndex(index)
- .setConfigurationEntry(toRaftConfigurationProto(conf))
- .build();
- }
-
- public static RequestVoteReplyProto toRequestVoteReplyProto(
- String requestorId, String replyId, boolean success, long term,
- boolean shouldShutdown) {
- final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
- b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, replyId,
- DEFAULT_SEQNUM, success))
- .setTerm(term)
- .setShouldShutdown(shouldShutdown);
- return b.build();
- }
-
- public static RequestVoteRequestProto toRequestVoteRequestProto(
- String requestorId, String replyId, long term, TermIndex lastEntry) {
- final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
- .setServerRequest(
- ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
- .setCandidateTerm(term);
- if (lastEntry != null) {
- b.setCandidateLastEntry(toTermIndexProto(lastEntry));
- }
- return b.build();
- }
-
- public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
- String requestorId, String replyId, long term, int requestIndex,
- InstallSnapshotResult result) {
- final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
- replyId, DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS);
- final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
- .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
- .setRequestIndex(requestIndex);
- return builder.build();
- }
-
- public static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
- String requestorId, String replyId, String requestId, int requestIndex,
- long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
- long totalSize, boolean done) {
- return InstallSnapshotRequestProto.newBuilder()
- .setServerRequest(
- ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
- .setRequestId(requestId)
- .setRequestIndex(requestIndex)
- // .setRaftConfiguration() TODO: save and pass RaftConfiguration
- .setLeaderTerm(term)
- .setTermIndex(toTermIndexProto(lastTermIndex))
- .addAllFileChunks(chunks)
- .setTotalSize(totalSize)
- .setDone(done).build();
- }
-
- public static AppendEntriesReplyProto toAppendEntriesReplyProto(
- String requestorId, String replyId, long term,
- long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
- RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
- replyId, DEFAULT_SEQNUM, appendResult == SUCCESS);
- final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
- b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
- .setResult(appendResult);
- return b.build();
- }
-
- public static AppendEntriesRequestProto toAppendEntriesRequestProto(
- String requestorId, String replyId, long leaderTerm,
- List<LogEntryProto> entries, long leaderCommit, boolean initializing,
- TermIndex previous) {
- final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
- .newBuilder()
- .setServerRequest(
- ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId, DEFAULT_SEQNUM))
- .setLeaderTerm(leaderTerm)
- .setLeaderCommit(leaderCommit)
- .setInitializing(initializing);
- if (entries != null && !entries.isEmpty()) {
- b.addAllEntries(entries);
- }
-
- if (previous != null) {
- b.setPreviousLog(toTermIndexProto(previous));
- }
- return b.build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
deleted file mode 100644
index 8611101..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerState.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.*;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.ProtoUtils;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY;
-
-/**
- * Common states of a raft peer. Protected by RaftServer's lock.
- */
-public class ServerState implements Closeable {
- private final String selfId;
- private final RaftServerImpl server;
- /** Raft log */
- private final RaftLog log;
- /** Raft configuration */
- private final ConfigurationManager configurationManager;
- /** The thread that applies committed log entries to the state machine */
- private final StateMachineUpdater stateMachineUpdater;
- /** local storage for log and snapshot */
- private final RaftStorage storage;
- private final SnapshotManager snapshotManager;
-
- /**
- * Latest term server has seen. initialized to 0 on first boot, increases
- * monotonically.
- */
- private long currentTerm;
- /**
- * The server ID of the leader for this term. Null means either there is
- * no leader for this term yet or this server does not know who it is yet.
- */
- private String leaderId;
- /**
- * Candidate that this peer granted vote for in current term (or null if none)
- */
- private String votedFor;
-
- /**
- * Latest installed snapshot for this server. This maybe different than StateMachine's latest
- * snapshot. Once we successfully install a snapshot, the SM may not pick it up immediately.
- * Further, this will not get updated when SM does snapshots itself.
- */
- private TermIndex latestInstalledSnapshot;
-
- ServerState(String id, RaftConfiguration conf, RaftProperties prop,
- RaftServerImpl server, StateMachine stateMachine) throws IOException {
- this.selfId = id;
- this.server = server;
- configurationManager = new ConfigurationManager(conf);
- storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
- snapshotManager = new SnapshotManager(storage, id);
-
- long lastApplied = initStatemachine(stateMachine, prop);
-
- leaderId = null;
- log = initLog(id, prop, server, lastApplied);
- RaftLog.Metadata metadata = log.loadMetadata();
- currentTerm = metadata.getTerm();
- votedFor = metadata.getVotedFor();
-
- stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
- lastApplied, prop);
- }
-
- /**
- * Used by tests to set initial raft configuration with correct port bindings.
- */
- @VisibleForTesting
- public void setInitialConf(RaftConfiguration initialConf) {
- configurationManager.setInitialConf(initialConf);
- }
-
- private long initStatemachine(StateMachine sm, RaftProperties properties)
- throws IOException {
- sm.initialize(selfId, properties, storage);
- storage.setStateMachineStorage(sm.getStateMachineStorage());
- SnapshotInfo snapshot = sm.getLatestSnapshot();
-
- if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
- return RaftServerConstants.INVALID_LOG_INDEX;
- }
-
- // get the raft configuration from the snapshot
- RaftConfiguration raftConf = sm.getRaftConfiguration();
- if (raftConf != null) {
- configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
- raftConf);
- }
- return snapshot.getIndex();
- }
-
- void start() {
- stateMachineUpdater.start();
- }
-
- /**
- * note we do not apply log entries to the state machine here since we do not
- * know whether they have been committed.
- */
- private RaftLog initLog(String id, RaftProperties prop, RaftServerImpl server,
- long lastIndexInSnapshot) throws IOException {
- final RaftLog log;
- if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
- RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) {
- log = new MemoryRaftLog(id);
- } else {
- log = new SegmentedRaftLog(id, server, this.storage,
- lastIndexInSnapshot, prop);
- }
- log.open(configurationManager, lastIndexInSnapshot);
- return log;
- }
-
- public RaftConfiguration getRaftConf() {
- return configurationManager.getCurrent();
- }
-
- @VisibleForTesting
-
- public String getSelfId() {
- return this.selfId;
- }
-
- public long getCurrentTerm() {
- return currentTerm;
- }
-
- void setCurrentTerm(long term) {
- currentTerm = term;
- }
-
- String getLeaderId() {
- return leaderId;
- }
-
- boolean hasLeader() {
- return leaderId != null;
- }
-
- /**
- * Become a candidate and start leader election
- */
- long initElection() {
- votedFor = selfId;
- leaderId = null;
- return ++currentTerm;
- }
-
- void persistMetadata() throws IOException {
- this.log.writeMetadata(currentTerm, votedFor);
- }
-
- void resetLeaderAndVotedFor() {
- votedFor = null;
- leaderId = null;
- }
-
- /**
- * Vote for a candidate and update the local state.
- */
- void grantVote(String candidateId) {
- votedFor = candidateId;
- leaderId = null;
- }
-
- void setLeader(String leaderId) {
- this.leaderId = leaderId;
- }
-
- void becomeLeader() {
- leaderId = selfId;
- }
-
- public RaftLog getLog() {
- return log;
- }
-
- long applyLog(TransactionContext operation) throws IOException {
- return log.append(currentTerm, operation);
- }
-
- /**
- * Check if accept the leader selfId and term from the incoming AppendEntries rpc.
- * If accept, update the current state.
- * @return true if the check passes
- */
- boolean recognizeLeader(String leaderId, long leaderTerm) {
- if (leaderTerm < currentTerm) {
- return false;
- } else if (leaderTerm > currentTerm || this.leaderId == null) {
- // If the request indicates a term that is greater than the current term
- // or no leader has been set for the current term, make sure to update
- // leader and term later
- return true;
- }
- Preconditions.checkArgument(this.leaderId.equals(leaderId),
- "selfId:%s, this.leaderId:%s, received leaderId:%s",
- selfId, this.leaderId, leaderId);
- return true;
- }
-
- /**
- * Check if the candidate's term is acceptable
- */
- boolean recognizeCandidate(String candidateId,
- long candidateTerm) {
- if (candidateTerm > currentTerm) {
- return true;
- } else if (candidateTerm == currentTerm) {
- // has not voted yet or this is a retry
- return votedFor == null || votedFor.equals(candidateId);
- }
- return false;
- }
-
- boolean isLogUpToDate(TermIndex candidateLastEntry) {
- LogEntryProto lastEntry = log.getLastEntry();
- // need to take into account snapshot
- SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
- if (lastEntry == null && snapshot == null) {
- return true;
- } else if (candidateLastEntry == null) {
- return false;
- }
- TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
- if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) {
- local = snapshot.getTermIndex();
- }
- return local.compareTo(candidateLastEntry) <= 0;
- }
-
- @Override
- public String toString() {
- return selfId + ":t" + currentTerm + ", leader=" + leaderId
- + ", voted=" + votedFor + ", raftlog=" + log + ", conf=" + getRaftConf();
- }
-
- boolean isConfCommitted() {
- return getLog().getLastCommittedIndex() >=
- getRaftConf().getLogEntryIndex();
- }
-
- public void setRaftConf(long logIndex, RaftConfiguration conf) {
- configurationManager.addConfiguration(logIndex, conf);
- RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
- getSelfId(), conf);
- }
-
- void updateConfiguration(LogEntryProto[] entries) {
- if (entries != null && entries.length > 0) {
- configurationManager.removeConfigurations(entries[0].getIndex());
- for (LogEntryProto entry : entries) {
- if (ProtoUtils.isConfigurationLogEntry(entry)) {
- final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
- entry.getIndex(), entry.getConfigurationEntry());
- configurationManager.addConfiguration(entry.getIndex(), conf);
- server.getServerRpc().addPeers(conf.getPeers());
- }
- }
- }
- }
-
- void updateStatemachine(long majorityIndex, long currentTerm) {
- log.updateLastCommitted(majorityIndex, currentTerm);
- stateMachineUpdater.notifyUpdater();
- }
-
- void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
- throws IOException {
- log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
-
- stateMachineUpdater.reloadStateMachine();
- }
-
- @Override
- public void close() throws IOException {
- stateMachineUpdater.stop();
- RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
- getSelfId(), getLastAppliedIndex());
- storage.close();
- }
-
- @VisibleForTesting
- public RaftStorage getStorage() {
- return storage;
- }
-
- void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
- // TODO: verify that we need to install the snapshot
- StateMachine sm = server.getStateMachine();
- sm.pause(); // pause the SM to prepare for install snapshot
- snapshotManager.installSnapshot(sm, request);
- log.syncWithSnapshot(request.getTermIndex().getIndex());
- this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
- request.getTermIndex());
- }
-
- SnapshotInfo getLatestSnapshot() {
- return server.getStateMachine().getStateMachineStorage().getLatestSnapshot();
- }
-
- public TermIndex getLatestInstalledSnapshot() {
- return latestInstalledSnapshot;
- }
-
- @VisibleForTesting
- public long getLastAppliedIndex() {
- return stateMachineUpdater.getLastAppliedIndex();
- }
-
- boolean isCurrentConfCommitted() {
- return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java b/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
deleted file mode 100644
index ac21386..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/ServerUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.raft.server.protocol.TermIndex;
-
-/** Server utilities for internal use. */
-public class ServerUtils {
- public static TermIndex newTermIndex(long term, long index) {
- return new TermIndexImpl(term, index);
- }
-
- private static class TermIndexImpl implements TermIndex {
- private final long term;
- private final long index; //log index; first index is 1.
-
- TermIndexImpl(long term, long logIndex) {
- this.term = term;
- this.index = logIndex;
- }
-
- @Override
- public long getTerm() {
- return term;
- }
-
- @Override
- public long getIndex() {
- return index;
- }
-
- @Override
- public int compareTo(TermIndex that) {
- final int d = Long.compare(this.getTerm(), that.getTerm());
- return d != 0 ? d : Long.compare(this.getIndex(), that.getIndex());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- } else if (obj == null || !(obj instanceof TermIndexImpl)) {
- return false;
- }
-
- final TermIndexImpl that = (TermIndexImpl) obj;
- return this.getTerm() == that.getTerm()
- && this.getIndex() == that.getIndex();
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(term).append(index).hashCode();
- }
-
- private static String toString(long n) {
- return n < 0 ? "~" : "" + n;
- }
-
- @Override
- public String toString() {
- return "(t:" + toString(term) + ", i:" + toString(index) + ")";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java b/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
deleted file mode 100644
index f85639b..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/impl/StateMachineUpdater.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.server.RaftServerConfigKeys;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.server.storage.RaftStorage;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.LifeCycle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
-
-/**
- * This class tracks the log entries that have been committed in a quorum and
- * applies them to the state machine. We let a separate thread do this work
- * asynchronously so that this will not block normal raft protocol.
- *
- * If the auto log compaction is enabled, the state machine updater thread will
- * trigger a snapshot of the state machine by calling
- * {@link StateMachine#takeSnapshot} when the log size exceeds a limit.
- */
-class StateMachineUpdater implements Runnable {
- static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
-
- enum State {
- RUNNING, STOP, RELOAD
- }
-
- private final RaftProperties properties;
- private final StateMachine stateMachine;
- private final RaftServerImpl server;
- private final RaftLog raftLog;
-
- private volatile long lastAppliedIndex;
-
- private final boolean autoSnapshotEnabled;
- private final long snapshotThreshold;
- private long lastSnapshotIndex;
-
- private final Thread updater;
- private volatile State state = State.RUNNING;
-
- StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
- RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
- this.properties = properties;
- this.stateMachine = stateMachine;
- this.server = server;
- this.raftLog = raftLog;
-
- this.lastAppliedIndex = lastAppliedIndex;
- lastSnapshotIndex = lastAppliedIndex;
-
- autoSnapshotEnabled = properties.getBoolean(
- RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
- RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
- snapshotThreshold = properties.getLong(
- RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
- RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
- updater = new Daemon(this);
- }
-
- void start() {
- updater.start();
- }
-
- void stop() {
- state = State.STOP;
- updater.interrupt();
- try {
- stateMachine.close();
- } catch (IOException ignored) {
- }
- }
-
- void reloadStateMachine() {
- state = State.RELOAD;
- notifyUpdater();
- }
-
- synchronized void notifyUpdater() {
- notifyAll();
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
- }
-
- @Override
- public void run() {
- final RaftStorage storage = server.getState().getStorage();
- while (isRunning()) {
- try {
- synchronized (this) {
- // when the peers just start, the committedIndex is initialized as 0
- // and will be updated only after the leader contacts other peers.
- // Thus initially lastAppliedIndex can be greater than lastCommitted.
- while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
- wait();
- }
- }
-
- final long committedIndex = raftLog.getLastCommittedIndex();
- Preconditions.checkState(lastAppliedIndex < committedIndex);
-
- if (state == State.RELOAD) {
- Preconditions.checkState(stateMachine.getLifeCycleState() == LifeCycle.State.PAUSED);
-
- stateMachine.reinitialize(server.getId(), properties, storage);
-
- SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
- Preconditions.checkState(snapshot != null && snapshot.getIndex() > lastAppliedIndex,
- "Snapshot: %s, lastAppliedIndex: %s", snapshot, lastAppliedIndex);
-
- lastAppliedIndex = snapshot.getIndex();
- lastSnapshotIndex = snapshot.getIndex();
- state = State.RUNNING;
- }
-
- while (lastAppliedIndex < committedIndex) {
- final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
- if (next != null) {
- if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
- // the reply should have already been set. only need to record
- // the new conf in the state machine.
- stateMachine.setRaftConfiguration(
- ServerProtoUtils.toRaftConfiguration(next.getIndex(),
- next.getConfigurationEntry()));
- } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
- // check whether there is a TransactionContext because we are the leader.
- TransactionContext trx = server.getTransactionContext(next.getIndex());
- if (trx == null) {
- trx = new TransactionContext(stateMachine, next);
- }
-
- // Let the StateMachine inject logic for committed transactions in sequential order.
- trx = stateMachine.applyTransactionSerial(trx);
-
- // TODO: This step can be parallelized
- CompletableFuture<Message> messageFuture =
- stateMachine.applyTransaction(trx);
- server.replyPendingRequest(next.getIndex(), messageFuture);
- }
- lastAppliedIndex++;
- } else {
- LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
- this, lastAppliedIndex + 1, state);
- break;
- }
- }
-
- // check if need to trigger a snapshot
- if (shouldTakeSnapshot(lastAppliedIndex)) {
- stateMachine.takeSnapshot();
- // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
- lastSnapshotIndex = lastAppliedIndex;
- }
- } catch (InterruptedException e) {
- if (!isRunning()) {
- LOG.info("{}: the StateMachineUpdater is interrupted and will exit.", this);
- } else {
- final String s = this + ": the StateMachineUpdater is wrongly interrupted";
- ExitUtils.terminate(1, s, e, LOG);
- }
- } catch (Throwable t) {
- final String s = this + ": the StateMachineUpdater hits Throwable";
- ExitUtils.terminate(2, s, t, LOG);
- }
- }
- }
-
- private boolean isRunning() {
- return state != State.STOP;
- }
-
- private boolean shouldTakeSnapshot(long currentAppliedIndex) {
- return autoSnapshotEnabled && (state != State.RELOAD) &&
- (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
- }
-
- long getLastAppliedIndex() {
- return lastAppliedIndex;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java b/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
deleted file mode 100644
index 59e9bba..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/protocol/RaftServerProtocol.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.protocol;
-
-import org.apache.raft.shaded.proto.RaftProtos.*;
-
-import java.io.IOException;
-
-public interface RaftServerProtocol {
-
- RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException;
-
- AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;
-
- InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java b/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
deleted file mode 100644
index df401d6..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/protocol/TermIndex.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.protocol;
-
-import org.apache.raft.server.impl.ServerUtils;
-
-/** The term and the log index defined in the Raft consensus algorithm. */
-public interface TermIndex extends Comparable<TermIndex> {
- /** @return the term. */
- long getTerm();
-
- /** @return the index. */
- long getIndex();
-
- /** Create a new {@link TermIndex} instance. */
- static TermIndex newTermIndex(long term, long index) {
- return ServerUtils.newTermIndex(term, index);
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java b/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
deleted file mode 100644
index 4440be9..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedChannelBase.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-
-public abstract class BufferedChannelBase implements Closeable {
- protected final FileChannel fileChannel;
-
- protected BufferedChannelBase(FileChannel fc) {
- this.fileChannel = fc;
- }
-
- protected FileChannel validateAndGetFileChannel() throws IOException {
- if (!fileChannel.isOpen()) {
- throw new IOException(
- "Attempting to access a file channel that has already been closed");
- }
- return fileChannel;
- }
-
- /**
- * Get the current size of the underlying FileChannel.
- */
- public long size() throws IOException {
- return validateAndGetFileChannel().size();
- }
-
- /**
- * Get the {@link FileChannel} that this BufferedChannel wraps around.
- */
- public FileChannel getFileChannel() {
- return fileChannel;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java b/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
deleted file mode 100644
index 6c662d1..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/BufferedWriteChannel.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Provides a buffering layer in front of a FileChannel for writing.
- */
-public class BufferedWriteChannel extends BufferedChannelBase {
- // The capacity of the write buffer.
- private final int writeCapacity;
- // The position of the file channel's write pointer.
- private AtomicLong writeBufferStartPosition = new AtomicLong(0);
- // The buffer used to write operations.
- private final ByteBuffer writeBuffer;
- // The absolute position of the next write operation.
- private volatile long position;
-
- public BufferedWriteChannel(FileChannel fc, int writeCapacity)
- throws IOException {
- super(fc);
- this.writeCapacity = writeCapacity;
- this.position = fc.position();
- this.writeBufferStartPosition.set(position);
- this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
- }
-
- /**
- * Write all the data in src to the {@link FileChannel}. Note that this function can
- * buffer or re-order writes based on the implementation. These writes will be flushed
- * to the disk only when flush() is invoked.
- *
- * @param src The source ByteBuffer which contains the data to be written.
- * @throws IOException if a write operation fails.
- */
- public void write(ByteBuffer src) throws IOException {
- int copied = 0;
- while (src.remaining() > 0) {
- int truncated = 0;
- if (writeBuffer.remaining() < src.remaining()) {
- truncated = src.remaining() - writeBuffer.remaining();
- src.limit(src.limit() - truncated);
- }
- copied += src.remaining();
- writeBuffer.put(src);
- src.limit(src.limit() + truncated);
- // if we have run out of buffer space, we should flush to the file
- if (writeBuffer.remaining() == 0) {
- flushInternal();
- }
- }
- position += copied;
- }
-
- /**
- * Write the specified byte.
- * @param b the byte to be written
- */
- public void write(int b) throws IOException {
- writeBuffer.put((byte) b);
- if (writeBuffer.remaining() == 0) {
- flushInternal();
- }
- position++;
- }
-
- public void write(byte[] b) throws IOException {
- int offset = 0;
- while (offset < b.length) {
- int toPut = Math.min(b.length - offset, writeBuffer.remaining());
- writeBuffer.put(b, offset, toPut);
- offset += toPut;
- if (writeBuffer.remaining() == 0) {
- flushInternal();
- }
- }
- position += b.length;
- }
-
- /**
- * Get the position where the next write operation will begin writing from.
- */
- public long position() {
- return position;
- }
-
- /**
- * Get the position of the file channel's write pointer.
- */
- public long getFileChannelPosition() {
- return writeBufferStartPosition.get();
- }
-
-
- /**
- * Write any data in the buffer to the file. If sync is set to true, force a
- * sync operation so that data is persisted to the disk.
- *
- * @throws IOException if the write or sync operation fails.
- */
- public void flush(boolean shouldForceWrite) throws IOException {
- synchronized (this) {
- flushInternal();
- }
- if (shouldForceWrite) {
- forceWrite(false);
- }
- }
-
- /**
- * Write any data in the buffer to the file and advance the writeBufferPosition
- * Callers are expected to synchronize appropriately
- *
- * @throws IOException if the write fails.
- */
- private void flushInternal() throws IOException {
- writeBuffer.flip();
- do {
- fileChannel.write(writeBuffer);
- } while (writeBuffer.hasRemaining());
- writeBuffer.clear();
- writeBufferStartPosition.set(fileChannel.position());
- }
-
- public long forceWrite(boolean forceMetadata) throws IOException {
- // This is the point up to which we had flushed to the file system page cache
- // before issuing this force write hence is guaranteed to be made durable by
- // the force write, any flush that happens after this may or may
- // not be flushed
- long positionForceWrite = writeBufferStartPosition.get();
- fileChannel.force(forceMetadata);
- return positionForceWrite;
- }
-
- @Override
- public void close() throws IOException {
- fileChannel.close();
- writeBuffer.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java b/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
deleted file mode 100644
index cdeb622..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/FileInfo.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.io.MD5Hash;
-
-import java.nio.file.Path;
-
-/**
- * Metadata about a file.
- *
- * The objects of this class are immutable.
- */
-public class FileInfo {
- private final Path path;
- private final MD5Hash fileDigest;
- private final long fileSize;
-
- public FileInfo(Path path, MD5Hash fileDigest) {
- this.path = path;
- this.fileDigest = fileDigest;
- this.fileSize = path.toFile().length();
- }
-
- @Override
- public String toString() {
- return path.toString();
- }
-
- /** @return the path of the file. */
- public Path getPath() {
- return path;
- }
-
- /** @return the MD5 file digest of the file. */
- public MD5Hash getFileDigest() {
- return fileDigest;
- }
-
- /** @return the size of the file. */
- public long getFileSize() {
- return fileSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
deleted file mode 100644
index 95597b2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogInputStream.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-public class LogInputStream implements Closeable {
- static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);
-
- static class LogValidation {
- private final long validLength;
- private final long endIndex;
- private final boolean hasCorruptHeader;
-
- LogValidation(long validLength, long endIndex, boolean hasCorruptHeader) {
- this.validLength = validLength;
- this.endIndex = endIndex;
- this.hasCorruptHeader = hasCorruptHeader;
- }
-
- long getValidLength() {
- return validLength;
- }
-
- long getEndIndex() {
- return endIndex;
- }
-
- boolean hasCorruptHeader() {
- return hasCorruptHeader;
- }
- }
-
- private enum State {
- UNINIT,
- OPEN,
- CLOSED
- }
-
- private final File logFile;
- private final long startIndex;
- private final long endIndex;
- private final boolean isOpen;
- private State state = State.UNINIT;
- private LogReader reader;
-
- public LogInputStream(File log, long startIndex, long endIndex,
- boolean isOpen) {
- if (isOpen) {
- Preconditions.checkArgument(endIndex == INVALID_LOG_INDEX);
- } else {
- Preconditions.checkArgument(endIndex >= startIndex);
- }
-
- this.logFile = log;
- this.startIndex = startIndex;
- this.endIndex = endIndex;
- this.isOpen = isOpen;
- }
-
- private void init() throws IOException {
- Preconditions.checkState(state == State.UNINIT);
- try {
- reader = new LogReader(logFile);
- // read the log header
- String header = reader.readLogHeader();
- Preconditions.checkState(SegmentedRaftLog.HEADER_STR.equals(header),
- "Corrupted log header: %s", header);
- state = State.OPEN;
- } finally {
- if (reader == null) {
- state = State.CLOSED;
- }
- }
- }
-
- long getStartIndex() {
- return startIndex;
- }
-
- long getEndIndex() {
- return endIndex;
- }
-
- String getName() {
- return logFile.getName();
- }
-
- public LogEntryProto nextEntry() throws IOException {
- LogEntryProto entry = null;
- switch (state) {
- case UNINIT:
- try {
- init();
- } catch (Throwable e) {
- LOG.error("caught exception initializing " + this, e);
- Throwables.propagateIfPossible(e, IOException.class);
- }
- Preconditions.checkState(state != State.UNINIT);
- return nextEntry();
- case OPEN:
- entry = reader.readEntry();
- if (entry != null) {
- long index = entry.getIndex();
- if (!isOpen() && index >= endIndex) {
- /**
- * The end index may be derived from the segment recovery
- * process. It is possible that we still have some uncleaned garbage
- * in the end. We should skip them.
- */
- long skipAmt = logFile.length() - reader.getPos();
- if (skipAmt > 0) {
- LOG.debug("skipping {} bytes at the end of log '{}': reached" +
- " entry {} out of {}", skipAmt, getName(), index, endIndex);
- reader.skipFully(skipAmt);
- }
- }
- }
- break;
- case CLOSED:
- break; // return null
- }
- return entry;
- }
-
- long scanNextEntry() throws IOException {
- Preconditions.checkState(state == State.OPEN);
- return reader.scanEntry();
- }
-
- long getPosition() {
- if (state == State.OPEN) {
- return reader.getPos();
- } else {
- return 0;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (state == State.OPEN) {
- reader.close();
- }
- state = State.CLOSED;
- }
-
- boolean isOpen() {
- return isOpen;
- }
-
- @Override
- public String toString() {
- return getName();
- }
-
- /**
- * @param file File being scanned and validated.
- * @param maxTxIdToScan Maximum Tx ID to try to scan.
- * The scan returns after reading this or a higher
- * ID. The file portion beyond this ID is
- * potentially being updated.
- * @return Result of the validation
- * @throws IOException
- */
- static LogValidation scanEditLog(File file, long maxTxIdToScan)
- throws IOException {
- LogInputStream in;
- try {
- in = new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false);
- // read the header, initialize the inputstream
- in.init();
- } catch (EOFException e) {
- LOG.warn("Log file " + file + " has no valid header", e);
- return new LogValidation(0, INVALID_LOG_INDEX, true);
- }
-
- try {
- return scanEditLog(in, maxTxIdToScan);
- } finally {
- RaftUtils.cleanup(LOG, in);
- }
- }
-
- /**
- * Find the last valid entry index in the stream.
- * If there are invalid or corrupt entries in the middle of the stream,
- * scanEditLog will skip over them.
- *
- * This reads through the stream but does not close it.
- *
- * @param maxIndexToScan Maximum entry index to try to scan. The scan returns
- * after reading this or a higher index. The file
- * portion beyond this index is potentially being
- * updated.
- */
- static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) {
- long lastPos = 0;
- long end = INVALID_LOG_INDEX;
- long numValid = 0;
- boolean hitError = false;
- while (end < maxIndexToScan) {
- long index;
- lastPos = in.getPosition();
- try {
- if (hitError) {
- LogEntryProto entry = in.nextEntry();
- index = entry != null ? entry.getIndex() : INVALID_LOG_INDEX;
- LOG.warn("After resync, position is " + in.getPosition());
- } else {
- index = in.scanNextEntry();
- }
- if (index == INVALID_LOG_INDEX) {
- break;
- } else {
- hitError = false;
- }
- } catch (Throwable t) {
- LOG.warn("Caught exception after scanning through {} ops from {}"
- + " while determining its valid length. Position was "
- + lastPos, numValid, in, t);
- hitError = true;
- continue;
- }
- if (end == INVALID_LOG_INDEX || index > end) {
- end = index;
- }
- numValid++;
- }
- return new LogValidation(lastPos, end, false);
- }
-}