You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/06 00:46:58 UTC

[incubator-ratis] branch master updated: RATIS-1206. Use atomic update instead of synchronized in AppenderDaemon. (#324)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c95d9a4  RATIS-1206. Use atomic update instead of synchronized in AppenderDaemon. (#324)
c95d9a4 is described below

commit c95d9a4cca08cc3c8561208064ae2f6951817f58
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 08:46:51 2020 +0800

    RATIS-1206. Use atomic update instead of synchronized in AppenderDaemon. (#324)
---
 .../main/java/org/apache/ratis/util/LifeCycle.java |  48 ++++++++
 .../org/apache/ratis/server/impl/LogAppender.java  | 103 ++---------------
 .../ratis/server/impl/LogAppenderDaemon.java       | 127 +++++++++++++++++++++
 .../apache/ratis/server/impl/ServerImplUtils.java  |   8 ++
 4 files changed, 191 insertions(+), 95 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index f789e27..d543d58 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.UnaryOperator;
 
 /**
  * The life cycle of a machine.
@@ -172,6 +173,53 @@ public class LifeCycle {
   }
 
   /**
+   * Transition from the current state to the given state only if the transition is valid.
+   * If the transition is invalid, this is a no-op.
+   *
+   * @return true if the updated state is equal to the given state.
+   */
+  public boolean transitionIfValid(final State to) {
+    final State updated = current.updateAndGet(from -> State.isValid(from, to)? to : from);
+    return updated == to;
+  }
+
+  /**
+   * Transition using the given operator.
+   *
+   * @return the updated state if there is a transition;
+   *         otherwise, return null to indicate no state change.
+   */
+  public State transition(UnaryOperator<State> operator) {
+    for(;;) {
+      final State previous = current.get();
+      final State applied = operator.apply(previous);
+      if (previous == applied) {
+        return null; // no change required
+      }
+      State.validate(name, previous, applied);
+      if (current.compareAndSet(previous, applied)) {
+        return applied;
+      }
+      // state has been changed, retry
+    }
+  }
+
+  /**
+   * Transition using the given operator.
+   *
+   * @return the resulting state, which equals the previous state if the operator made no change.
+   */
+  public State transitionAndGet(UnaryOperator<State> operator) {
+    return current.updateAndGet(previous -> {
+      final State applied = operator.apply(previous);
+      if (applied != previous) {
+        State.validate(name, previous, applied);
+      }
+      return applied;
+    });
+  }
+
+  /**
    * If the current state is equal to the specified from state,
    * then transition to the give to state; otherwise, make no change.
    *
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 493b820..a922e00 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -21,8 +21,8 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -40,12 +40,6 @@ import java.util.*;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
-import static org.apache.ratis.util.LifeCycle.State.CLOSED;
-import static org.apache.ratis.util.LifeCycle.State.CLOSING;
-import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
-import static org.apache.ratis.util.LifeCycle.State.NEW;
-import static org.apache.ratis.util.LifeCycle.State.RUNNING;
-import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 /**
  * A daemon thread appending log entries to a follower peer.
@@ -53,88 +47,6 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 public class LogAppender {
   public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
 
-  class AppenderDaemon {
-    private final String name = LogAppender.this + "-" + JavaUtils.getClassSimpleName(getClass());
-    private final LifeCycle lifeCycle = new LifeCycle(name);
-    private final Daemon daemon = new Daemon(this::run);
-
-    void start() {
-      // The life cycle state could be already closed due to server shutdown.
-      synchronized (lifeCycle) {
-        if (lifeCycle.compareAndTransition(NEW, STARTING)) {
-          daemon.start();
-        }
-      }
-    }
-
-    void run() {
-      synchronized (lifeCycle) {
-        if (!isRunning()) {
-          return;
-        }
-        transitionLifeCycle(RUNNING);
-      }
-      try {
-        runAppenderImpl();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        LOG.info(this + " was interrupted: " + e);
-      } catch (InterruptedIOException e) {
-        LOG.info(this + " was interrupted: " + e);
-      } catch (RaftLogIOException e) {
-        LOG.error(this + " failed RaftLog", e);
-        transitionLifeCycle(EXCEPTION);
-      } catch (IOException e) {
-        LOG.error(this + " failed IOException", e);
-        transitionLifeCycle(EXCEPTION);
-      } catch (Throwable e) {
-        LOG.error(this + " unexpected exception", e);
-        transitionLifeCycle(EXCEPTION);
-      } finally {
-        synchronized (lifeCycle) {
-          if (!lifeCycle.compareAndTransition(CLOSING, CLOSED)) {
-            lifeCycle.transitionIfNotEqual(EXCEPTION);
-          }
-          if (lifeCycle.getCurrentState() == EXCEPTION) {
-            leaderState.restart(LogAppender.this);
-          }
-        }
-      }
-    }
-
-    boolean isRunning() {
-      return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
-    }
-
-    void stop() {
-      synchronized (lifeCycle) {
-        if (LifeCycle.States.CLOSING_OR_CLOSED.contains(lifeCycle.getCurrentState())) {
-          return;
-        }
-        if (lifeCycle.compareAndTransition(NEW, CLOSED)) {
-          return;
-        }
-        transitionLifeCycle(CLOSING);
-      }
-      daemon.interrupt();
-    }
-
-    @Override
-    public String toString() {
-      return name;
-    }
-
-    private boolean transitionLifeCycle(LifeCycle.State to) {
-      synchronized (lifeCycle) {
-        if (LifeCycle.State.isValid(lifeCycle.getCurrentState(), to)) {
-          lifeCycle.transition(to);
-          return true;
-        }
-        return false;
-      }
-    }
-  }
-
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
@@ -145,7 +57,7 @@ public class LogAppender {
   private final int snapshotChunkMaxSize;
   private final long halfMinTimeoutMs;
 
-  private final AppenderDaemon daemon;
+  private final LogAppenderDaemon daemon;
 
   public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     this.follower = f;
@@ -161,7 +73,7 @@ public class LogAppender {
     final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
     final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
     this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
-    this.daemon = new AppenderDaemon();
+    this.daemon = new LogAppenderDaemon(this);
   }
 
   protected RaftServer.Division getServer() {
@@ -182,15 +94,15 @@ public class LogAppender {
   }
 
   void startAppender() {
-    daemon.start();
+    daemon.tryToStart();
   }
 
   public boolean isAppenderRunning() {
-    return daemon.isRunning();
+    return daemon.isWorking();
   }
 
   public void stopAppender() {
-    daemon.stop();
+    daemon.tryToClose();
   }
 
   public FollowerInfo getFollower() {
@@ -329,7 +241,8 @@ public class LogAppender {
 
   protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
       String requestId, SnapshotInfo snapshot) {
-    return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
+    return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
+        snapshot, snapshotChunkMaxSize);
   }
 
   private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
new file mode 100644
index 0000000..4d0a662
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+
+import java.io.InterruptedIOException;
+import java.util.function.UnaryOperator;
+
+import org.apache.ratis.util.LifeCycle.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.util.LifeCycle.State.CLOSED;
+import static org.apache.ratis.util.LifeCycle.State.CLOSING;
+import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
+import static org.apache.ratis.util.LifeCycle.State.NEW;
+import static org.apache.ratis.util.LifeCycle.State.RUNNING;
+import static org.apache.ratis.util.LifeCycle.State.STARTING;
+
+class LogAppenderDaemon {
+  public static final Logger LOG = LoggerFactory.getLogger(LogAppenderDaemon.class);
+
+  private final String name;
+  private final LifeCycle lifeCycle;
+  private final Daemon daemon;
+
+  private final LogAppender logAppender;
+
+  LogAppenderDaemon(LogAppender logAppender) {
+    this.logAppender = logAppender;
+    this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.lifeCycle = new LifeCycle(name);
+    this.daemon = new Daemon(this::run, name);
+  }
+
+  public boolean isWorking() {
+    return !LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(lifeCycle.getCurrentState());
+  }
+
+  public void tryToStart() {
+    if (lifeCycle.compareAndTransition(NEW, STARTING)) {
+      daemon.start();
+    }
+  }
+
+  static final UnaryOperator<State> TRY_TO_RUN = current -> {
+    if (current == STARTING) {
+      return RUNNING;
+    } else if (LifeCycle.States.CLOSING_OR_CLOSED_OR_EXCEPTION.contains(current)) {
+      return current;
+    }
+    // Other states are illegal.
+    throw new IllegalArgumentException("Cannot to tryToRun from " + current);
+  };
+
+  private void run() {
+    try {
+      if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
+        logAppender.runAppenderImpl();
+      }
+      lifeCycle.compareAndTransition(RUNNING, CLOSING);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info(this + " was interrupted: " + e);
+    } catch (InterruptedIOException e) {
+      LOG.info(this + " I/O was interrupted: " + e);
+    } catch (Throwable e) {
+      LOG.error(this + " failed", e);
+      lifeCycle.transitionIfValid(EXCEPTION);
+    } finally {
+      if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
+        logAppender.getLeaderState().restart(logAppender);
+      }
+    }
+  }
+
+  static final UnaryOperator<State> TRANSITION_FINALLY = current -> {
+    if (State.isValid(current, CLOSED)) {
+      return CLOSED;
+    } else if (State.isValid(current, EXCEPTION)) {
+      return EXCEPTION;
+    } else {
+      return current;
+    }
+  };
+
+  public void tryToClose() {
+    if (lifeCycle.transition(TRY_TO_CLOSE) == CLOSING) {
+      daemon.interrupt();
+    }
+  }
+
+  static final UnaryOperator<State> TRY_TO_CLOSE = current -> {
+    if (current == NEW) {
+      return CLOSED;
+    } else if (current.isClosingOrClosed()) {
+      return current;
+    } else if (State.isValid(current, CLOSING)) {
+      return CLOSING;
+    }
+    // Other states are illegal.
+    throw new IllegalArgumentException("Cannot to tryToClose from " + current);
+  };
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index db452da..1117ceb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,11 +19,13 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -66,6 +68,12 @@ public final class ServerImplUtils {
     return proxy;
   }
 
+  public static Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(RaftServer.Division server,
+      RaftPeerId targetId, String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
+    return new InstallSnapshotRequests(server, targetId, requestId, snapshot, snapshotChunkMaxSize);
+  }
+
+
   static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
     final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
     return Math.min(leaderCommitIndex, p + numAppendEntries);