You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:16:49 UTC
[04/54] [abbrv] incubator-ratis git commit: Move o.a.r.s.* to
o.a.r.s.impl.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java b/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
deleted file mode 100644
index 1d2983b..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LogAppender.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.raft.server.LeaderState.StateUpdateEventType;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.com.google.protobuf.ByteString;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.util.Daemon;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.Timestamp;
-import org.slf4j.Logger;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.file.Path;
-import java.util.*;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-import static org.apache.raft.server.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * A daemon thread appending log entries to a follower peer.
- */
-public class LogAppender extends Daemon {
- public static final Logger LOG = RaftServer.LOG;
-
- protected final RaftServer server;
- private final LeaderState leaderState;
- protected final RaftLog raftLog;
- protected final FollowerInfo follower;
- private final int maxBufferSize;
- private final boolean batchSending;
- private final LogEntryBuffer buffer;
- private final long leaderTerm;
-
- private volatile boolean sending = true;
-
- public LogAppender(RaftServer server, LeaderState leaderState, FollowerInfo f) {
- this.follower = f;
- this.server = server;
- this.leaderState = leaderState;
- this.raftLog = server.getState().getLog();
- this.maxBufferSize = server.getProperties().getInt(
- RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_KEY,
- RAFT_SERVER_LOG_APPENDER_BUFFER_CAPACITY_DEFAULT);
- this.batchSending = server.getProperties().getBoolean(
- RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_KEY,
- RAFT_SERVER_LOG_APPENDER_BATCH_ENABLED_DEFAULT);
- this.buffer = new LogEntryBuffer();
- this.leaderTerm = server.getState().getCurrentTerm();
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + "(" + server.getId() + " -> " +
- follower.getPeer().getId() + ")";
- }
-
- @Override
- public void run() {
- try {
- checkAndSendAppendEntries();
- } catch (InterruptedException | InterruptedIOException e) {
- LOG.info(this + " was interrupted: " + e);
- }
- }
-
- protected boolean isAppenderRunning() {
- return sending;
- }
-
- public void stopSender() {
- this.sending = false;
- }
-
- public FollowerInfo getFollower() {
- return follower;
- }
-
- /**
- * A buffer for log entries with size limitation.
- */
- private class LogEntryBuffer {
- private final List<LogEntryProto> buf = new ArrayList<>();
- private int totalSize = 0;
-
- void addEntry(LogEntryProto entry) {
- buf.add(entry);
- totalSize += entry.getSerializedSize();
- }
-
- boolean isFull() {
- return totalSize >= maxBufferSize;
- }
-
- boolean isEmpty() {
- return buf.isEmpty();
- }
-
- AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
- final AppendEntriesRequestProto request = server
- .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
- previous, buf, !follower.isAttendingVote());
- buf.clear();
- totalSize = 0;
- return request;
- }
-
- int getPendingEntryNum() {
- return buf.size();
- }
- }
-
- private TermIndex getPrevious() {
- TermIndex previous = ServerProtoUtils.toTermIndex(
- raftLog.get(follower.getNextIndex() - 1));
- if (previous == null) {
- // if previous is null, nextIndex must be equal to the log start
- // index (otherwise we will install snapshot).
- Preconditions.checkState(follower.getNextIndex() == raftLog.getStartIndex(),
- "follower's next index %s, local log start index %s",
- follower.getNextIndex(), raftLog.getStartIndex());
- SnapshotInfo snapshot = server.getState().getLatestSnapshot();
- previous = snapshot == null ? null : snapshot.getTermIndex();
- }
- return previous;
- }
-
- protected AppendEntriesRequestProto createRequest() {
- final TermIndex previous = getPrevious();
- final long leaderNext = raftLog.getNextIndex();
- long next = follower.getNextIndex() + buffer.getPendingEntryNum();
- boolean toSend = false;
-
- if (leaderNext == next && !buffer.isEmpty()) {
- // no new entries, then send out the entries in the buffer
- toSend = true;
- } else if (leaderNext > next) {
- while (leaderNext > next && !buffer.isFull()) {
- // stop adding entry once the buffer size is >= the max size
- buffer.addEntry(raftLog.get(next++));
- }
- if (buffer.isFull() || !batchSending) {
- // buffer is full or batch sending is disabled, send out a request
- toSend = true;
- }
- }
-
- if (toSend || shouldHeartbeat()) {
- return buffer.getAppendRequest(previous);
- }
- return null;
- }
-
- /** Send an appendEntries RPC; retry indefinitely. */
- private AppendEntriesReplyProto sendAppendEntriesWithRetries()
- throws InterruptedException, InterruptedIOException {
- int retry = 0;
- AppendEntriesRequestProto request = null;
- while (isAppenderRunning()) { // keep retrying for IOException
- try {
- if (request == null || request.getEntriesCount() == 0) {
- request = createRequest();
- }
-
- if (request == null) {
- LOG.trace("{} need not send AppendEntries now." +
- " Wait for more entries.", server.getId());
- return null;
- } else if (!isAppenderRunning()) {
- LOG.debug("LogAppender {} has been stopped. Skip the request.", this);
- return null;
- }
-
- follower.updateLastRpcSendTime();
- final AppendEntriesReplyProto r = server.getServerRpc()
- .sendAppendEntries(request);
- follower.updateLastRpcResponseTime();
-
- return r;
- } catch (InterruptedIOException iioe) {
- throw iioe;
- } catch (IOException ioe) {
- LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
- }
- if (isAppenderRunning()) {
- Thread.sleep(leaderState.getSyncInterval());
- }
- }
- return null;
- }
-
- protected class SnapshotRequestIter
- implements Iterable<InstallSnapshotRequestProto> {
- private final SnapshotInfo snapshot;
- private final List<FileInfo> files;
- private FileInputStream in;
- private int fileIndex = 0;
-
- private FileInfo currentFileInfo;
- private byte[] currentBuf;
- private long currentFileSize;
- private long currentOffset = 0;
- private int chunkIndex = 0;
-
- private final String requestId;
- private int requestIndex = 0;
-
- public SnapshotRequestIter(SnapshotInfo snapshot, String requestId)
- throws IOException {
- this.snapshot = snapshot;
- this.requestId = requestId;
- this.files = snapshot.getFiles();
- if (files.size() > 0) {
- startReadFile();
- }
- }
-
- private void startReadFile() throws IOException {
- currentFileInfo = files.get(fileIndex);
- File snapshotFile = currentFileInfo.getPath().toFile();
- currentFileSize = snapshotFile.length();
- final int bufLength =
- (int) Math.min(leaderState.getSnapshotChunkMaxSize(), currentFileSize);
- currentBuf = new byte[bufLength];
- currentOffset = 0;
- chunkIndex = 0;
- in = new FileInputStream(snapshotFile);
- }
-
- @Override
- public Iterator<InstallSnapshotRequestProto> iterator() {
- return new Iterator<InstallSnapshotRequestProto>() {
- @Override
- public boolean hasNext() {
- return fileIndex < files.size();
- }
-
- @Override
- public InstallSnapshotRequestProto next() {
- if (fileIndex >= files.size()) {
- throw new NoSuchElementException();
- }
- int targetLength = (int) Math.min(currentFileSize - currentOffset,
- leaderState.getSnapshotChunkMaxSize());
- FileChunkProto chunk;
- try {
- chunk = readFileChunk(currentFileInfo, in, currentBuf,
- targetLength, currentOffset, chunkIndex);
- boolean done = (fileIndex == files.size() - 1) &&
- chunk.getDone();
- InstallSnapshotRequestProto request =
- server.createInstallSnapshotRequest(follower.getPeer().getId(),
- requestId, requestIndex++, snapshot,
- Lists.newArrayList(chunk), done);
- currentOffset += targetLength;
- chunkIndex++;
-
- if (currentOffset >= currentFileSize) {
- in.close();
- fileIndex++;
- if (fileIndex < files.size()) {
- startReadFile();
- }
- }
-
- return request;
- } catch (IOException e) {
- if (in != null) {
- try {
- in.close();
- } catch (IOException ignored) {
- }
- }
- LOG.warn("Got exception when preparing InstallSnapshot request", e);
- throw new RuntimeException(e);
- }
- }
- };
- }
- }
-
- private FileChunkProto readFileChunk(FileInfo fileInfo,
- FileInputStream in, byte[] buf, int length, long offset, int chunkIndex)
- throws IOException {
- FileChunkProto.Builder builder = FileChunkProto.newBuilder()
- .setOffset(offset).setChunkIndex(chunkIndex);
- IOUtils.readFully(in, buf, 0, length);
- Path relativePath = server.getState().getStorage().getStorageDir()
- .relativizeToRoot(fileInfo.getPath());
- builder.setFilename(relativePath.toString());
- builder.setDone(offset + length == fileInfo.getFileSize());
- builder.setFileDigest(
- ByteString.copyFrom(fileInfo.getFileDigest().getDigest()));
- builder.setData(ByteString.copyFrom(buf, 0, length));
- return builder.build();
- }
-
- private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot)
- throws InterruptedException, InterruptedIOException {
- String requestId = UUID.randomUUID().toString();
- InstallSnapshotReplyProto reply = null;
- try {
- for (InstallSnapshotRequestProto request :
- new SnapshotRequestIter(snapshot, requestId)) {
- follower.updateLastRpcSendTime();
- reply = server.getServerRpc().sendInstallSnapshot(request);
- follower.updateLastRpcResponseTime();
-
- if (!reply.getServerReply().getSuccess()) {
- return reply;
- }
- }
- } catch (InterruptedIOException iioe) {
- throw iioe;
- } catch (Exception ioe) {
- LOG.warn(this + ": failed to install SnapshotInfo " + snapshot.getFiles(),
- ioe);
- return null;
- }
-
- if (reply != null) {
- follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
- follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
- LOG.info("{}: install snapshot-{} successfully on follower {}",
- server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
- }
- return reply;
- }
-
- protected SnapshotInfo shouldInstallSnapshot() {
- final long logStartIndex = raftLog.getStartIndex();
- // we should install snapshot if the follower needs to catch up and:
- // 1. there is no local log entry but there is snapshot
- // 2. or the follower's next index is smaller than the log start index
- if (follower.getNextIndex() < raftLog.getNextIndex()) {
- SnapshotInfo snapshot = server.getState().getLatestSnapshot();
- if (follower.getNextIndex() < logStartIndex ||
- (logStartIndex == INVALID_LOG_INDEX && snapshot != null)) {
- return snapshot;
- }
- }
- return null;
- }
-
- /** Check and send appendEntries RPC */
- private void checkAndSendAppendEntries()
- throws InterruptedException, InterruptedIOException {
- while (isAppenderRunning()) {
- if (shouldSendRequest()) {
- SnapshotInfo snapshot = shouldInstallSnapshot();
- if (snapshot != null) {
- LOG.info("{}: follower {}'s next index is {}," +
- " log's start index is {}, need to install snapshot",
- server.getId(), follower.getPeer(), follower.getNextIndex(),
- raftLog.getStartIndex());
-
- final InstallSnapshotReplyProto r = installSnapshot(snapshot);
- if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
- checkResponseTerm(r.getTerm());
- } // otherwise if r is null, retry the snapshot installation
- } else {
- final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
- if (r != null) {
- handleReply(r);
- }
- }
- }
- if (isAppenderRunning() && !shouldAppendEntries(
- follower.getNextIndex() + buffer.getPendingEntryNum())) {
- final long waitTime = getHeartbeatRemainingTime(
- follower.getLastRpcTime());
- if (waitTime > 0) {
- synchronized (this) {
- wait(waitTime);
- }
- }
- }
- }
- }
-
- private void handleReply(AppendEntriesReplyProto reply) {
- if (reply != null) {
- switch (reply.getResult()) {
- case SUCCESS:
- final long oldNextIndex = follower.getNextIndex();
- final long nextIndex = reply.getNextIndex();
- if (nextIndex < oldNextIndex) {
- throw new IllegalStateException("nextIndex=" + nextIndex
- + " < oldNextIndex=" + oldNextIndex
- + ", reply=" + ProtoUtils.toString(reply));
- }
-
- if (nextIndex > oldNextIndex) {
- follower.updateMatchIndex(nextIndex - 1);
- follower.updateNextIndex(nextIndex);
- submitEventOnSuccessAppend();
- }
- break;
- case NOT_LEADER:
- // check if should step down
- checkResponseTerm(reply.getTerm());
- break;
- case INCONSISTENCY:
- follower.decreaseNextIndex(reply.getNextIndex());
- break;
- case UNRECOGNIZED:
- LOG.warn("{} received UNRECOGNIZED AppendResult from {}",
- server.getId(), follower.getPeer().getId());
- break;
- }
- }
- }
-
- protected void submitEventOnSuccessAppend() {
- LeaderState.StateUpdateEvent e = follower.isAttendingVote() ?
- LeaderState.UPDATE_COMMIT_EVENT :
- LeaderState.STAGING_PROGRESS_EVENT;
- leaderState.submitUpdateStateEvent(e);
- }
-
- public synchronized void notifyAppend() {
- this.notify();
- }
-
- /** Should the leader send appendEntries RPC to this follower? */
- protected boolean shouldSendRequest() {
- return shouldAppendEntries(follower.getNextIndex()) || shouldHeartbeat();
- }
-
- private boolean shouldAppendEntries(long followerIndex) {
- return followerIndex < raftLog.getNextIndex();
- }
-
- private boolean shouldHeartbeat() {
- return getHeartbeatRemainingTime(follower.getLastRpcTime()) <= 0;
- }
-
- /**
- * @return the time in milliseconds that the leader should send a heartbeat.
- */
- protected long getHeartbeatRemainingTime(Timestamp lastTime) {
- return server.getMinTimeoutMs() / 2 - lastTime.elapsedTimeMs();
- }
-
- protected void checkResponseTerm(long responseTerm) {
- synchronized (server) {
- if (isAppenderRunning() && follower.isAttendingVote()
- && responseTerm > leaderState.getCurrentTerm()) {
- leaderState.submitUpdateStateEvent(
- new LeaderState.StateUpdateEvent(StateUpdateEventType.STEPDOWN,
- responseTerm));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java b/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
deleted file mode 100644
index 3cb2b06..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/LogAppenderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-/** Creates the {@link LogAppender} a leader uses for one follower. */
-public interface LogAppenderFactory {
-  /**
-   * @param server the local server
-   * @param state the leader state of the local server
-   * @param f the follower the new appender sends requests to
-   * @return a new appender for the given follower
-   */
-  LogAppender getLogAppender(RaftServer server, LeaderState state,
-      FollowerInfo f);
-
-  /** A factory that creates plain {@link LogAppender} instances. */
-  class SynchronousLogAppenderFactory implements LogAppenderFactory {
-    @Override
-    public LogAppender getLogAppender(RaftServer server, LeaderState state,
-        FollowerInfo f) {
-      final LogAppender appender = new LogAppender(server, state, f);
-      return appender;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
deleted file mode 100644
index 9f01390..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PeerConfiguration.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-
-/**
- * The peer configuration of a raft cluster.
- *
- * The objects of this class are immutable.
- */
-class PeerConfiguration {
-  /** The peers of this configuration, keyed by peer id; never empty. */
-  private final Map<String, RaftPeer> peers;
-
-  /**
-   * @param peers the peers of the configuration; must be non-null and
-   *              contain at least one peer. If several peers share an id,
-   *              the last one wins.
-   */
-  PeerConfiguration(Iterable<RaftPeer> peers) {
-    Preconditions.checkNotNull(peers);
-    Map<String, RaftPeer> map = new HashMap<>();
-    for(RaftPeer p : peers) {
-      map.put(p.getId(), p);
-    }
-    this.peers = Collections.unmodifiableMap(map);
-    Preconditions.checkState(!this.peers.isEmpty());
-  }
-
-  /** @return an unmodifiable view of all the peers. */
-  Collection<RaftPeer> getPeers() {
-    return Collections.unmodifiableCollection(peers.values());
-  }
-
-  int size() {
-    return peers.size();
-  }
-
-  @Override
-  public String toString() {
-    return peers.values().toString();
-  }
-
-  /** @return the peer with the given id, or null if there is none. */
-  RaftPeer getPeer(String id) {
-    return peers.get(id);
-  }
-
-  boolean contains(String id) {
-    return peers.containsKey(id);
-  }
-
-  /** @return a new list of all the peers whose id differs from selfId. */
-  List<RaftPeer> getOtherPeers(String selfId) {
-    List<RaftPeer> others = new ArrayList<>();
-    for (RaftPeer peer : peers.values()) {
-      if (!selfId.equals(peer.getId())) {
-        others.add(peer);
-      }
-    }
-    return others;
-  }
-
-  /**
-   * Checks whether the given peers, together with self, form a majority of
-   * this configuration. Ids that are not in this configuration are ignored.
-   *
-   * @param others ids of the other peers; must not contain selfId
-   * @param selfId the id of the local peer
-   * @return true iff more than half of the peers in this configuration are
-   *         covered by {@code others} and {@code selfId}
-   */
-  boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    int num = contains(selfId) ? 1 : 0;
-    for (String other : others) {
-      if (contains(other)) {
-        num++;
-      }
-    }
-    // Evaluated after the loop so that self alone can be a majority, e.g.
-    // a single-peer configuration with an empty "others" collection.
-    return num > size() / 2;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
deleted file mode 100644
index 3598349..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PendingRequest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.statemachine.TransactionContext;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A client request that is waiting for its reply. The reply, or the failure,
- * is delivered through the future returned by {@link #getFuture()}.
- * Instances are ordered by index.
- */
-public class PendingRequest implements Comparable<PendingRequest> {
-  private final long index;
-  private final RaftClientRequest request;
-  private final TransactionContext entry;
-  private final CompletableFuture<RaftClientReply> future;
-
-  PendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    this.index = index;
-    this.request = request;
-    this.entry = entry;
-    this.future = new CompletableFuture<>();
-  }
-
-  /**
-   * Creates a pending setConfiguration request, which has no transaction
-   * context and uses {@link RaftServerConstants#INVALID_LOG_INDEX} as index.
-   */
-  PendingRequest(SetConfigurationRequest request) {
-    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
-  }
-
-  long getIndex() {
-    return index;
-  }
-
-  RaftClientRequest getRequest() {
-    return request;
-  }
-
-  public CompletableFuture<RaftClientReply> getFuture() {
-    return future;
-  }
-
-  /** @return the transaction context; null for setConfiguration requests. */
-  TransactionContext getEntry() {
-    return entry;
-  }
-
-  /** Completes the future exceptionally with the given non-null cause. */
-  synchronized void setException(Throwable e) {
-    Preconditions.checkArgument(e != null);
-    future.completeExceptionally(e);
-  }
-
-  /** Completes the future with the given non-null reply. */
-  synchronized void setReply(RaftClientReply r) {
-    Preconditions.checkArgument(r != null);
-    future.complete(r);
-  }
-
-  /** Completes the future with a reply built from the request and message. */
-  void setSuccessReply(Message message) {
-    setReply(new RaftClientReply(getRequest(), message));
-  }
-
-  @Override
-  public int compareTo(PendingRequest that) {
-    return Long.compare(this.index, that.index);
-  }
-
-  @Override
-  public String toString() {
-    return getClass().getSimpleName() + "(index=" + index
-        + ", request=" + request + ")";
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java b/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
deleted file mode 100644
index a5731fd..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/PendingRequests.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-
-import org.apache.raft.protocol.Message;
-import org.apache.raft.protocol.RaftClientReply;
-import org.apache.raft.protocol.RaftClientRequest;
-import org.apache.raft.protocol.RaftException;
-import org.apache.raft.protocol.SetConfigurationRequest;
-import org.apache.raft.statemachine.TransactionContext;
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Tracks the client requests that are waiting for a reply: the write
- * requests keyed by log index, plus at most one setConfiguration request.
- */
-class PendingRequests {
-  private static final Logger LOG = RaftServer.LOG;
-
-  private PendingRequest pendingSetConf;
-  private final RaftServer server;
-  private final ConcurrentMap<Long, PendingRequest> pendingRequests = new ConcurrentHashMap<>();
-  /** The most recently added write request; used to check index continuity. */
-  private PendingRequest last = null;
-
-  PendingRequests(RaftServer server) {
-    this.server = server;
-  }
-
-  /**
-   * Registers a write request. The index must directly follow the index of
-   * the previously added request, and the request must not be read-only.
-   */
-  PendingRequest addPendingRequest(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    // externally synced for now
-    Preconditions.checkArgument(!request.isReadOnly());
-    Preconditions.checkState(last == null || index == last.getIndex() + 1);
-    return add(index, request, entry);
-  }
-
-  private PendingRequest add(long index, RaftClientRequest request,
-      TransactionContext entry) {
-    final PendingRequest pending = new PendingRequest(index, request, entry);
-    pendingRequests.put(index, pending);
-    last = pending;
-    return pending;
-  }
-
-  /** Registers the setConfiguration request; only one may be pending. */
-  PendingRequest addConfRequest(SetConfigurationRequest request) {
-    Preconditions.checkState(pendingSetConf == null);
-    pendingSetConf = new PendingRequest(request);
-    return pendingSetConf;
-  }
-
-  /** Replies success to the pending setConfiguration request, if any. */
-  void replySetConfiguration() {
-    // we allow the pendingRequest to be null in case that the new leader
-    // commits the new configuration while it has not received the retry
-    // request from the client
-    if (pendingSetConf != null) {
-      // for setConfiguration we do not need to wait for statemachine. send back
-      // reply after it's committed.
-      pendingSetConf.setSuccessReply(null);
-      pendingSetConf = null;
-    }
-  }
-
-  /** Fails the pending setConfiguration request, which must exist. */
-  void failSetConfiguration(RaftException e) {
-    Preconditions.checkState(pendingSetConf != null);
-    pendingSetConf.setException(e);
-    pendingSetConf = null;
-  }
-
-  /** @return the transaction context of the request at index, or null. */
-  TransactionContext getTransactionContext(long index) {
-    PendingRequest pendingRequest = pendingRequests.get(index);
-    // it is possible that the pendingRequest is null if this peer just becomes
-    // the new leader and commits transactions received by the previous leader
-    return pendingRequest != null ? pendingRequest.getEntry() : null;
-  }
-
-  /**
-   * Replies to the request at the given index once messageFuture completes.
-   * The request is removed from the pending map right away, so the map does
-   * not grow without bound and {@link #sendNotLeaderResponses()} only sees
-   * requests that have not been applied yet.
-   */
-  void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
-    final PendingRequest pending = pendingRequests.remove(index);
-    if (pending != null) {
-      Preconditions.checkState(pending.getIndex() == index);
-
-      messageFuture.whenComplete((reply, exception) -> {
-        if (exception == null) {
-          pending.setSuccessReply(reply);
-        } else {
-          pending.setException(exception);
-        }
-      });
-    }
-  }
-
-  /**
-   * The leader state is stopped. Send NotLeaderException to all the pending
-   * requests since they have not got applied to the state machine yet.
-   */
-  void sendNotLeaderResponses() throws IOException {
-    LOG.info("{} sends responses before shutting down PendingRequestsHandler",
-        server.getId());
-
-    Collection<TransactionContext> pendingEntries = pendingRequests.values().stream()
-        .map(PendingRequest::getEntry).collect(Collectors.toList());
-    // notify the state machine about stepping down
-    server.getStateMachine().notifyNotLeader(pendingEntries);
-    pendingRequests.values().forEach(this::setNotLeaderException);
-    if (pendingSetConf != null) {
-      setNotLeaderException(pendingSetConf);
-    }
-  }
-
-  private void setNotLeaderException(PendingRequest pending) {
-    RaftClientReply reply = new RaftClientReply(pending.getRequest(),
-        server.generateNotLeaderException());
-    pending.setReply(reply);
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java b/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
deleted file mode 100644
index 54ed9d6..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftConfiguration.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.protocol.RaftPeer;
-
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-
-/**
- * The configuration of the raft cluster.
- *
- * The configuration is stable if there is no on-going peer change. Otherwise,
- * the configuration is transitional, i.e. in the middle of a peer change.
- *
- * The objects of this class are immutable.
- */
-public class RaftConfiguration {
-  /**
-   * Create a {@link Builder}.
-   *
-   * @return a new builder with nothing set yet.
-   */
-  public static Builder newBuilder() {
-    final Builder builder = new Builder();
-    return builder;
-  }
-
-  /**
-   * To build {@link RaftConfiguration} objects.
-   *
-   * Each of conf, oldConf and logEntryIndex can be set at most once; setting
-   * one of them a second time fails with an IllegalStateException.
-   */
-  public static class Builder {
-    private PeerConfiguration oldConf;
-    private PeerConfiguration conf;
-    private long logEntryIndex = RaftServerConstants.INVALID_LOG_INDEX;
-
-    /** Set when conf is taken from a transitional conf: the result must not have an old conf. */
-    private boolean forceStable = false;
-    /** Set when oldConf is taken from a stable conf: the result must have an old conf. */
-    private boolean forceTransitional = false;
-
-    private Builder() {}
-
-    /** Set the (new) peer configuration; it must be non-null and not set before. */
-    public Builder setConf(PeerConfiguration conf) {
-      Preconditions.checkNotNull(conf);
-      Preconditions.checkState(this.conf == null, "conf is already set.");
-      this.conf = conf;
-      return this;
-    }
-
-    /** Set the (new) peer configuration from the given peers. */
-    public Builder setConf(Iterable<RaftPeer> peers) {
-      final PeerConfiguration peerConf = new PeerConfiguration(peers);
-      return setConf(peerConf);
-    }
-
-    /** Set the (new) peer configuration from the given peer array. */
-    public Builder setConf(RaftPeer[] peers) {
-      final Iterable<RaftPeer> peerList = Arrays.asList(peers);
-      return setConf(peerList);
-    }
-
-    /**
-     * Use the conf of the given transitional configuration as the conf of a
-     * configuration which is forced to be stable.
-     */
-    Builder setConf(RaftConfiguration transitionalConf) {
-      Preconditions.checkNotNull(transitionalConf);
-      Preconditions.checkState(transitionalConf.isTransitional());
-
-      Preconditions.checkState(!forceTransitional);
-      forceStable = true;
-      return setConf(transitionalConf.conf);
-    }
-
-
-    /** Set the old peer configuration; it must be non-null and not set before. */
-    public Builder setOldConf(PeerConfiguration oldConf) {
-      Preconditions.checkNotNull(oldConf);
-      Preconditions.checkState(this.oldConf == null, "oldConf is already set.");
-      this.oldConf = oldConf;
-      return this;
-    }
-
-    /** Set the old peer configuration from the given peers. */
-    public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
-      final PeerConfiguration peerConf = new PeerConfiguration(oldPeers);
-      return setOldConf(peerConf);
-    }
-
-    /** Set the old peer configuration from the given peer array. */
-    public Builder setOldConf(RaftPeer[] oldPeers) {
-      final Iterable<RaftPeer> peerList = Arrays.asList(oldPeers);
-      return setOldConf(peerList);
-    }
-
-    /**
-     * Use the conf of the given stable configuration as the old conf of a
-     * configuration which is forced to be transitional.
-     */
-    Builder setOldConf(RaftConfiguration stableConf) {
-      Preconditions.checkNotNull(stableConf);
-      Preconditions.checkState(stableConf.isStable());
-
-      Preconditions.checkState(!forceStable);
-      forceTransitional = true;
-      return setOldConf(stableConf.conf);
-    }
-
-    /**
-     * Set the index of the log entry corresponding to the configuration.
-     * The index must differ from {@link RaftServerConstants#INVALID_LOG_INDEX}
-     * and must not be set before.
-     */
-    public Builder setLogEntryIndex(long logEntryIndex) {
-      Preconditions.checkArgument(
-          logEntryIndex != RaftServerConstants.INVALID_LOG_INDEX);
-      Preconditions.checkState(
-          this.logEntryIndex == RaftServerConstants.INVALID_LOG_INDEX,
-          "logEntryIndex is already set.");
-      this.logEntryIndex = logEntryIndex;
-      return this;
-    }
-
-    /** Build a {@link RaftConfiguration}. */
-    public RaftConfiguration build() {
-      // a forced-transitional configuration needs an old conf ...
-      Preconditions.checkState(!forceTransitional || oldConf != null);
-      // ... and a forced-stable configuration must not have one.
-      Preconditions.checkState(!forceStable || oldConf == null);
-      return new RaftConfiguration(conf, oldConf, logEntryIndex);
-    }
-  }
-
- /** Non-null only if this configuration is transitional. */
- private final PeerConfiguration oldConf;
- /**
- * The current peer configuration while this configuration is stable;
- * or the new peer configuration while this configuration is transitional.
- */
- private final PeerConfiguration conf;
-
- /** The index of the corresponding log entry for this configuration. */
- private final long logEntryIndex;
-
-  /**
-   * @param conf the current (or new) peer configuration; must be non-null.
-   * @param oldConf the old peer configuration; null for a stable configuration.
-   * @param logEntryIndex the index of the corresponding log entry.
-   */
-  private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
-      long logEntryIndex) {
-    this.conf = Preconditions.checkNotNull(conf);
-    this.oldConf = oldConf;
-    this.logEntryIndex = logEntryIndex;
-  }
-
-  /**
-   * Is this configuration transitional, i.e. in the middle of a peer change?
-   *
-   * @return true if and only if there is an old conf.
-   */
-  public boolean isTransitional() {
-    return this.oldConf != null;
-  }
-
-  /**
-   * Is this configuration stable, i.e. no on-going peer change?
-   *
-   * @return true if and only if there is no old conf.
-   */
-  public boolean isStable() {
-    return this.oldConf == null;
-  }
-
-  /** @return true if the given peer id is in the current (or new) conf. */
-  boolean containsInConf(String peerId) {
-    return this.conf.contains(peerId);
-  }
-
-  /** @return true if there is an old conf and it contains the given peer id. */
-  boolean containsInOldConf(String peerId) {
-    if (oldConf == null) {
-      return false;
-    }
-    return oldConf.contains(peerId);
-  }
-
-  /**
-   * @return true if the given peer id is in the conf and, when this
-   *         configuration is transitional, also in the old conf.
-   */
-  public boolean contains(String peerId) {
-    if (!containsInConf(peerId)) {
-      return false;
-    }
-    return oldConf == null || containsInOldConf(peerId);
-  }
-
-  /**
-   * Look the peer up in the conf first and then, if it is not found there,
-   * in the old conf.
-   *
-   * @return the peer corresponding to the given id;
-   *         or return null if the id is null or the peer is not in this
-   *         configuration.
-   */
-  public RaftPeer getPeer(String id) {
-    if (id == null) {
-      return null;
-    }
-    final RaftPeer current = conf.getPeer(id);
-    if (current != null || oldConf == null) {
-      return current;
-    }
-    return oldConf.getPeer(id);
-  }
-
-  /**
-   * @return all the peers from the conf, and the old conf if it exists.
-   *         The result is a new collection; a peer found in both confs is
-   *         listed once.
-   */
-  public Collection<RaftPeer> getPeers() {
-    final Collection<RaftPeer> all = new ArrayList<>(conf.getPeers());
-    if (oldConf != null) {
-      for (RaftPeer p : oldConf.getPeers()) {
-        if (!all.contains(p)) {
-          all.add(p);
-        }
-      }
-    }
-    return all;
-  }
-
-  /**
-   * @return all the peers other than the given self id from the conf,
-   *         and the old conf if it exists. The result is a new collection;
-   *         a peer found in both confs is listed once.
-   */
-  public Collection<RaftPeer> getOtherPeers(String selfId) {
-    // Copy first, like getPeers() does, so that adding the peers of the old
-    // conf never modifies a collection owned by the PeerConfiguration.
-    final Collection<RaftPeer> others = new ArrayList<>(conf.getOtherPeers(selfId));
-    if (oldConf != null) {
-      oldConf.getOtherPeers(selfId).stream()
-          .filter(p -> !others.contains(p))
-          .forEach(others::add);
-    }
-    return others;
-  }
-
-  /**
-   * @param others ids of the other peers; must not contain {@code selfId}.
-   * @return true if the self id together with the others are in the majority
-   *         of the conf and, if it exists, also of the old conf.
-   */
-  public boolean hasMajority(Collection<String> others, String selfId) {
-    Preconditions.checkArgument(!others.contains(selfId));
-    if (!conf.hasMajority(others, selfId)) {
-      return false;
-    }
-    return oldConf == null || oldConf.hasMajority(others, selfId);
-  }
-
-  /** @return the conf, followed by "old:" and the old conf if there is one. */
-  @Override
-  public String toString() {
-    final StringBuilder b = new StringBuilder().append(conf);
-    if (oldConf != null) {
-      b.append("old:").append(oldConf);
-    }
-    return b.toString();
-  }
-
- @VisibleForTesting
- boolean hasNoChange(RaftPeer[] newMembers) {
- if (!isStable() || conf.size() != newMembers.length) {
- return false;
- }
- for (RaftPeer peer : newMembers) {
- if (!conf.contains(peer.getId())) {
- return false;
- }
- }
- return true;
- }
-
-  /** @return the index of the corresponding log entry for this configuration. */
-  long getLogEntryIndex() {
-    return this.logEntryIndex;
-  }
-
- static Collection<RaftPeer> computeNewPeers(RaftPeer[] newMembers,
- RaftConfiguration old) {
- List<RaftPeer> peers = new ArrayList<>();
- for (RaftPeer p : newMembers) {
- if (!old.containsInConf(p.getId())) {
- peers.add(p);
- }
- }
- return peers;
- }
-
-  /**
-   * Pick a random peer from the conf (the old conf is not considered).
-   *
-   * @param exclusiveId the id of the peer which must not be picked.
-   * @return a randomly chosen peer other than {@code exclusiveId};
-   *         or null if the conf has no other peer.
-   */
-  RaftPeer getRandomPeer(String exclusiveId) {
-    final List<RaftPeer> candidates = conf.getOtherPeers(exclusiveId);
-    return candidates.isEmpty() ? null
-        : candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
-  }
-
-  /** @return the peers in the old conf; or an empty list if there is no old conf. */
-  public Collection<RaftPeer> getPeersInOldConf() {
-    if (oldConf == null) {
-      return Collections.emptyList();
-    }
-    return oldConf.getPeers();
-  }
-
-  /** @return the peers in the conf, as returned by the peer configuration. */
-  public Collection<RaftPeer> getPeersInConf() {
-    return this.conf.getPeers();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
deleted file mode 100644
index c7ea6b2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java
+++ /dev/null
@@ -1,750 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.server.storage.FileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.CodeInjectionForTesting;
-import org.apache.raft.util.LifeCycle;
-import org.apache.raft.util.ProtoUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.raft.server.LeaderState.UPDATE_COMMIT_EVENT;
-import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
-import static org.apache.raft.util.LifeCycle.State.*;
-
-public class RaftServer implements RaftServerProtocol, Closeable {
- public static final Logger LOG = LoggerFactory.getLogger(RaftServer.class);
-
- private static final String CLASS_NAME = RaftServer.class.getSimpleName();
- static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
- static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
- static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
-
-
- private final int minTimeoutMs;
- private final int maxTimeoutMs;
-
- private final LifeCycle lifeCycle;
- private final ServerState state;
- private final StateMachine stateMachine;
- private final RaftProperties properties;
- private volatile Role role;
-
- /** used when the peer is follower, to monitor election timeout */
- private volatile FollowerState heartbeatMonitor;
-
- /** used when the peer is candidate, to request votes from other peers */
- private volatile LeaderElection electionDaemon;
-
- /** used when the peer is leader */
- private volatile LeaderState leaderState;
-
- private RaftServerRpc serverRpc;
-
- private final LogAppenderFactory appenderFactory;
-
- public RaftServer(String id, RaftConfiguration raftConf,
- RaftProperties properties, StateMachine stateMachine) throws IOException {
- this.lifeCycle = new LifeCycle(id);
- minTimeoutMs = properties.getInt(
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
- maxTimeoutMs = properties.getInt(
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
- Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
- "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
- this.properties = properties;
- this.stateMachine = stateMachine;
- this.state = new ServerState(id, raftConf, properties, this, stateMachine);
- appenderFactory = initAppenderFactory();
- }
-
-  /** @return the min rpc timeout in milliseconds read from the properties. */
-  public int getMinTimeoutMs() {
-    return this.minTimeoutMs;
-  }
-
-  /** @return the max rpc timeout in milliseconds read from the properties. */
-  public int getMaxTimeoutMs() {
-    return this.maxTimeoutMs;
-  }
-
-  /**
-   * @return a timeout obtained from
-   *         {@code RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs)}.
-   */
-  public int getRandomTimeoutMs() {
-    final int timeoutMs = RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
-    return timeoutMs;
-  }
-
-  /** @return the state machine given to the constructor. */
-  public StateMachine getStateMachine() {
-    return stateMachine;
-  }
-
-  /** @return the log appender factory created in the constructor. */
-  public LogAppenderFactory getLogAppenderFactory() {
-    return this.appenderFactory;
-  }
-
- private LogAppenderFactory initAppenderFactory() {
- Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
- LogAppenderFactory.class);
- return RaftUtils.newInstance(factoryClass);
- }
-
-  /**
-   * Used by tests to set initial raft configuration with correct port bindings.
-   *
-   * @param conf the configuration handed over to the server state.
-   */
-  @VisibleForTesting
-  public void setInitialConf(RaftConfiguration conf) {
-    state.setInitialConf(conf);
-  }
-
-  /**
-   * Set the rpc service of this server and, if a raft configuration is
-   * available, add all its peers to the rpc service.
-   */
-  public void setServerRpc(RaftServerRpc serverRpc) {
-    this.serverRpc = serverRpc;
-
-    final RaftConfiguration current = getRaftConf();
-    if (current == null) {
-      return;
-    }
-    // add peers into rpc service
-    addPeersToRPC(current.getPeers());
-  }
-
-  /** @return the rpc service set by {@link #setServerRpc(RaftServerRpc)}. */
-  public RaftServerRpc getServerRpc() {
-    return this.serverRpc;
-  }
-
-  /**
-   * Start this server: as a follower if the raft configuration contains this
-   * server; otherwise in the initializing state.
-   */
-  public void start() {
-    lifeCycle.transition(STARTING);
-    state.start();
-
-    final RaftConfiguration conf = getRaftConf();
-    final boolean inConf = conf != null && conf.contains(getId());
-    if (!inConf) {
-      LOG.debug("{} starts with initializing state", getId());
-      startInitializing();
-      return;
-    }
-    LOG.debug("{} starts as a follower", getId());
-    startAsFollower();
-  }
-
- /**
- * The peer belongs to the current configuration, should start as a follower
- */
- private void startAsFollower() {
- role = Role.FOLLOWER;
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
-
- serverRpc.start();
- lifeCycle.transition(RUNNING);
- }
-
-  /**
-   * The peer does not have any configuration (maybe it will later be included
-   * in some configuration). Start still as a follower but will not vote or
-   * start election.
-   *
-   * Only the rpc service is started here: no heartbeat monitor is created and
-   * the life cycle state is left unchanged.
-   */
-  private void startInitializing() {
-    this.role = Role.FOLLOWER;
-    // do not start heartbeatMonitoring
-    this.serverRpc.start();
-  }
-
-  /** @return the server state created in the constructor. */
-  public ServerState getState() {
-    return state;
-  }
-
-  /** @return the id of this server, as recorded in the server state. */
-  public String getId() {
-    final ServerState serverState = getState();
-    return serverState.getSelfId();
-  }
-
-  /** @return the raft configuration currently held by the server state. */
-  public RaftConfiguration getRaftConf() {
-    final ServerState serverState = getState();
-    return serverState.getRaftConf();
-  }
-
-  /**
-   * Shut down the role-specific daemons, the rpc service and the server
-   * state. A failure during the shutdown is logged and not rethrown; the
-   * steps after the failing one are skipped.
-   */
-  @Override
-  public void close() {
-    lifeCycle.checkStateAndClose(() -> {
-      try {
-        shutdownHeartbeatMonitor();
-        shutdownElectionDaemon();
-        shutdownLeaderState();
-
-        serverRpc.shutdown();
-        state.close();
-      } catch (Exception e) {
-        // best effort: closing must not fail, so only report the problem
-        LOG.warn("Failed to kill " + state.getSelfId(), e);
-      }
-    });
-  }
-
-  /** @return true if the life cycle state is neither CLOSING nor CLOSED. */
-  public boolean isAlive() {
-    final boolean closingOrClosed = lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED);
-    return !closingOrClosed;
-  }
-
-  /** @return true if the current role is {@code FOLLOWER}. */
-  public boolean isFollower() {
-    return Role.FOLLOWER == role;
-  }
-
-  /** @return true if the current role is {@code CANDIDATE}. */
-  public boolean isCandidate() {
-    return Role.CANDIDATE == role;
-  }
-
-  /** @return true if the current role is {@code LEADER}. */
-  public boolean isLeader() {
-    return Role.LEADER == role;
-  }
-
-  /** @return the current role of this server. */
-  Role getRole() {
-    return this.role;
-  }
-
- /**
- * Change the server state to Follower if necessary.
- *
- * The role is set to follower unconditionally. If {@code newTerm} is greater
- * than the current term, the current term is advanced and the leader and
- * votedFor are reset. The leader state or the election daemon of the old
- * role is shut down, and a new heartbeat monitor is started unless this
- * server was a follower already.
- *
- * @param newTerm The new term.
- * @param sync We will call {@link ServerState#persistMetadata()} if this is
- * set to true and term/votedFor get updated.
- * @return true if the current term was advanced to {@code newTerm} and the
- * leader/votedFor were reset; false if they were left as they are.
- * When true is returned with {@code sync} being false, the metadata
- * has not been persisted by this method.
- * @throws IOException if term/votedFor persistence failed.
- */
- synchronized boolean changeToFollower(long newTerm, boolean sync)
- throws IOException {
- // remember the old role in order to shut down its daemons below
- final Role old = role;
- role = Role.FOLLOWER;
-
- boolean metadataUpdated = false;
- if (newTerm > state.getCurrentTerm()) {
- state.setCurrentTerm(newTerm);
- state.resetLeaderAndVotedFor();
- metadataUpdated = true;
- }
-
- if (old == Role.LEADER) {
- assert leaderState != null;
- shutdownLeaderState();
- } else if (old == Role.CANDIDATE) {
- shutdownElectionDaemon();
- }
-
- // a server which was a follower already keeps its running heartbeat monitor
- if (old != Role.FOLLOWER) {
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
- }
-
- if (metadataUpdated && sync) {
- state.persistMetadata();
- }
- return metadataUpdated;
- }
-
-  /** Stop the leader state, if there is one, and clear the field. */
-  private synchronized void shutdownLeaderState() {
-    final LeaderState current = leaderState;
-    if (current != null) {
-      current.stop();
-    }
-    // TODO: make sure that StateMachineUpdater has applied all transactions that have context
-    leaderState = null;
-  }
-
-  /** Ask the election daemon, if there is one, to stop and clear the field. */
-  private void shutdownElectionDaemon() {
-    final LeaderElection current = electionDaemon;
-    if (current != null) {
-      // no need to interrupt the election thread
-      current.stopRunning();
-    }
-    electionDaemon = null;
-  }
-
- synchronized void changeToLeader() {
- Preconditions.checkState(isCandidate());
- shutdownElectionDaemon();
- role = Role.LEADER;
- state.becomeLeader();
- // start sending AppendEntries RPC to followers
- leaderState = new LeaderState(this, properties);
- leaderState.start();
- }
-
-  /** Stop and interrupt the heartbeat monitor, if there is one, and clear the field. */
-  private void shutdownHeartbeatMonitor() {
-    final FollowerState monitor = heartbeatMonitor;
-    if (monitor != null) {
-      monitor.stopRunning();
-      monitor.interrupt();
-    }
-    heartbeatMonitor = null;
-  }
-
- synchronized void changeToCandidate() {
- Preconditions.checkState(isFollower());
- shutdownHeartbeatMonitor();
- role = Role.CANDIDATE;
- // start election
- electionDaemon = new LeaderElection(this);
- electionDaemon.start();
- }
-
-  /** @return the role, the server state and the life cycle state, separated by spaces. */
-  @Override
-  public String toString() {
-    return new StringBuilder()
-        .append(role).append(" ")
-        .append(state).append(" ")
-        .append(lifeCycle.getCurrentState())
-        .toString();
-  }
-
- /**
- * @return null if the server is in leader state.
- */
- CompletableFuture<RaftClientReply> checkLeaderState(
- RaftClientRequest request) {
- if (!isLeader()) {
- NotLeaderException exception = generateNotLeaderException();
- CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
- future.complete(new RaftClientReply(request, exception));
- return future;
- }
- return null;
- }
-
- NotLeaderException generateNotLeaderException() {
- if (lifeCycle.getCurrentState() != RUNNING) {
- return new NotLeaderException(getId(), null, null);
- }
- String leaderId = state.getLeaderId();
- if (leaderId == null || leaderId.equals(state.getSelfId())) {
- // No idea about who is the current leader. Or the peer is the current
- // leader, but it is about to step down
- RaftPeer suggestedLeader = state.getRaftConf()
- .getRandomPeer(state.getSelfId());
- leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
- }
- RaftConfiguration conf = getRaftConf();
- Collection<RaftPeer> peers = conf.getPeers();
- return new NotLeaderException(getId(), conf.getPeer(leaderId),
- peers.toArray(new RaftPeer[peers.size()]));
- }
-
-  /**
-   * Handle a normal update request from client.
-   *
-   * @return a future with a not-leader reply if this server is not the
-   *         leader; otherwise, the future of the pending request that was
-   *         registered for the newly appended log entry.
-   * @throws RaftException if appending the entry to the local log failed
-   *                       with an IOException.
-   */
-  public CompletableFuture<RaftClientReply> appendTransaction(
-      RaftClientRequest request, TransactionContext entry)
-      throws RaftException {
-    LOG.debug("{}: receive client request({})", getId(), request);
-    lifeCycle.assertCurrentState(RUNNING);
-
-    final PendingRequest pending;
-    synchronized (this) {
-      final CompletableFuture<RaftClientReply> notLeaderReply = checkLeaderState(request);
-      if (notLeaderReply != null) {
-        return notLeaderReply;
-      }
-
-      // append the message to its local log
-      final long entryIndex;
-      try {
-        entryIndex = state.applyLog(entry);
-      } catch (IOException e) {
-        throw new RaftException(e);
-      }
-
-      // put the request into the pending queue
-      pending = leaderState.addPendingRequest(entryIndex, request, entry);
-      leaderState.notifySenders();
-    }
-    return pending.getFuture();
-  }
-
- /**
- * Handle a raft configuration change request from client.
- *
- * @return a future with a not-leader reply if this server is not the
- * leader; otherwise, the future of the pending request obtained from
- * the leader state.
- * @throws ReconfigurationInProgressException if the current configuration
- * is not stable, the leader is in staging state or the current
- * configuration is not yet committed.
- */
- public CompletableFuture<RaftClientReply> setConfiguration(
- SetConfigurationRequest request) throws IOException {
- LOG.debug("{}: receive setConfiguration({})", getId(), request);
- lifeCycle.assertCurrentState(RUNNING);
- // first leader check, without holding the lock
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
- final PendingRequest pending;
- synchronized (this) {
- // check again under the lock since the role may have changed meanwhile
- reply = checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- final RaftConfiguration current = getRaftConf();
- // make sure there is no other raft reconfiguration in progress
- if (!current.isStable() || leaderState.inStagingState() ||
- !state.isCurrentConfCommitted()) {
- throw new ReconfigurationInProgressException(
- "Reconfiguration is already in progress: " + current);
- }
-
- // nothing to change if the new configuration is the same as the current
- // one: return the future obtained from returnNoConfChange
- if (current.hasNoChange(peersInNewConf)) {
- pending = leaderState.returnNoConfChange(request);
- return pending.getFuture();
- }
-
- // add new peers into the rpc service
- addPeersToRPC(Arrays.asList(peersInNewConf));
- // add staging state into the leaderState
- pending = leaderState.startSetConfiguration(request);
- }
- return pending.getFuture();
- }
-
-  /**
-   * @return true if this server is the leader; or it is a follower which has
-   *         a leader and whose heartbeat monitor says to withhold votes.
-   */
-  private boolean shouldWithholdVotes() {
-    if (isLeader()) {
-      return true;
-    }
-    return isFollower() && state.hasLeader()
-        && heartbeatMonitor.shouldWithholdVotes();
-  }
-
-  /**
-   * check if the remote peer is not included in the current conf
-   * and should shutdown. should shutdown if all the following stands:
-   * 1. this is a leader
-   * 2. current conf is stable and has been committed
-   * 3. candidate id is not included in conf
-   * 4. candidate's last entry's index < conf's index
-   * 5. the candidate is not a bootstrapping peer of the leader state
-   */
-  private boolean shouldSendShutdown(String candidateId,
-      TermIndex candidateLastEntry) {
-    if (!isLeader()) {
-      return false;
-    }
-    if (!getRaftConf().isStable() || !getState().isConfCommitted()) {
-      return false;
-    }
-    if (getRaftConf().containsInConf(candidateId)) {
-      return false;
-    }
-    if (candidateLastEntry.getIndex() >= getRaftConf().getLogEntryIndex()) {
-      return false;
-    }
-    return !leaderState.isBootStrappingPeer(candidateId);
-  }
-
-  /**
-   * Unpack the candidate id, the candidate term and the candidate's last
-   * entry from the request and handle the vote request.
-   */
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
-    final String candidateId = r.getServerRequest().getRequestorId();
-    final long candidateTerm = r.getCandidateTerm();
-    final TermIndex candidateLastEntry =
-        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry());
-    return requestVote(candidateId, candidateTerm, candidateLastEntry);
-  }
-
- /**
- * Decide whether to vote for the given candidate and build the reply.
- *
- * The vote is withheld when {@link #shouldWithholdVotes()} is true. Otherwise,
- * if the server state recognizes the candidate, this server changes to
- * follower and grants the vote provided that the candidate's log is up to
- * date. The reply also tells the candidate to shut down when the vote is not
- * granted and shouldSendShutdown(..) is true.
- *
- * @param candidateId the id of the candidate requesting the vote.
- * @param candidateTerm the term of the candidate.
- * @param candidateLastEntry the term and index of the candidate's last entry.
- * @return the reply carrying the vote, the current term and the shutdown flag.
- * @throws IOException if persisting the metadata failed.
- */
- private RequestVoteReplyProto requestVote(String candidateId,
- long candidateTerm, TermIndex candidateLastEntry) throws IOException {
- CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
- candidateId, candidateTerm, candidateLastEntry);
- LOG.debug("{}: receive requestVote({}, {}, {})",
- getId(), candidateId, candidateTerm, candidateLastEntry);
- lifeCycle.assertCurrentState(RUNNING);
-
- boolean voteGranted = false;
- boolean shouldShutdown = false;
- final RequestVoteReplyProto reply;
- synchronized (this) {
- if (shouldWithholdVotes()) {
- LOG.info("{} Withhold vote from server {} with term {}. " +
- "This server:{}, last rpc time from leader {} is {}", getId(),
- candidateId, candidateTerm, this, this.getState().getLeaderId(),
- (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
- } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
- // sync is false here: the metadata is persisted below, at most once
- boolean termUpdated = changeToFollower(candidateTerm, false);
- // see Section 5.4.1 Election restriction
- if (state.isLogUpToDate(candidateLastEntry)) {
- heartbeatMonitor.updateLastRpcTime(false);
- state.grantVote(candidateId);
- voteGranted = true;
- }
- if (termUpdated || voteGranted) {
- state.persistMetadata(); // sync metafile
- }
- }
- if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) {
- shouldShutdown = true;
- }
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
- voteGranted, state.getCurrentTerm(), shouldShutdown);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} replies to vote request: {}. Peer's state: {}",
- getId(), ProtoUtils.toString(reply), state);
- }
- }
- return reply;
- }
-
-  /**
-   * Validate the entries of an appendEntries request. Nothing is checked when
-   * there are no entries. Otherwise:
-   * - the first index must be 0 if previous is null or has term 0, and
-   *   previous.getIndex() + 1 otherwise;
-   * - no entry may have a term greater than {@code expectedTerm};
-   * - the indices must be consecutive.
-   *
-   * @throws IllegalArgumentException if one of the checks fails.
-   */
-  private void validateEntries(long expectedTerm, TermIndex previous,
-      LogEntryProto... entries) {
-    if (entries == null || entries.length == 0) {
-      return;
-    }
-    final long index0 = entries[0].getIndex();
-
-    final boolean noPrevious = previous == null || previous.getTerm() == 0;
-    if (noPrevious) {
-      Preconditions.checkArgument(index0 == 0,
-          "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
-          0, index0);
-    } else {
-      Preconditions.checkArgument(previous.getIndex() == index0 - 1,
-          "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
-          previous, 0, index0);
-    }
-
-    int i = 0;
-    for (LogEntryProto entry : entries) {
-      final long t = entry.getTerm();
-      Preconditions.checkArgument(expectedTerm >= t,
-          "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
-          i, t, expectedTerm);
-
-      final long indexi = entry.getIndex();
-      Preconditions.checkArgument(indexi == index0 + i,
-          "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
-          i, indexi, index0);
-      i++;
-    }
-  }
-
-  /**
-   * Unpack the fields of the request and handle the appendEntries call.
-   * The previous entry is passed as null if the request does not have one.
-   */
-  @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
-      throws IOException {
-    // TODO avoid converting list to array
-    final LogEntryProto[] entries = r.getEntriesList()
-        .toArray(new LogEntryProto[r.getEntriesCount()]);
-    TermIndex previous = null;
-    if (r.hasPreviousLog()) {
-      previous = ServerProtoUtils.toTermIndex(r.getPreviousLog());
-    }
-    final String leaderId = r.getServerRequest().getRequestorId();
-    return appendEntries(leaderId, r.getLeaderTerm(), previous,
-        r.getLeaderCommit(), r.getInitializing(), entries);
-  }
-
-  /**
-   * Handle an appendEntries call from the given leader.
-   *
-   * The reply is NOT_LEADER if the server state does not recognize the
-   * leader, INCONSISTENCY if {@code previous} is given but not contained
-   * locally, and SUCCESS after the entries have been appended and synced.
-   *
-   * @param leaderId the id of the requesting leader.
-   * @param leaderTerm the term of the leader.
-   * @param previous the entry preceding {@code entries}; may be null.
-   * @param leaderCommit the commit index of the leader.
-   * @param initializing if true, a STARTING server stays in STARTING.
-   * @param entries the entries to append; may be null or empty.
-   * @throws IOException if the entries are invalid or the metadata cannot be
-   *                     persisted.
-   * @throws InterruptedIOException if syncing the log got interrupted.
-   */
-  private AppendEntriesReplyProto appendEntries(String leaderId, long leaderTerm,
-      TermIndex previous, long leaderCommit, boolean initializing,
-      LogEntryProto... entries) throws IOException {
-    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
-        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
-          leaderId, leaderTerm, previous, leaderCommit, initializing,
-          ServerProtoUtils.toString(entries));
-    }
-    lifeCycle.assertCurrentState(STARTING, RUNNING);
-
-    try {
-      validateEntries(leaderTerm, previous, entries);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    }
-
-    final long currentTerm;
-    long nextIndex = state.getLog().getNextIndex();
-    synchronized (this) {
-      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
-      currentTerm = state.getCurrentTerm();
-      if (!recognized) {
-        final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-            leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: do not recognize leader. Reply: {}",
-              getId(), ProtoUtils.toString(reply));
-        }
-        return reply;
-      }
-      changeToFollower(leaderTerm, true);
-      state.setLeader(leaderId);
-
-      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
-        heartbeatMonitor = new FollowerState(this);
-        heartbeatMonitor.start();
-      }
-      if (lifeCycle.getCurrentState() == RUNNING) {
-        heartbeatMonitor.updateLastRpcTime(true);
-      }
-
-      // We need to check if "previous" is in the local peer. Note that it is
-      // possible that "previous" is covered by the latest snapshot: e.g.,
-      // it's possible there's no log entries outside of the latest snapshot.
-      // However, it is not possible that "previous" index is smaller than the
-      // last index included in snapshot. This is because indices <= snapshot's
-      // last index should have been committed.
-      if (previous != null && !containPrevious(previous)) {
-        final AppendEntriesReplyProto reply =
-            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
-                currentTerm, Math.min(nextIndex, previous.getIndex()), INCONSISTENCY);
-        // build the reply string only if it is going to be logged
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
-              getId(), previous, ServerProtoUtils.toString(reply));
-        }
-        return reply;
-      }
-
-      state.getLog().append(entries);
-      state.updateConfiguration(entries);
-      state.updateStatemachine(leaderCommit, currentTerm);
-    }
-    // sync the log outside the lock since it may take long
-    if (entries != null && entries.length > 0) {
-      try {
-        state.getLog().logSync();
-      } catch (InterruptedException e) {
-        // keep the original exception as the cause for diagnosis
-        final InterruptedIOException iioe =
-            new InterruptedIOException("logSync got interrupted");
-        iioe.initCause(e);
-        throw iioe;
-      }
-      nextIndex = entries[entries.length - 1].getIndex() + 1;
-    }
-    synchronized (this) {
-      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
-          && getState().getCurrentTerm() == currentTerm) {
-        // reset election timer to avoid punishing the leader for our own
-        // long disk writes
-        heartbeatMonitor.updateLastRpcTime(false);
-      }
-    }
-    final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
-        leaderId, getId(), currentTerm, nextIndex, SUCCESS);
-    // build the reply string only if it is going to be logged
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
-          ServerProtoUtils.toString(reply));
-    }
-    return reply;
-  }
-
-  /**
-   * @return true if {@code previous} is contained in the log, or equals the
-   *         term-index of the latest snapshot, or equals the latest installed
-   *         snapshot.
-   */
-  private boolean containPrevious(TermIndex previous) {
-    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
-        getId(), previous, state.getLatestSnapshot(), state.getLatestInstalledSnapshot());
-    if (state.getLog().contains(previous)) {
-      return true;
-    }
-    if (state.getLatestSnapshot() != null
-        && state.getLatestSnapshot().getTermIndex().equals(previous)) {
-      return true;
-    }
-    return state.getLatestInstalledSnapshot() != null
-        && state.getLatestInstalledSnapshot().equals(previous);
-  }
-
- /**
- * Handle an installSnapshot call carrying a chunk of a snapshot.
- *
- * The reply is NOT_LEADER if the server state does not recognize the
- * requesting leader. Otherwise, this server changes to follower, hands the
- * request to the server state and, if the request is marked as done, reloads
- * the state machine; the reply is then SUCCESS.
- *
- * @throws IllegalStateException if the next index of the local log is
- * greater than the last index included in the snapshot.
- * @throws IOException declared by this method; thrown by changeToFollower
- * if persisting the metadata failed.
- */
- @Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- final String leaderId = request.getServerRequest().getRequestorId();
- CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request);
- LOG.debug("{}: receive installSnapshot({})", getId(), request);
-
- lifeCycle.assertCurrentState(STARTING, RUNNING);
-
- final long currentTerm;
- final long leaderTerm = request.getLeaderTerm();
- final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
- request.getTermIndex());
- final long lastIncludedIndex = lastTermIndex.getIndex();
- synchronized (this) {
- final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
- currentTerm = state.getCurrentTerm();
- if (!recognized) {
- final InstallSnapshotReplyProto reply = ServerProtoUtils
- .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
- request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
- LOG.debug("{}: do not recognize leader for installing snapshot." +
- " Reply: {}", getId(), reply);
- return reply;
- }
- changeToFollower(leaderTerm, true);
- state.setLeader(leaderId);
-
- // the heartbeat monitor is touched only in the RUNNING state
- if (lifeCycle.getCurrentState() == RUNNING) {
- heartbeatMonitor.updateLastRpcTime(true);
- }
-
- // Check and append the snapshot chunk. We simply put this in lock
- // considering a follower peer requiring a snapshot installation does not
- // have a lot of requests
- Preconditions.checkState(
- state.getLog().getNextIndex() <= lastIncludedIndex,
- "%s log's next id is %s, last included index in snapshot is %s",
- getId(), state.getLog().getNextIndex(), lastIncludedIndex);
-
- //TODO: We should only update State with installed snapshot once the request is done.
- state.installSnapshot(request);
-
- // update the committed index
- // re-load the state machine if this is the last chunk
- if (request.getDone()) {
- state.reloadStateMachine(lastIncludedIndex, leaderTerm);
- }
- if (lifeCycle.getCurrentState() == RUNNING) {
- heartbeatMonitor.updateLastRpcTime(false);
- }
- }
- if (request.getDone()) {
- LOG.info("{}: successfully install the whole snapshot-{}", getId(),
- lastIncludedIndex);
- }
- return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
- currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
- }
-
- AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
- String targetId, TermIndex previous, List<LogEntryProto> entries,
- boolean initializing) {
- return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
- leaderTerm, entries, state.getLog().getLastCommittedIndex(),
- initializing, previous);
- }
-
- synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
- String targetId, String requestId, int requestIndex, SnapshotInfo snapshot,
- List<FileChunkProto> chunks, boolean done) {
- OptionalLong totalSize = snapshot.getFiles().stream()
- .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
- assert totalSize.isPresent();
- return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
- requestId, requestIndex, state.getCurrentTerm(), snapshot.getTermIndex(),
- chunks, totalSize.getAsLong(), done);
- }
-
- synchronized RequestVoteRequestProto createRequestVoteRequest(String targetId,
- long term, TermIndex lastEntry) {
- return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
- lastEntry);
- }
-
-  /**
-   * Submit an UPDATE_COMMIT_EVENT to the leader state. Nothing happens if
-   * this server is not the leader or has no leader state.
-   */
-  public synchronized void submitLocalSyncEvent() {
-    if (!isLeader() || leaderState == null) {
-      return;
-    }
-    leaderState.submitUpdateStateEvent(UPDATE_COMMIT_EVENT);
-  }
-
-  /** Add the given peers to the rpc service of this server. */
-  public void addPeersToRPC(Iterable<RaftPeer> peers) {
-    this.serverRpc.addPeers(peers);
-  }
-
- synchronized void replyPendingRequest(long logIndex,
- CompletableFuture<Message> message) {
- if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logIndex, message);
- }
- }
-
- TransactionContext getTransactionContext(long index) {
- if (leaderState != null) { // is leader and is running
- return leaderState.getTransactionContext(index);
- }
- return null;
- }
-
-  /** @return the properties given to the constructor. */
-  public RaftProperties getProperties() {
-    return properties;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
index 837b53b..2ce0326 100644
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
+++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerConfigKeys.java
@@ -17,6 +17,7 @@
*/
package org.apache.raft.server;
+import org.apache.raft.server.impl.LogAppenderFactory;
import org.apache.raft.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
deleted file mode 100644
index f6781f3..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerConstants.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.client.RaftClient;
-
-public interface RaftServerConstants {
- long INVALID_LOG_INDEX = -1;
- byte LOG_TERMINATE_BYTE = 0;
- long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
-
- enum StartupOption {
- FORMAT("format"),
- REGULAR("regular");
-
- private final String option;
-
- StartupOption(String arg) {
- this.option = arg;
- }
-
- public static StartupOption getOption(String arg) {
- for (StartupOption s : StartupOption.values()) {
- if (s.option.equals(arg)) {
- return s;
- }
- }
- return REGULAR;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
deleted file mode 100644
index de81ec2..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-public interface RaftServerRpc {
- void start();
-
- void shutdown();
-
- InetSocketAddress getInetSocketAddress();
-
- AppendEntriesReplyProto sendAppendEntries(
- AppendEntriesRequestProto request) throws IOException;
-
- InstallSnapshotReplyProto sendInstallSnapshot(
- InstallSnapshotRequestProto request) throws IOException;
-
- RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request)
- throws IOException;
-
- /** add information of the given peers */
- void addPeers(Iterable<RaftPeer> peers);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
deleted file mode 100644
index e281bfa..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/RequestDispatcher.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-import org.apache.raft.protocol.*;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.statemachine.StateMachine;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
-/**
- * Each RPC request is first handled by the RequestDispatcher:
- * 1. A request from another RaftPeer is to be handled by RaftServer.
- *
- * If the raft peer is the leader, then:
- *
- * 2. A read-only request from client is to be handled by the state machine.
- * 3. A write request from client is first validated by the state machine. The
- * state machine returns the content of the raft log entry, which is then passed
- * to the RaftServer for replication.
- */
-public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol {
- static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class);
-
- private final RaftServer server;
- private final StateMachine stateMachine;
-
- public RequestDispatcher(RaftServer server) {
- this.server = server;
- this.stateMachine = server.getStateMachine();
- }
-
- public CompletableFuture<RaftClientReply> handleClientRequest(
- RaftClientRequest request) throws IOException {
- // first check the server's leader state
- CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request);
- if (reply != null) {
- return reply;
- }
-
- // let the state machine handle read-only request from client
- if (request.isReadOnly()) {
- // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper,
- // section 8 (last part)
- return stateMachine.query(request);
- }
-
- // TODO: this client request will not be added to pending requests
- // until later which means that any failure in between will leave partial state in the
- // state machine. We should call cancelTransaction() for failed requests
- TransactionContext entry = stateMachine.startTransaction(request);
- if (entry.getException().isPresent()) {
- throw RaftUtils.asIOException(entry.getException().get());
- }
-
- return server.appendTransaction(request, entry);
- }
-
- @Override
- public RaftClientReply submitClientRequest(RaftClientRequest request)
- throws IOException {
- return waitForReply(server.getId(), request, handleClientRequest(request));
- }
-
- public CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException {
- return server.setConfiguration(request);
- }
-
- @Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request)
- throws IOException {
- return waitForReply(server.getId(), request, setConfigurationAsync(request));
- }
-
- private static RaftClientReply waitForReply(String serverId,
- RaftClientRequest request, CompletableFuture<RaftClientReply> future)
- throws IOException {
- try {
- return future.get();
- } catch (InterruptedException e) {
- final String s = serverId + ": Interrupted when waiting for reply, request=" + request;
- LOG.info(s, e);
- throw RaftUtils.toInterruptedIOException(s, e);
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- if (cause == null) {
- throw new IOException(e);
- }
- if (cause instanceof NotLeaderException) {
- return new RaftClientReply(request, (NotLeaderException)cause);
- } else {
- throw RaftUtils.asIOException(cause);
- }
- }
- }
-
- @Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
- throws IOException {
- return server.requestVote(request);
- }
-
- @Override
- public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
- throws IOException {
- return server.appendEntries(request);
- }
-
- @Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- return server.installSnapshot(request);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c36810ed/raft-server/src/main/java/org/apache/raft/server/Role.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/Role.java b/raft-server/src/main/java/org/apache/raft/server/Role.java
deleted file mode 100644
index a7e2f4c..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/Role.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server;
-
-/**
- * Role of Raft peer
- */
-public enum Role {
- LEADER, CANDIDATE, FOLLOWER
-}