You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/08/18 03:35:29 UTC

[ratis] branch branch-2 updated (7d3716db7 -> 913ed630e)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git


    from 7d3716db7 RATIS-1675. Revert  RATIS-1601 & RATIS-1391 (#715)
     new 06e940107 RATIS-1676. SnapshotManager should use ServerStringUtils.toInstallSnapshotRequestString(..). (#717)
     new 5c20c85b4 RATIS-1673. Verify duplicate peerid when init raft group (#714)
     new 913ed630e RATIS-1642. Specify the thread-safety requirement in StateMachine. (#709)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/ratis/protocol/RaftGroup.java  |  1 +
 .../apache/ratis/statemachine/StateMachine.java    | 51 +++++++++++++++-------
 .../apache/ratis/server/impl/ServerProtoUtils.java | 32 +-------------
 .../ratis/server/storage/SnapshotManager.java      |  5 ++-
 .../{TestRaftId.java => TestRaftGroup.java}        | 34 +++++----------
 5 files changed, 53 insertions(+), 70 deletions(-)
 copy ratis-test/src/test/java/org/apache/ratis/protocol/{TestRaftId.java => TestRaftGroup.java} (57%)


[ratis] 01/03: RATIS-1676. SnapshotManager should use ServerStringUtils.toInstallSnapshotRequestString(..). (#717)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 06e9401071c515b9f2e86c87445059faf154050e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Aug 16 19:14:37 2022 -0700

    RATIS-1676. SnapshotManager should use ServerStringUtils.toInstallSnapshotRequestString(..). (#717)
    
    
    (cherry picked from commit ef23b839b31fb24bc0f74156a6414f69f2b73a2c)
---
 .../apache/ratis/server/impl/ServerProtoUtils.java | 32 +---------------------
 .../ratis/server/storage/SnapshotManager.java      |  5 ++--
 2 files changed, 4 insertions(+), 33 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index eccebb6fc..deae754c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** Server proto utilities for internal use. */
-public final class ServerProtoUtils {
+final class ServerProtoUtils {
   private ServerProtoUtils() {}
 
   private static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
@@ -171,34 +171,4 @@ public final class ServerProtoUtils {
         return false;
     }
   }
-
-  public static String convertToString(InstallSnapshotRequestProto request) {
-    final StringBuilder s = new StringBuilder();
-    final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk =
-        request.getSnapshotChunk();
-    s.append(" { " + request.getServerRequest() + "leaderTerm: " + request.getLeaderTerm() + "\n");
-    if (request.hasSnapshotChunk()) {
-      s.append("snapshotChunk: {\n requestId: " + snapshotChunk.getRequestId() + "\n")
-          .append(" requestIndex: "  + snapshotChunk.getRequestIndex() + "\n")
-          .append(" raftConfiguration: " + snapshotChunk.getRaftConfiguration() + "\n")
-          .append(" termIndex: {\n  term: " + snapshotChunk.getTermIndex().getTerm() + "\n  index: " +
-              snapshotChunk.getTermIndex().getIndex() + "\n }\n");
-      for (FileChunkProto chunk : snapshotChunk.getFileChunksList()) {
-        s.append(" fileChunks: {\n  filename: " + chunk.getFilename() + "\n")
-            .append("  totalSize: " + chunk.getTotalSize() + "\n")
-            .append("  fileDigest: " + chunk.getFileDigest() + "\n")
-            .append("  chunkIndex: " + chunk.getChunkIndex() + "\n")
-            .append("  offset: " + chunk.getOffset() + "\n")
-            .append("  done: " + chunk.getDone() + "\n }\n");
-
-      }
-      s.append(" totalSize: " + snapshotChunk.getTotalSize() + "\n")
-          .append(" done: " + snapshotChunk.getDone()).append("\n}\n");
-    } else if (request.hasNotification()) {
-      s.append(" notification: " + request.getNotification() + "\n");
-    }
-
-    s.append(request.getLastRaftConfigurationLogEntryProto());
-    return s.toString();
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index c72e9f78e..aaa62a783 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -31,7 +31,7 @@ import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.FileChunkProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.FileUtils;
@@ -75,7 +75,8 @@ public class SnapshotManager {
     FileUtils.createDirectories(tmpDir);
     tmpDir.deleteOnExit();
 
-    LOG.info("Installing snapshot:{}, to tmp dir:{}", ServerProtoUtils.convertToString(request), tmpDir);
+    LOG.info("Installing snapshot:{}, to tmp dir:{}",
+        ServerStringUtils.toInstallSnapshotRequestString(request), tmpDir);
 
     // TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
     // and are not lost when whole request cycle is done. Check requestId and requestIndex here


[ratis] 03/03: RATIS-1642. Specify the thread-safety requirement in StateMachine. (#709)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 913ed630e1aaa064bd9c8eb9014f524433ed28cf
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Aug 17 20:30:03 2022 -0700

    RATIS-1642. Specify the thread-safety requirement in StateMachine. (#709)
    
    * RATIS-1642. Specify the thread-safety requirement in StateMachine.
    
    * Revised the javadoc of applyTransaction(..)
    
    (cherry picked from commit 323bd1017afdbe03f70cccb172412e6fab221797)
---
 .../apache/ratis/statemachine/StateMachine.java    | 51 +++++++++++++++-------
 1 file changed, 36 insertions(+), 15 deletions(-)

diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 79f8818dc..e21411ec7 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.statemachine;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.Message;
@@ -50,6 +49,10 @@ import java.util.function.Function;
  * StateMachine is the entry point for the custom implementation of replicated state as defined in
  * the "State Machine Approach" in the literature
  * (see https://en.wikipedia.org/wiki/State_machine_replication).
+ *
+ *  A {@link StateMachine} implementation must be threadsafe.
+ *  For example, the {@link #applyTransaction(TransactionContext)} method and the {@link #query(Message)} method
+ *  can be invoked in parallel.
  */
 public interface StateMachine extends Closeable {
   Logger LOG = LoggerFactory.getLogger(StateMachine.class);
@@ -81,7 +84,6 @@ public interface StateMachine extends Closeable {
      *
      * @return a future for the write task
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<?> write(LogEntryProto entry) {
       return CompletableFuture.completedFuture(null);
     }
@@ -92,7 +94,6 @@ public interface StateMachine extends Closeable {
      *
      * @return a future of the stream.
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<DataStream> stream(RaftClientRequest request) {
       return CompletableFuture.completedFuture(null);
     }
@@ -107,7 +108,6 @@ public interface StateMachine extends Closeable {
      * @param entry the log entry to be linked.
      * @return a future for the link task.
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
       return CompletableFuture.completedFuture(null);
     }
@@ -118,7 +118,6 @@ public interface StateMachine extends Closeable {
      * @param logIndex The log index to flush.
      * @return a future for the flush task.
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<Void> flush(long logIndex) {
       return CompletableFuture.completedFuture(null);
     }
@@ -130,7 +129,6 @@ public interface StateMachine extends Closeable {
      * @param logIndex The last log index after truncation.
      * @return a future for truncate task.
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<Void> truncate(long logIndex) {
       return CompletableFuture.completedFuture(null);
     }
@@ -258,7 +256,6 @@ public interface StateMachine extends Closeable {
      * @param firstTermIndexInLog The term-index of the first append entry available in the leader's log.
      * @return return the last term-index in the snapshot after the snapshot installation.
      */
-    @SuppressFBWarnings("NP_NULL_PARAM_DEREF")
     default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
       return CompletableFuture.completedFuture(null);
@@ -474,18 +471,26 @@ public interface StateMachine extends Closeable {
   CompletableFuture<Message> queryStale(Message request, long minIndex);
 
   /**
-   * Validate/pre-process the incoming update request in the state machine.
-   * @return the content to be written to the log entry. Null means the request
-   * should be rejected.
+   * Start a transaction for the given request.
+   * This method can be invoked in parallel when there are multiple requests.
+   * The implementation should validate the request,
+   * prepare a {@link StateMachineLogEntryProto},
+   * and then build a {@link TransactionContext}.
+   * The implementation should also be lightweight.
+   *
+   * @return null if the request should be rejected.
+   *         Otherwise, return a transaction with the content to be written to the log.
    * @throws IOException thrown by the state machine while validation
+   *
+   * @see TransactionContext.Builder
    */
   TransactionContext startTransaction(RaftClientRequest request) throws IOException;
 
   /**
    * This is called before the transaction passed from the StateMachine is appended to the raft log.
-   * This method will be called from log append and having the same strict serial order that the
-   * transactions will have in the RAFT log. Since this is called serially in the critical path of
-   * log append, it is important to do only required operations here.
+   * This method is called with the same strict serial order as the transaction order in the raft log.
+   * Since this is called serially in the critical path of log append,
+   * it is important to do only required operations here.
    * @return The Transaction context.
    */
   TransactionContext preAppendTransaction(TransactionContext trx) throws IOException;
@@ -515,8 +520,24 @@ public interface StateMachine extends Closeable {
    * method, which returns a future, is asynchronous. The state machine implementation may
    * choose to apply the log entries in parallel. In that case, the order of applying entries to
    * state machine could possibly be different from the log commit order.
-   * @param trx the transaction state including the log entry that has been committed to a quorum
-   *            of the raft peers
+   *
+   * The implementation must be deterministic so that the raft log can be replayed on any raft peer.
+   * Note that, if there are three or more servers,
+   * the Raft algorithm makes sure that the log remains consistent
+   * even if there are hardware errors in one machine (or in fewer than a majority of the machines).
+   *
+   * Any exception thrown in this method is treated as an unrecoverable error (such as a hardware error).
+   * The server will be shut down when it occurs.
+   * Administrators should manually fix the underlying problem and then restart the machine.
+   *
+   * @param trx the transaction state including the log entry that has been replicated to a majority of the raft peers.
+   *
+   * @return a future containing the result message of the transaction,
+   *         where the result message will be replied to the client.
+   *         When there is an application level exception (e.g. access denied),
+   *         the state machine may complete the returned future exceptionally.
+   *         The exception will be wrapped in an {@link org.apache.ratis.protocol.exceptions.StateMachineException}
+   *         and then replied to the client.
    */
   CompletableFuture<Message> applyTransaction(TransactionContext trx);
 


[ratis] 02/03: RATIS-1673. Verify duplicate peerid when init raft group (#714)

Posted by dr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 5c20c85b4a952f526252fb95cc8b20b07603f7a8
Author: leo65535 <le...@163.com>
AuthorDate: Wed Aug 17 12:06:19 2022 +0800

    RATIS-1673. Verify duplicate peerid when init raft group (#714)
    
    
    (cherry picked from commit 713aa38fe2c996f7961d8f8c03e13db898927e9b)
---
 .../java/org/apache/ratis/protocol/RaftGroup.java  |  1 +
 .../org/apache/ratis/protocol/TestRaftGroup.java   | 44 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index a18aa5ce9..0612a16f9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -61,6 +61,7 @@ public final class RaftGroup {
     if (peers == null || !peers.iterator().hasNext()) {
       this.peers = Collections.emptyMap();
     } else {
+      Preconditions.assertUnique(peers);
       final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
       peers.forEach(p -> map.put(p.getId(), p));
       this.peers = Collections.unmodifiableMap(map);
diff --git a/ratis-test/src/test/java/org/apache/ratis/protocol/TestRaftGroup.java b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRaftGroup.java
new file mode 100644
index 000000000..5267b2238
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/protocol/TestRaftGroup.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+public class TestRaftGroup extends BaseTest {
+  @Override
+  public int getGlobalTimeoutSeconds() {
+    return 1;
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testDuplicatePeerId() throws Exception {
+    UUID groupId = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
+
+    List<RaftPeer> peers = new LinkedList<>();
+    peers.add(RaftPeer.newBuilder().setId("n0").build());
+    peers.add(RaftPeer.newBuilder().setId("n0").build());
+    RaftGroup.valueOf(RaftGroupId.valueOf(groupId), peers);
+  }
+}