You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/01 05:19:33 UTC

incubator-ratis git commit: RATIS-204. Add getLastAppliedTermIndex to StateMachine. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master dd15f2a29 -> 1beef8741


RATIS-204. Add getLastAppliedTermIndex to StateMachine. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1beef874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1beef874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1beef874

Branch: refs/heads/master
Commit: 1beef8741160aa471a3b5af4c677875296f0a0aa
Parents: dd15f2a
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Thu Feb 1 10:49:01 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Thu Feb 1 10:49:01 2018 +0530

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/JavaUtils.java   | 27 ++-----
 .../org/apache/ratis/util/MemoizedSupplier.java | 74 ++++++++++++++++++++
 .../arithmetic/ArithmeticStateMachine.java      | 28 ++------
 .../filestore/FileStoreStateMachine.java        | 29 +++-----
 .../ratis/server/impl/RaftServerImpl.java       | 14 ++--
 .../ratis/server/impl/StateMachineUpdater.java  | 19 +++--
 .../apache/ratis/statemachine/StateMachine.java |  8 ++-
 .../ratis/statemachine/TransactionContext.java  | 12 ++--
 .../statemachine/impl/BaseStateMachine.java     | 38 ++++++++--
 .../SimpleStateMachine4Testing.java             | 21 ++----
 .../ratis/statemachine/TermIndexTracker.java    | 68 ------------------
 11 files changed, 167 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 0a99527..8322209 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.util.List;
 import java.util.Objects;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -88,26 +89,8 @@ public interface JavaUtils {
    * @param <T> The supplier result type.
    * @return a memoized supplier which is thread-safe.
    */
-  static <T> Supplier<T> memoize(Supplier<T> initializer) {
-    Objects.requireNonNull(initializer, "initializer == null");
-    return new Supplier<T>() {
-      private volatile T value = null;
-
-      @Override
-      public T get() {
-        T v = value;
-        if (v == null) {
-          synchronized (this) {
-            v = value;
-            if (v == null) {
-              v = value = Objects.requireNonNull(initializer.get(),
-                  "initializer.get() returns null");
-            }
-          }
-        }
-        return v;
-      }
-    };
+  static <T> MemoizedSupplier<T> memoize(Supplier<T> initializer) {
+    return MemoizedSupplier.valueOf(initializer);
   }
 
   Supplier<ThreadGroup> ROOT_THREAD_GROUP = memoize(() -> {
@@ -213,4 +196,8 @@ public interface JavaUtils {
     Objects.requireNonNull(t, "t == null");
     return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
   }
+
+  static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
+    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
new file mode 100644
index 0000000..47a4a87
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * A memoized supplier is a {@link Supplier}
+ * which gets a value by invoking its initializer once
+ * and then keeps returning the same value as its supplied results.
+ *
+ * This class is thread safe.
+ *
+ * @param <T> The supplier result type.
+ */
+public class MemoizedSupplier<T> implements Supplier<T> {
+  /**
+   * @param supplier to supply at most one non-null value.
+   * @return a {@link MemoizedSupplier} with the given supplier.
+   */
+  public static <T> MemoizedSupplier<T> valueOf(Supplier<T> supplier) {
+    return supplier instanceof MemoizedSupplier ?
+        (MemoizedSupplier<T>) supplier : new MemoizedSupplier<>(supplier);
+  }
+
+  private final Supplier<T> initializer;
+  private volatile T value = null;
+
+  /**
+   * Create a memoized supplier.
+   * @param initializer to supply at most one non-null value.
+   */
+  private MemoizedSupplier(Supplier<T> initializer) {
+    Objects.requireNonNull(initializer, "initializer == null");
+    this.initializer = initializer;
+  }
+
+  /** @return the lazily initialized object. */
+  @Override
+  public T get() {
+    T v = value;
+    if (v == null) {
+      synchronized (this) {
+        v = value;
+        if (v == null) {
+          v = value = Objects.requireNonNull(initializer.get(),
+              "initializer.get() returns null");
+        }
+      }
+    }
+    return v;
+  }
+
+  /** @return is the object initialized? */
+  public boolean isInitialized() {
+    return value != null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 4eb0756..0b08b89 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -27,30 +27,24 @@ import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.statemachine.*;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ArithmeticStateMachine extends BaseStateMachine {
-  static final Logger LOG = LoggerFactory.getLogger(ArithmeticStateMachine.class);
-
   private final Map<String, Double> variables = new ConcurrentHashMap<>();
 
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
-  private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>();
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
 
@@ -64,7 +58,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
 
   void reset() {
     variables.clear();
-    latestTermIndex.set(null);
+    setLastAppliedTermIndex(null);
   }
 
   @Override
@@ -83,12 +77,12 @@ public class ArithmeticStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public long takeSnapshot() throws IOException {
+  public long takeSnapshot() {
     final Map<String, Double> copy;
     final TermIndex last;
     try(final AutoCloseableLock readLock = readLock()) {
       copy = new HashMap<>(variables);
-      last = latestTermIndex.get();
+      last = getLastAppliedTermIndex();
     }
 
     final File snapshotFile =  storage.getSnapshotFile(last.getTerm(), last.getIndex());
@@ -127,7 +121,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
       if (reload) {
         reset();
       }
-      latestTermIndex.set(last);
+      setLastAppliedTermIndex(last);
       variables.putAll((Map<String, Double>) in.readObject());
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException(e);
@@ -171,7 +165,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
     final Double result;
     try(final AutoCloseableLock writeLock = writeLock()) {
       result = assignment.evaluate(variables);
-      updateLatestTermIndex(entry.getTerm(), index);
+      updateLastAppliedTermIndex(entry.getTerm(), index);
     }
     final Expression r = Expression.Utils.double2Expression(result);
     LOG.debug("{}-{}: {} = {}", getId(), index, assignment, r);
@@ -180,12 +174,4 @@ public class ArithmeticStateMachine extends BaseStateMachine {
     }
     return CompletableFuture.completedFuture(Expression.Utils.toMessage(r));
   }
-
-  private void updateLatestTermIndex(long term, long index) {
-    final TermIndex newTI = TermIndex.newTermIndex(term, index);
-    final TermIndex oldTI = latestTermIndex.getAndSet(newTI);
-    if (oldTI != null) {
-      Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 2a09e1b..5ecb387 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -19,35 +19,30 @@ package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.shaded.proto.ExamplesProtos.*;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
 import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class FileStoreStateMachine extends BaseStateMachine {
-  public static final Logger LOG = LoggerFactory.getLogger(FileStoreStateMachine.class);
-
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
-  private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>();
 
   private final FileStore files;
 
@@ -73,7 +68,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
   @Override
   public void close() {
     files.close();
-    latestTermIndex.set(null);
+    setLastAppliedTermIndex(null);
   }
 
   @Override
@@ -137,7 +132,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
     final LogEntryProto entry = trx.getLogEntry();
 
     final long index = entry.getIndex();
-    updateLatestTermIndex(entry.getTerm(), index);
+    updateLastAppliedTermIndex(entry.getTerm(), index);
 
     final SMLogEntryProto smLog = entry.getSmLogEntry();
     final FileStoreRequestProto request;
@@ -176,12 +171,4 @@ public class FileStoreStateMachine extends BaseStateMachine {
         DeleteReplyProto.newBuilder().setResolvedPath(
             FileStoreCommon.toByteString(resolved)).build().toByteString());
   }
-
-  private void updateLatestTermIndex(long term, long index) {
-    final TermIndex newTI = TermIndex.newTermIndex(term, index);
-    final TermIndex oldTI = latestTermIndex.getAndSet(newTI);
-    if (oldTI != null) {
-      Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 29b3a7e..a619909 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -801,8 +801,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     logAppendEntries(isHeartbeat,
         () -> getId() + ": succeeded to handle AppendEntries. Reply: "
             + ServerProtoUtils.toString(reply));
-    return CompletableFuture
-        .allOf(futures.toArray(new CompletableFuture[futures.size()]))
+    return JavaUtils.allOf(futures)
         .thenApply(v -> {
           // reset election timer to avoid punishing the leader for our own
           // long disk writes
@@ -932,8 +931,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
    * @param stateMachineFuture the future returned by the state machine
    *                           from which we will get transaction result later
    */
-  private void replyPendingRequest(LogEntryProto logEntry,
-      CompletableFuture<Message> stateMachineFuture) {
+  private CompletableFuture<Message> replyPendingRequest(
+      LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) {
     // update the retry cache
     final ClientId clientId = ClientId.valueOf(logEntry.getClientId());
     final long callId = logEntry.getCallId();
@@ -944,7 +943,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
     }
 
-    stateMachineFuture.whenComplete((reply, exception) -> {
+    return stateMachineFuture.whenComplete((reply, exception) -> {
       final RaftClientReply r;
       if (exception == null) {
         r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null);
@@ -980,7 +979,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return s.getFollowerNextIndices();
   }
 
-  public void applyLogToStateMachine(LogEntryProto next) {
+  CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
     final StateMachine stateMachine = getStateMachine();
     if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
       // the reply should have already been set. only need to record
@@ -1000,8 +999,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       // TODO: This step can be parallelized
       CompletableFuture<Message> stateMachineFuture =
           stateMachine.applyTransaction(trx);
-      replyPendingRequest(next, stateMachineFuture);
+      return replyPendingRequest(next, stateMachineFuture);
     }
+    return null;
   }
 
   private class RaftServerJmxAdapter extends JmxRegister implements RaftServerMXBean {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 9ef6ce7..5fffb1f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -18,20 +18,21 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This class tracks the log entries that have been committed in a quorum and
@@ -140,6 +141,8 @@ class StateMachineUpdater implements Runnable {
           state = State.RUNNING;
         }
 
+        final MemoizedSupplier<List<CompletableFuture<Message>>> futures
+            = MemoizedSupplier.valueOf(() -> new ArrayList<>());
         while (lastAppliedIndex < committedIndex) {
           final long nextIndex = lastAppliedIndex + 1;
           final LogEntryProto next = raftLog.get(nextIndex);
@@ -148,7 +151,10 @@ class StateMachineUpdater implements Runnable {
               LOG.debug("{}: applying nextIndex={}, nextLog={}",
                   this, nextIndex, ServerProtoUtils.toString(next));
             }
-            server.applyLogToStateMachine(next);
+            final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
+            if (f != null) {
+              futures.get().add(f);
+            }
             lastAppliedIndex = nextIndex;
           } else {
             LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
@@ -159,6 +165,9 @@ class StateMachineUpdater implements Runnable {
 
         // check if need to trigger a snapshot
         if (shouldTakeSnapshot(lastAppliedIndex)) {
+          if (futures.isInitialized()) {
+            JavaUtils.allOf(futures.get()).get();
+          }
           stateMachine.takeSnapshot();
           // TODO purge logs, including log cache. but should keep log for leader's RPCSenders
           lastSnapshotIndex = lastAppliedIndex;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 1d47815..cbedb08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -24,10 +24,12 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftConfiguration;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -40,6 +42,8 @@ import java.util.concurrent.CompletableFuture;
  * (see https://en.wikipedia.org/wiki/State_machine_replication).
  */
 public interface StateMachine extends Closeable {
+  Logger LOG = LoggerFactory.getLogger(StateMachine.class);
+
   /**
    * Initializes the State Machine with the given properties and storage. The state machine is
    * responsible reading the latest snapshot from the file system (if any) and initialize itself
@@ -173,6 +177,8 @@ public interface StateMachine extends Closeable {
   // TODO: We do not need to return CompletableFuture
   CompletableFuture<Message> applyTransaction(TransactionContext trx);
 
+  TermIndex getLastAppliedTermIndex();
+
   /**
    * Notify the state machine that the raft peer is no longer leader.
    */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index 53ef2f8..5fbedf9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -17,15 +17,13 @@
  */
 package org.apache.ratis.statemachine;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Objects;
-
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
 
 /**
  * Context for a transaction.
@@ -123,12 +121,12 @@ public interface TransactionContext {
    * log append, it is important to do only required operations here.
    * @return The Transaction context.
    */
-  public TransactionContext preAppendTransaction() throws IOException;
+  TransactionContext preAppendTransaction() throws IOException;
 
   /**
    * Called to notify the state machine that the Transaction passed cannot be appended (or synced).
    * The exception field will indicate whether there was an exception or not.
    * @return cancelled transaction
    */
-  public TransactionContext cancelTransaction() throws IOException;
+  TransactionContext cancelTransaction() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 520d965..8f0b56a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -18,10 +18,6 @@
 
 package org.apache.ratis.statemachine.impl;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -29,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
@@ -36,18 +33,25 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Base implementation for StateMachines.
  */
 public class BaseStateMachine implements StateMachine {
-
   private volatile RaftPeerId id;
   protected RaftProperties properties;
   protected RaftStorage storage;
   protected RaftConfiguration raftConf;
   protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
 
+  private final AtomicReference<TermIndex> lastAppliedTermIndex = new AtomicReference<>();
+
   public RaftPeerId getId() {
     return id;
   }
@@ -108,6 +112,30 @@ public class BaseStateMachine implements StateMachine {
   }
 
   @Override
+  public TermIndex getLastAppliedTermIndex() {
+    return lastAppliedTermIndex.get();
+  }
+
+  protected void setLastAppliedTermIndex(TermIndex newTI) {
+    lastAppliedTermIndex.set(newTI);
+  }
+
+  protected boolean updateLastAppliedTermIndex(long term, long index) {
+    final TermIndex newTI = TermIndex.newTermIndex(term, index);
+    final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
+    if (!newTI.equals(oldTI)) {
+      LOG.debug("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI);
+      if (oldTI != null) {
+        Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0,
+            () -> getId() + ": Failed updateLastAppliedTermIndex: newTI = "
+                + newTI + " < oldTI = " + oldTI);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
   public long takeSnapshot() throws IOException {
     return RaftServerConstants.INVALID_LOG_INDEX;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 8d53faf..86b7b66 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -27,7 +27,6 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogInputStream;
 import org.apache.ratis.server.storage.LogOutputStream;
@@ -72,7 +71,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
       Collections.synchronizedList(new ArrayList<>());
   private final Daemon checkpointer;
   private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
-  private final TermIndexTracker termIndexTracker = new TermIndexTracker();
   private final RaftProperties properties = new RaftProperties();
   private long segmentMaxSize =
       RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -89,7 +87,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
       while (running) {
-        try {
           if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >=
               SNAPSHOT_THRESHOLD) {
             endIndexLastCkpt = takeSnapshot();
@@ -98,9 +95,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
             Thread.sleep(1000);
           } catch (InterruptedException ignored) {
           }
-        } catch (IOException ioe) {
-          LOG.warn("Received IOException in Checkpointer", ioe);
-        }
       }
     });
   }
@@ -139,14 +133,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
     list.add(entry);
-    termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+    updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
     return CompletableFuture.completedFuture(
         new SimpleMessage(entry.getIndex() + " OK"));
   }
 
   @Override
-  public long takeSnapshot() throws IOException {
-    TermIndex termIndex = termIndexTracker.getLatestTermIndex();
+  public long takeSnapshot() {
+    final TermIndex termIndex = getLastAppliedTermIndex();
     if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
       return RaftServerConstants.INVALID_LOG_INDEX;
     }
@@ -209,14 +203,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
           list.add(entry);
-          termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+          updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
       Preconditions.assertTrue(
           !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
           "endIndex=%s, list=%s", endIndex, list);
       this.endIndexLastCkpt = endIndex;
-      termIndexTracker.init(snapshot.getTermIndex());
+      setLastAppliedTermIndex(snapshot.getTermIndex());
       this.storage.loadLatestSnapshot();
       return endIndex;
     }
@@ -248,11 +242,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
   }
 
   @Override
-  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
-    // do nothing
-  }
-
-  @Override
   public void close() {
     lifeCycle.checkStateAndClose(() -> {
       running = false;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
deleted file mode 100644
index bb359d7..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.statemachine;
-
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.util.Preconditions;
-
-import java.util.Objects;
-
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * Tracks the term index that is applied to the StateMachine for simple state machines with
- * no concurrent snapshoting capabilities.
- */
-class TermIndexTracker {
-  static final TermIndex INIT_TERMINDEX =
-      TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
-
-  private TermIndex current = INIT_TERMINDEX;
-
-  //TODO: developer note: everything is synchronized for now for convenience.
-
-  /**
-   * Initialize the tracker with a term index (likely from a snapshot).
-   */
-  public synchronized void init(TermIndex termIndex) {
-    this.current = termIndex;
-  }
-
-  public synchronized void reset() {
-    init(INIT_TERMINDEX);
-  }
-
-  /**
-   * Update the tracker with a new TermIndex. It means that the StateMachine has
-   * this index in memory.
-   */
-  public synchronized void update(TermIndex termIndex) {
-    Objects.requireNonNull(termIndex);
-    Preconditions.assertTrue(termIndex.compareTo(current) >= 0);
-    this.current = termIndex;
-  }
-
-  /**
-   * Return latest term and index that is inserted to this tracker as an atomic
-   * entity.
-   */
-  public synchronized TermIndex getLatestTermIndex() {
-    return current;
-  }
-
-}