You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/01 05:19:33 UTC
incubator-ratis git commit: RATIS-204. Add getLastAppliedTermIndex to
StateMachine. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master dd15f2a29 -> 1beef8741
RATIS-204. Add getLastAppliedTermIndex to StateMachine. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1beef874
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1beef874
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1beef874
Branch: refs/heads/master
Commit: 1beef8741160aa471a3b5af4c677875296f0a0aa
Parents: dd15f2a
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Thu Feb 1 10:49:01 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Thu Feb 1 10:49:01 2018 +0530
----------------------------------------------------------------------
.../java/org/apache/ratis/util/JavaUtils.java | 27 ++-----
.../org/apache/ratis/util/MemoizedSupplier.java | 74 ++++++++++++++++++++
.../arithmetic/ArithmeticStateMachine.java | 28 ++------
.../filestore/FileStoreStateMachine.java | 29 +++-----
.../ratis/server/impl/RaftServerImpl.java | 14 ++--
.../ratis/server/impl/StateMachineUpdater.java | 19 +++--
.../apache/ratis/statemachine/StateMachine.java | 8 ++-
.../ratis/statemachine/TransactionContext.java | 12 ++--
.../statemachine/impl/BaseStateMachine.java | 38 ++++++++--
.../SimpleStateMachine4Testing.java | 21 ++----
.../ratis/statemachine/TermIndexTracker.java | 68 ------------------
11 files changed, 167 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 0a99527..8322209 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
@@ -88,26 +89,8 @@ public interface JavaUtils {
* @param <T> The supplier result type.
* @return a memoized supplier which is thread-safe.
*/
- static <T> Supplier<T> memoize(Supplier<T> initializer) {
- Objects.requireNonNull(initializer, "initializer == null");
- return new Supplier<T>() {
- private volatile T value = null;
-
- @Override
- public T get() {
- T v = value;
- if (v == null) {
- synchronized (this) {
- v = value;
- if (v == null) {
- v = value = Objects.requireNonNull(initializer.get(),
- "initializer.get() returns null");
- }
- }
- }
- return v;
- }
- };
+ static <T> MemoizedSupplier<T> memoize(Supplier<T> initializer) {
+ return MemoizedSupplier.valueOf(initializer);
}
Supplier<ThreadGroup> ROOT_THREAD_GROUP = memoize(() -> {
@@ -213,4 +196,8 @@ public interface JavaUtils {
Objects.requireNonNull(t, "t == null");
return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
}
+
+ static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
new file mode 100644
index 0000000..47a4a87
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * A {@link Supplier} which lazily gets a non-null value from its initializer on
+ * the first successful {@link #get()} call and then keeps returning that value.
+ * A failed call (initializer throws or returns null) caches nothing, so a retry is possible.
+ *
+ * This class is thread safe: the value is volatile and is set under a lock.
+ *
+ * @param <T> The supplier result type.
+ */
+public class MemoizedSupplier<T> implements Supplier<T> {
+ /**
+ * @param supplier the value source; must not be null and must not return null.
+ * @return the given supplier itself if it is already a {@link MemoizedSupplier}; otherwise, a new one wrapping it.
+ */
+ public static <T> MemoizedSupplier<T> valueOf(Supplier<T> supplier) {
+ return supplier instanceof MemoizedSupplier ?
+ (MemoizedSupplier<T>) supplier : new MemoizedSupplier<>(supplier);
+ }
+
+ private final Supplier<T> initializer;
+ private volatile T value = null;
+
+ /**
+ * Create a memoized supplier; the constructor is private and is called from {@link #valueOf(Supplier)}.
+ * @param initializer invoked by {@link #get()} while no value is stored; a null initializer is rejected here.
+ */
+ private MemoizedSupplier(Supplier<T> initializer) {
+ Objects.requireNonNull(initializer, "initializer == null");
+ this.initializer = initializer;
+ }
+
+ /** @return the stored object, invoking the initializer first (under the lock) if no value has been stored yet. */
+ @Override
+ public T get() {
+ T v = value;
+ if (v == null) {
+ synchronized (this) {
+ v = value;
+ if (v == null) {
+ v = value = Objects.requireNonNull(initializer.get(),
+ "initializer.get() returns null");
+ }
+ }
+ }
+ return v;
+ }
+
+ /** @return true if a value has already been stored by {@link #get()}; this never invokes the initializer. */
+ public boolean isInitialized() {
+ return value != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 4eb0756..0b08b89 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -27,30 +27,24 @@ import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.statemachine.*;
+import org.apache.ratis.statemachine.StateMachineStorage;
+import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.AutoCloseableLock;
-import org.apache.ratis.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ArithmeticStateMachine extends BaseStateMachine {
- static final Logger LOG = LoggerFactory.getLogger(ArithmeticStateMachine.class);
-
private final Map<String, Double> variables = new ConcurrentHashMap<>();
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
- private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -64,7 +58,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
void reset() {
variables.clear();
- latestTermIndex.set(null);
+ setLastAppliedTermIndex(null);
}
@Override
@@ -83,12 +77,12 @@ public class ArithmeticStateMachine extends BaseStateMachine {
}
@Override
- public long takeSnapshot() throws IOException {
+ public long takeSnapshot() {
final Map<String, Double> copy;
final TermIndex last;
try(final AutoCloseableLock readLock = readLock()) {
copy = new HashMap<>(variables);
- last = latestTermIndex.get();
+ last = getLastAppliedTermIndex();
}
final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex());
@@ -127,7 +121,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
if (reload) {
reset();
}
- latestTermIndex.set(last);
+ setLastAppliedTermIndex(last);
variables.putAll((Map<String, Double>) in.readObject());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
@@ -171,7 +165,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
final Double result;
try(final AutoCloseableLock writeLock = writeLock()) {
result = assignment.evaluate(variables);
- updateLatestTermIndex(entry.getTerm(), index);
+ updateLastAppliedTermIndex(entry.getTerm(), index);
}
final Expression r = Expression.Utils.double2Expression(result);
LOG.debug("{}-{}: {} = {}", getId(), index, assignment, r);
@@ -180,12 +174,4 @@ public class ArithmeticStateMachine extends BaseStateMachine {
}
return CompletableFuture.completedFuture(Expression.Utils.toMessage(r));
}
-
- private void updateLatestTermIndex(long term, long index) {
- final TermIndex newTI = TermIndex.newTermIndex(term, index);
- final TermIndex oldTI = latestTermIndex.getAndSet(newTI);
- if (oldTI != null) {
- Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 2a09e1b..5ecb387 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -19,35 +19,30 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.shaded.proto.ExamplesProtos.*;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
public class FileStoreStateMachine extends BaseStateMachine {
- public static final Logger LOG = LoggerFactory.getLogger(FileStoreStateMachine.class);
-
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
- private final AtomicReference<TermIndex> latestTermIndex = new AtomicReference<>();
private final FileStore files;
@@ -73,7 +68,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
@Override
public void close() {
files.close();
- latestTermIndex.set(null);
+ setLastAppliedTermIndex(null);
}
@Override
@@ -137,7 +132,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
final LogEntryProto entry = trx.getLogEntry();
final long index = entry.getIndex();
- updateLatestTermIndex(entry.getTerm(), index);
+ updateLastAppliedTermIndex(entry.getTerm(), index);
final SMLogEntryProto smLog = entry.getSmLogEntry();
final FileStoreRequestProto request;
@@ -176,12 +171,4 @@ public class FileStoreStateMachine extends BaseStateMachine {
DeleteReplyProto.newBuilder().setResolvedPath(
FileStoreCommon.toByteString(resolved)).build().toByteString());
}
-
- private void updateLatestTermIndex(long term, long index) {
- final TermIndex newTI = TermIndex.newTermIndex(term, index);
- final TermIndex oldTI = latestTermIndex.getAndSet(newTI);
- if (oldTI != null) {
- Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 29b3a7e..a619909 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -801,8 +801,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
logAppendEntries(isHeartbeat,
() -> getId() + ": succeeded to handle AppendEntries. Reply: "
+ ServerProtoUtils.toString(reply));
- return CompletableFuture
- .allOf(futures.toArray(new CompletableFuture[futures.size()]))
+ return JavaUtils.allOf(futures)
.thenApply(v -> {
// reset election timer to avoid punishing the leader for our own
// long disk writes
@@ -932,8 +931,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
* @param stateMachineFuture the future returned by the state machine
* from which we will get transaction result later
*/
- private void replyPendingRequest(LogEntryProto logEntry,
- CompletableFuture<Message> stateMachineFuture) {
+ private CompletableFuture<Message> replyPendingRequest(
+ LogEntryProto logEntry, CompletableFuture<Message> stateMachineFuture) {
// update the retry cache
final ClientId clientId = ClientId.valueOf(logEntry.getClientId());
final long callId = logEntry.getCallId();
@@ -944,7 +943,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
}
- stateMachineFuture.whenComplete((reply, exception) -> {
+ return stateMachineFuture.whenComplete((reply, exception) -> {
final RaftClientReply r;
if (exception == null) {
r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null);
@@ -980,7 +979,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return s.getFollowerNextIndices();
}
- public void applyLogToStateMachine(LogEntryProto next) {
+ CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
final StateMachine stateMachine = getStateMachine();
if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
// the reply should have already been set. only need to record
@@ -1000,8 +999,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// TODO: This step can be parallelized
CompletableFuture<Message> stateMachineFuture =
stateMachine.applyTransaction(trx);
- replyPendingRequest(next, stateMachineFuture);
+ return replyPendingRequest(next, stateMachineFuture);
}
+ return null;
}
private class RaftServerJmxAdapter extends JmxRegister implements RaftServerMXBean {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 9ef6ce7..5fffb1f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -18,20 +18,21 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
/**
* This class tracks the log entries that have been committed in a quorum and
@@ -140,6 +141,8 @@ class StateMachineUpdater implements Runnable {
state = State.RUNNING;
}
+ final MemoizedSupplier<List<CompletableFuture<Message>>> futures
+ = MemoizedSupplier.valueOf(() -> new ArrayList<>());
while (lastAppliedIndex < committedIndex) {
final long nextIndex = lastAppliedIndex + 1;
final LogEntryProto next = raftLog.get(nextIndex);
@@ -148,7 +151,10 @@ class StateMachineUpdater implements Runnable {
LOG.debug("{}: applying nextIndex={}, nextLog={}",
this, nextIndex, ServerProtoUtils.toString(next));
}
- server.applyLogToStateMachine(next);
+ final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
+ if (f != null) {
+ futures.get().add(f);
+ }
lastAppliedIndex = nextIndex;
} else {
LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
@@ -159,6 +165,9 @@ class StateMachineUpdater implements Runnable {
// check if need to trigger a snapshot
if (shouldTakeSnapshot(lastAppliedIndex)) {
+ if (futures.isInitialized()) {
+ JavaUtils.allOf(futures.get()).get();
+ }
stateMachine.takeSnapshot();
// TODO purge logs, including log cache. but should keep log for leader's RPCSenders
lastSnapshotIndex = lastAppliedIndex;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 1d47815..cbedb08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -24,10 +24,12 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftConfiguration;
-import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
@@ -40,6 +42,8 @@ import java.util.concurrent.CompletableFuture;
* (see https://en.wikipedia.org/wiki/State_machine_replication).
*/
public interface StateMachine extends Closeable {
+ Logger LOG = LoggerFactory.getLogger(StateMachine.class);
+
/**
* Initializes the State Machine with the given properties and storage. The state machine is
* responsible reading the latest snapshot from the file system (if any) and initialize itself
@@ -173,6 +177,8 @@ public interface StateMachine extends Closeable {
// TODO: We do not need to return CompletableFuture
CompletableFuture<Message> applyTransaction(TransactionContext trx);
+ TermIndex getLastAppliedTermIndex();
+
/**
* Notify the state machine that the raft peer is no longer leader.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index 53ef2f8..5fbedf9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -17,15 +17,13 @@
*/
package org.apache.ratis.statemachine;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Objects;
-
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
/**
* Context for a transaction.
@@ -123,12 +121,12 @@ public interface TransactionContext {
* log append, it is important to do only required operations here.
* @return The Transaction context.
*/
- public TransactionContext preAppendTransaction() throws IOException;
+ TransactionContext preAppendTransaction() throws IOException;
/**
* Called to notify the state machine that the Transaction passed cannot be appended (or synced).
* The exception field will indicate whether there was an exception or not.
* @return cancelled transaction
*/
- public TransactionContext cancelTransaction() throws IOException;
+ TransactionContext cancelTransaction() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 520d965..8f0b56a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -18,10 +18,6 @@
package org.apache.ratis.statemachine.impl;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -29,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -36,18 +33,25 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Base implementation for StateMachines.
*/
public class BaseStateMachine implements StateMachine {
-
private volatile RaftPeerId id;
protected RaftProperties properties;
protected RaftStorage storage;
protected RaftConfiguration raftConf;
protected final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName());
+ private final AtomicReference<TermIndex> lastAppliedTermIndex = new AtomicReference<>();
+
public RaftPeerId getId() {
return id;
}
@@ -108,6 +112,30 @@ public class BaseStateMachine implements StateMachine {
}
@Override
+ public TermIndex getLastAppliedTermIndex() {
+ return lastAppliedTermIndex.get();
+ }
+
+ protected void setLastAppliedTermIndex(TermIndex newTI) {
+ lastAppliedTermIndex.set(newTI);
+ }
+
+ protected boolean updateLastAppliedTermIndex(long term, long index) {
+ final TermIndex newTI = TermIndex.newTermIndex(term, index);
+ final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
+ if (!newTI.equals(oldTI)) {
+ LOG.debug("{}: update lastAppliedTermIndex from {} to {}", getId(), oldTI, newTI);
+ if (oldTI != null) {
+ Preconditions.assertTrue(newTI.compareTo(oldTI) >= 0,
+ () -> getId() + ": Failed updateLastAppliedTermIndex: newTI = "
+ + newTI + " < oldTI = " + oldTI);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
public long takeSnapshot() throws IOException {
return RaftServerConstants.INVALID_LOG_INDEX;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 8d53faf..86b7b66 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -27,7 +27,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.LogInputStream;
import org.apache.ratis.server.storage.LogOutputStream;
@@ -72,7 +71,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
Collections.synchronizedList(new ArrayList<>());
private final Daemon checkpointer;
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
- private final TermIndexTracker termIndexTracker = new TermIndexTracker();
private final RaftProperties properties = new RaftProperties();
private long segmentMaxSize =
RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -89,7 +87,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
SimpleStateMachine4Testing() {
checkpointer = new Daemon(() -> {
while (running) {
- try {
if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >=
SNAPSHOT_THRESHOLD) {
endIndexLastCkpt = takeSnapshot();
@@ -98,9 +95,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
- } catch (IOException ioe) {
- LOG.warn("Received IOException in Checkpointer", ioe);
- }
}
});
}
@@ -139,14 +133,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
list.add(entry);
- termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+ updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
return CompletableFuture.completedFuture(
new SimpleMessage(entry.getIndex() + " OK"));
}
@Override
- public long takeSnapshot() throws IOException {
- TermIndex termIndex = termIndexTracker.getLatestTermIndex();
+ public long takeSnapshot() {
+ final TermIndex termIndex = getLastAppliedTermIndex();
if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
return RaftServerConstants.INVALID_LOG_INDEX;
}
@@ -209,14 +203,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
LogEntryProto entry;
while ((entry = in.nextEntry()) != null) {
list.add(entry);
- termIndexTracker.update(ServerProtoUtils.toTermIndex(entry));
+ updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
}
}
Preconditions.assertTrue(
!list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
"endIndex=%s, list=%s", endIndex, list);
this.endIndexLastCkpt = endIndex;
- termIndexTracker.init(snapshot.getTermIndex());
+ setLastAppliedTermIndex(snapshot.getTermIndex());
this.storage.loadLatestSnapshot();
return endIndex;
}
@@ -248,11 +242,6 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
@Override
- public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
- // do nothing
- }
-
- @Override
public void close() {
lifeCycle.checkStateAndClose(() -> {
running = false;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1beef874/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
deleted file mode 100644
index bb359d7..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TermIndexTracker.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.statemachine;
-
-import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.util.Preconditions;
-
-import java.util.Objects;
-
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * Tracks the term index that is applied to the StateMachine for simple state machines with
- * no concurrent snapshoting capabilities.
- */
-class TermIndexTracker {
- static final TermIndex INIT_TERMINDEX =
- TermIndex.newTermIndex(INVALID_LOG_INDEX, INVALID_LOG_INDEX);
-
- private TermIndex current = INIT_TERMINDEX;
-
- //TODO: developer note: everything is synchronized for now for convenience.
-
- /**
- * Initialize the tracker with a term index (likely from a snapshot).
- */
- public synchronized void init(TermIndex termIndex) {
- this.current = termIndex;
- }
-
- public synchronized void reset() {
- init(INIT_TERMINDEX);
- }
-
- /**
- * Update the tracker with a new TermIndex. It means that the StateMachine has
- * this index in memory.
- */
- public synchronized void update(TermIndex termIndex) {
- Objects.requireNonNull(termIndex);
- Preconditions.assertTrue(termIndex.compareTo(current) >= 0);
- this.current = termIndex;
- }
-
- /**
- * Return latest term and index that is inserted to this tracker as an atomic
- * entity.
- */
- public synchronized TermIndex getLatestTermIndex() {
- return current;
- }
-
-}