You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/28 23:14:25 UTC

incubator-ratis git commit: RATIS-19. Include clientId and callId in log entries.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 8ac50a721 -> 392fa5141


RATIS-19. Include clientId and callId in log entries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/392fa514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/392fa514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/392fa514

Branch: refs/heads/master
Commit: 392fa51415047b6e265a73d4ef175f1a468535b7
Parents: 8ac50a7
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Feb 28 15:13:42 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Feb 28 15:13:42 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/ProtoUtils.java  |  5 ++++-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  5 +++++
 .../ratis/server/impl/RaftServerImpl.java       |  3 ++-
 .../apache/ratis/server/impl/ServerState.java   |  6 ++++--
 .../apache/ratis/server/storage/RaftLog.java    |  8 ++++---
 .../ratis/server/storage/TestRaftLogCache.java  | 10 ++++++---
 .../server/storage/TestRaftLogReadWrite.java    | 20 ++++++++++++------
 .../server/storage/TestRaftLogSegment.java      | 22 ++++++++++++--------
 .../server/storage/TestSegmentedRaftLog.java    |  7 +++++--
 9 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 8d9c25e..2613342 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -113,9 +114,11 @@ public class ProtoUtils {
   }
 
   public static LogEntryProto toLogEntryProto(
-      SMLogEntryProto operation, long term, long index) {
+      SMLogEntryProto operation, long term, long index,
+      ClientId clientId, long callId) {
     return LogEntryProto.newBuilder().setTerm(term).setIndex(index)
         .setSmLogEntry(operation)
+        .setClientId(toByteString(clientId.toBytes())).setCallId(callId)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 182a905..14901f6 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -50,6 +50,11 @@ message LogEntryProto {
     RaftConfigurationProto configurationEntry = 4;
     LeaderNoOp noOp = 5;
   }
+
+  // clientId and callId are used to rebuild the retry cache. They're not
+  // necessary for configuration change since re-conf is idempotent.
+  bytes clientId = 6;
+  uint64 callId = 7;
 }
 
 message TermIndexProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 470141c..d729914 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -380,7 +380,8 @@ public class RaftServerImpl implements RaftServer {
       // append the message to its local log
       final long entryIndex;
       try {
-        entryIndex = state.applyLog(entry);
+        entryIndex = state.applyLog(entry, request.getClientId(),
+            request.getSeqNum());
       } catch (IOException e) {
         throw new RaftException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index dd2d784..4b7efbd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -24,6 +24,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.MemoryRaftLog;
@@ -204,8 +205,9 @@ public class ServerState implements Closeable {
     return log;
   }
 
-  long applyLog(TransactionContext operation) throws IOException {
-    return log.append(currentTerm, operation);
+  long applyLog(TransactionContext operation, ClientId clientId, long callId)
+      throws IOException {
+    return log.append(currentTerm, operation, clientId, callId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index f0c7b60..32422d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.ConfigurationManager;
 import org.apache.ratis.server.impl.RaftConfiguration;
@@ -125,7 +126,8 @@ public abstract class RaftLog implements Closeable {
    * Used by the leader.
    * @return the index of the new log entry.
    */
-  public long append(long term, TransactionContext operation) throws IOException {
+  public long append(long term, TransactionContext operation,
+      ClientId clientId, long callId) throws IOException {
     checkLogState();
     try(AutoCloseableLock writeLock = writeLock()) {
       final long nextIndex = getNextIndex();
@@ -136,7 +138,7 @@ public abstract class RaftLog implements Closeable {
 
       // build the log entry after calling the StateMachine
       final LogEntryProto e = ProtoUtils.toLogEntryProto(
-          operation.getSMLogEntry().get(), term, nextIndex);
+          operation.getSMLogEntry().get(), term, nextIndex, clientId, callId);
 
       appendEntry(e);
       operation.setLogEntry(e);
@@ -207,7 +209,7 @@ public abstract class RaftLog implements Closeable {
    * If an existing entry conflicts with a new one (same index but different
    * terms), delete the existing entry and all entries that follow it (§5.3).
    *
-   * This method, {@link #append(long, TransactionContext)},
+   * This method, {@link #append(long, TransactionContext, ClientId, long)},
    * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
    * do not guarantee the changes are persisted.
    * Need to call {@link #logSync()} to persist the changes.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index f5a18ac..388c269 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
 import java.util.Iterator;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.ProtoUtils;
@@ -28,6 +29,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestRaftLogCache {
+  private static final ClientId clientId = ClientId.createId();
+  private static final long callId = 0;
+
   private RaftLogCache cache;
 
   @Before
@@ -40,7 +44,7 @@ public class TestRaftLogCache {
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
-          0, i);
+          0, i, clientId, callId);
       s.appendToOpenSegment(entry);
     }
     if (!isOpen) {
@@ -130,7 +134,7 @@ public class TestRaftLogCache {
     final SimpleOperation m = new SimpleOperation("m");
     try {
       LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
-          0, 0);
+          0, 0, clientId, callId);
       cache.appendEntry(entry);
       Assert.fail("the open segment is null");
     } catch (IllegalStateException ignored) {
@@ -140,7 +144,7 @@ public class TestRaftLogCache {
     cache.addSegment(openSegment);
     for (long index = 101; index < 200; index++) {
       LogEntryProto entry = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
-          0, index);
+          0, index, clientId, callId);
       cache.appendEntry(entry);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
index bcdb958..f72d007 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -34,6 +34,7 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ChecksumException;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
@@ -53,9 +54,11 @@ import org.slf4j.LoggerFactory;
 public class TestRaftLogReadWrite {
   private static final Logger LOG = LoggerFactory.getLogger(TestRaftLogReadWrite.class);
 
+  private static final ClientId clientId = ClientId.createId();
+  private static final long callId = 0;
+
   private File storageDir;
   private RaftProperties properties;
-  private int segmentMaxSize;
 
   @Before
   public void setup() throws Exception {
@@ -90,7 +93,8 @@ public class TestRaftLogReadWrite {
     long size = 0;
     for (int i = 0; i < entries.length; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
-      entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+      entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+          clientId, callId);
       final int s = entries[i].getSerializedSize();
       size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4;
       out.write(entries[i]);
@@ -131,7 +135,8 @@ public class TestRaftLogReadWrite {
              new LogOutputStream(openSegment, false, properties)) {
       for (int i = 0; i < 100; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
-        entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+        entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+            clientId, callId);
         out.write(entries[i]);
       }
     }
@@ -140,7 +145,8 @@ public class TestRaftLogReadWrite {
              new LogOutputStream(openSegment, true, properties)) {
       for (int i = 100; i < 200; i++) {
         SimpleOperation m = new SimpleOperation("m" + i);
-        entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+        entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+            clientId, callId);
         out.write(entries[i]);
       }
     }
@@ -196,7 +202,8 @@ public class TestRaftLogReadWrite {
     LogOutputStream out = new LogOutputStream(openSegment, false, properties);
     for (int i = 0; i < 10; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
-      entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+      entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i,
+          clientId, callId);
       out.write(entries[i]);
     }
     out.flush();
@@ -243,7 +250,8 @@ public class TestRaftLogReadWrite {
              new LogOutputStream(openSegment, false, properties)) {
       for (int i = 0; i < 100; i++) {
         LogEntryProto entry = ProtoUtils.toLogEntryProto(
-            new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
+            new SimpleOperation("m" + i).getLogEntryContent(), 0, i,
+            clientId, callId);
         out.write(entry);
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 3092a21..fa72f64 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -32,6 +32,7 @@ import java.util.List;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -47,6 +48,9 @@ import org.junit.Test;
  * Test basic functionality of {@link LogSegment}
  */
 public class TestRaftLogSegment {
+  private static final ClientId clientId = ClientId.createId();
+  private static final long callId = 0;
+
   private File storageDir;
   private final RaftProperties properties = new RaftProperties();
 
@@ -75,7 +79,7 @@ public class TestRaftLogSegment {
       for (int i = 0; i < size; i++) {
         SimpleOperation op = new SimpleOperation("m" + i);
         entries[i] = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
-            term, i + start);
+            term, i + start, clientId, callId);
         out.write(entries[i]);
       }
     }
@@ -132,7 +136,7 @@ public class TestRaftLogSegment {
     while (size < max) {
       SimpleOperation op = new SimpleOperation("m" + i);
       LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
-          term, i++ + start);
+          term, i++ + start, clientId, callId);
       size += getEntrySize(entry);
       list.add(entry);
     }
@@ -148,18 +152,18 @@ public class TestRaftLogSegment {
     SimpleOperation op = new SimpleOperation("m");
     final SMLogEntryProto m = op.getLogEntryContent();
     try {
-      LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001);
+      LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1001, clientId, callId);
       segment.appendToOpenSegment(entry);
       Assert.fail("should fail since the entry's index needs to be 1000");
     } catch (Exception e) {
       Assert.assertTrue(e instanceof IllegalArgumentException);
     }
 
-    LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000);
+    LogEntryProto entry = ProtoUtils.toLogEntryProto(m, 0, 1000, clientId, callId);
     segment.appendToOpenSegment(entry);
 
     try {
-      entry = ProtoUtils.toLogEntryProto(m, 0, 1002);
+      entry = ProtoUtils.toLogEntryProto(m, 0, 1002, clientId, callId);
       segment.appendToOpenSegment(entry);
       Assert.fail("should fail since the entry's index needs to be 1001");
     } catch (Exception e) {
@@ -168,7 +172,7 @@ public class TestRaftLogSegment {
 
     LogEntryProto[] entries = new LogEntryProto[2];
     for (int i = 0; i < 2; i++) {
-      entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
+      entries[i] = ProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2, clientId, callId);
     }
     try {
       segment.appendToOpenSegment(entries);
@@ -185,7 +189,7 @@ public class TestRaftLogSegment {
     LogSegment segment = LogSegment.newOpenSegment(start);
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = ProtoUtils.toLogEntryProto(
-          new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
+          new SimpleOperation("m" + i).getLogEntryContent(), term, i + start, clientId, callId);
       segment.appendToOpenSegment(entry);
     }
 
@@ -251,7 +255,7 @@ public class TestRaftLogSegment {
         getProperties(1024, 1024))) {
       SimpleOperation op = new SimpleOperation(new String(content));
       LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
-          0, 0);
+          0, 0, clientId, callId);
       size = LogSegment.getEntrySize(entry);
       out.write(entry);
     }
@@ -282,7 +286,7 @@ public class TestRaftLogSegment {
     Arrays.fill(content, (byte) 1);
     SimpleOperation op = new SimpleOperation(new String(content));
     LogEntryProto entry = ProtoUtils.toLogEntryProto(op.getLogEntryContent(),
-        0, 0);
+        0, 0, clientId, callId);
     final long entrySize = LogSegment.getEntrySize(entry);
 
     long totalSize = SegmentedRaftLog.HEADER_BYTES.length;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/392fa514/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 08f671f..9b88321 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -33,6 +33,7 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.ConfigurationManager;
@@ -53,6 +54,8 @@ public class TestSegmentedRaftLog {
   }
 
   private static final RaftPeerId peerId = new RaftPeerId("s0");
+  private static final ClientId clientId = ClientId.createId();
+  private static final long callId = 0;
 
   private static class SegmentRange {
     final long start;
@@ -103,7 +106,7 @@ public class TestSegmentedRaftLog {
         for (int i = 0; i < size; i++) {
           SimpleOperation m = new SimpleOperation("m" + (i + range.start));
           entries[i] = ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
-              range.term, i + range.start);
+              range.term, i + range.start, clientId, callId);
           out.write(entries[i]);
         }
       }
@@ -153,7 +156,7 @@ public class TestSegmentedRaftLog {
             new SimpleOperation("m" + index) :
             new SimpleOperation(stringSupplier.get());
         eList.add(ProtoUtils.toLogEntryProto(m.getLogEntryContent(),
-            range.term, index));
+            range.term, index, clientId, callId));
       }
     }
     return eList;