You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/06 00:46:58 UTC
[incubator-ratis] branch master updated: RATIS-1206. Use atomic
update instead of synchronized in AppenderDaemon. (#324)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c95d9a4 RATIS-1206. Use atomic update instead of synchronized in AppenderDaemon. (#324)
c95d9a4 is described below
commit c95d9a4cca08cc3c8561208064ae2f6951817f58
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 08:46:51 2020 +0800
RATIS-1206. Use atomic update instead of synchronized in AppenderDaemon. (#324)
---
.../main/java/org/apache/ratis/util/LifeCycle.java | 48 ++++++++
.../org/apache/ratis/server/impl/LogAppender.java | 103 ++---------------
.../ratis/server/impl/LogAppenderDaemon.java | 127 +++++++++++++++++++++
.../apache/ratis/server/impl/ServerImplUtils.java | 8 ++
4 files changed, 191 insertions(+), 95 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index f789e27..d543d58 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.UnaryOperator;
/**
* The life cycle of a machine.
@@ -172,6 +173,53 @@ public class LifeCycle {
}
/**
+ * Transition from the current state to the given state only if the transition is valid.
+ * If the transition is invalid, this is a no-op.
+ *
+ * @return true if the updated state equals to the given state.
+ */
+ public boolean transitionIfValid(final State to) {
+ final State updated = current.updateAndGet(from -> State.isValid(from, to)? to : from);
+ return updated == to;
+ }
+
+ /**
+ * Transition using the given operator.
+ *
+ * @return the updated state if there is a transition;
+ * otherwise, return null to indicate no state change.
+ */
+ public State transition(UnaryOperator<State> operator) {
+ for(;;) {
+ final State previous = current.get();
+ final State applied = operator.apply(previous);
+ if (previous == applied) {
+ return null; // no change required
+ }
+ State.validate(name, previous, applied);
+ if (current.compareAndSet(previous, applied)) {
+ return applied;
+ }
+ // state has been changed, retry
+ }
+ }
+
+ /**
+ * Transition using the given operator.
+ *
+ * @return the updated state.
+ */
+ public State transitionAndGet(UnaryOperator<State> operator) {
+ return current.updateAndGet(previous -> {
+ final State applied = operator.apply(previous);
+ if (applied != previous) {
+ State.validate(name, previous, applied);
+ }
+ return applied;
+ });
+ }
+
+ /**
* If the current state is equal to the specified from state,
* then transition to the give to state; otherwise, make no change.
*
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 493b820..a922e00 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -21,8 +21,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -40,12 +40,6 @@ import java.util.*;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
-import static org.apache.ratis.util.LifeCycle.State.CLOSED;
-import static org.apache.ratis.util.LifeCycle.State.CLOSING;
-import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
-import static org.apache.ratis.util.LifeCycle.State.NEW;
-import static org.apache.ratis.util.LifeCycle.State.RUNNING;
-import static org.apache.ratis.util.LifeCycle.State.STARTING;
/**
* A daemon thread appending log entries to a follower peer.
@@ -53,88 +47,6 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
public class LogAppender {
public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
- class AppenderDaemon {
- private final String name = LogAppender.this + "-" + JavaUtils.getClassSimpleName(getClass());
- private final LifeCycle lifeCycle = new LifeCycle(name);
- private final Daemon daemon = new Daemon(this::run);
-
- void start() {
- // The life cycle state could be already closed due to server shutdown.
- synchronized (lifeCycle) {
- if (lifeCycle.compareAndTransition(NEW, STARTING)) {
- daemon.start();
- }
- }
- }
-
- void run() {
- synchronized (lifeCycle) {
- if (!isRunning()) {
- return;
- }
- transitionLifeCycle(RUNNING);
- }
- try {
- runAppenderImpl();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.info(this + " was interrupted: " + e);
- } catch (InterruptedIOException e) {
- LOG.info(this + " was interrupted: " + e);
- } catch (RaftLogIOException e) {
- LOG.error(this + " failed RaftLog", e);
- transitionLifeCycle(EXCEPTION);
- } catch (IOException e) {
- LOG.error(this + " failed IOException", e);
- transitionLifeCycle(EXCEPTION);
- } catch (Throwable e) {
- LOG.error(this + " unexpected exception", e);
- transitionLifeCycle(EXCEPTION);
- } finally {
- synchronized (lifeCycle) {
- if (!lifeCycle.compareAndTransition(CLOSING, CLOSED)) {
- lifeCycle.transitionIfNotEqual(EXCEPTION);
- }
- if (lifeCycle.getCurrentState() == EXCEPTION) {
- leaderState.restart(LogAppender.this);
- }
- }
- }
- }
-
- boolean isRunning() {
- return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
- }
-
- void stop() {
- synchronized (lifeCycle) {
- if (LifeCycle.States.CLOSING_OR_CLOSED.contains(lifeCycle.getCurrentState())) {
- return;
- }
- if (lifeCycle.compareAndTransition(NEW, CLOSED)) {
- return;
- }
- transitionLifeCycle(CLOSING);
- }
- daemon.interrupt();
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- private boolean transitionLifeCycle(LifeCycle.State to) {
- synchronized (lifeCycle) {
- if (LifeCycle.State.isValid(lifeCycle.getCurrentState(), to)) {
- lifeCycle.transition(to);
- return true;
- }
- return false;
- }
- }
- }
-
private final String name;
private final RaftServer.Division server;
private final LeaderState leaderState;
@@ -145,7 +57,7 @@ public class LogAppender {
private final int snapshotChunkMaxSize;
private final long halfMinTimeoutMs;
- private final AppenderDaemon daemon;
+ private final LogAppenderDaemon daemon;
public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
@@ -161,7 +73,7 @@ public class LogAppender {
final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
- this.daemon = new AppenderDaemon();
+ this.daemon = new LogAppenderDaemon(this);
}
protected RaftServer.Division getServer() {
@@ -182,15 +94,15 @@ public class LogAppender {
}
void startAppender() {
- daemon.start();
+ daemon.tryToStart();
}
public boolean isAppenderRunning() {
- return daemon.isRunning();
+ return daemon.isWorking();
}
public void stopAppender() {
- daemon.stop();
+ daemon.tryToClose();
}
public FollowerInfo getFollower() {
@@ -329,7 +241,8 @@ public class LogAppender {
protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
String requestId, SnapshotInfo snapshot) {
- return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
+ return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
+ snapshot, snapshotChunkMaxSize);
}
private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
new file mode 100644
index 0000000..4d0a662
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+
+import java.io.InterruptedIOException;
+import java.util.function.UnaryOperator;
+
+import org.apache.ratis.util.LifeCycle.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.util.LifeCycle.State.CLOSED;
+import static org.apache.ratis.util.LifeCycle.State.CLOSING;
+import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
+import static org.apache.ratis.util.LifeCycle.State.NEW;
+import static org.apache.ratis.util.LifeCycle.State.RUNNING;
+import static org.apache.ratis.util.LifeCycle.State.STARTING;
+
+class LogAppenderDaemon {
+ public static final Logger LOG = LoggerFactory.getLogger(LogAppenderDaemon.class);
+
+ private final String name;
+ private final LifeCycle lifeCycle;
+ private final Daemon daemon;
+
+ private final LogAppender logAppender;
+
+ LogAppenderDaemon(LogAppender logAppender) {
+ this.logAppender = logAppender;
+ this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
+ this.lifeCycle = new LifeCycle(name);
+ this.daemon = new Daemon(this::run, name);
+ }
+
+ public boolean isWorking() {
+ return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
+ }
+
+ public void tryToStart() {
+ if (lifeCycle.compareAndTransition(NEW, STARTING)) {
+ daemon.start();
+ }
+ }
+
+ static final UnaryOperator<State> TRY_TO_RUN = current -> {
+ if (current == STARTING) {
+ return RUNNING;
+ } else if (LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(current)) {
+ return current;
+ }
+ // Other states are illegal.
+ throw new IllegalArgumentException("Cannot to tryToRun from " + current);
+ };
+
+ private void run() {
+ try {
+ if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
+ logAppender.runAppenderImpl();
+ }
+ lifeCycle.compareAndTransition(RUNNING, CLOSING);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info(this + " was interrupted: " + e);
+ } catch (InterruptedIOException e) {
+ LOG.info(this + " I/O was interrupted: " + e);
+ } catch (Throwable e) {
+ LOG.error(this + " failed", e);
+ lifeCycle.transitionIfValid(EXCEPTION);
+ } finally {
+ if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
+ logAppender.getLeaderState().restart(logAppender);
+ }
+ }
+ }
+
+ static final UnaryOperator<State> TRANSITION_FINALLY = current -> {
+ if (State.isValid(current, CLOSED)) {
+ return CLOSED;
+ } else if (State.isValid(current, EXCEPTION)) {
+ return EXCEPTION;
+ } else {
+ return current;
+ }
+ };
+
+ public void tryToClose() {
+ if (lifeCycle.transition(TRY_TO_CLOSE) == CLOSING) {
+ daemon.interrupt();
+ }
+ }
+
+ static final UnaryOperator<State> TRY_TO_CLOSE = current -> {
+ if (current == NEW) {
+ return CLOSED;
+ } else if (current.isClosingOrClosed()) {
+ return current;
+ } else if (State.isValid(current, CLOSING)) {
+ return CLOSING;
+ }
+ // Other states are illegal.
+ throw new IllegalArgumentException("Cannot to tryToClose from " + current);
+ };
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index db452da..1117ceb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,11 +19,13 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -66,6 +68,12 @@ public final class ServerImplUtils {
return proxy;
}
+ public static Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(RaftServer.Division server,
+ RaftPeerId targetId, String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
+ return new InstallSnapshotRequests(server, targetId, requestId, snapshot, snapshotChunkMaxSize);
+ }
+
+
static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
return Math.min(leaderCommitIndex, p + numAppendEntries);