You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/06 11:56:37 UTC

[incubator-ratis] branch RATIS-1209 created (now c140982)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a change to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git.


      at c140982  RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.

This branch includes the following new commits:

     new 2b811c1  rename methods
     new c140982  RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-ratis] 01/02: rename methods

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git

commit 2b811c1bffd830049eb86d050e2b95e2d3d719ea
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 13:41:48 2020 +0800

    rename methods
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 34 ++++++++-----
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 24 ++++-----
 .../org/apache/ratis/server/impl/LogAppender.java  | 57 ++++++++++------------
 .../ratis/server/impl/LogAppenderDaemon.java       |  2 +-
 4 files changed, 59 insertions(+), 58 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 85d3572..dc8e40f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender {
     getFollower().decreaseNextIndex(nextIndex);
   }
 
+  private boolean haveLogEntriesToSendOut() {
+    return shouldAppendEntries(getFollower().getNextIndex());
+  }
+
+  private boolean isFollowerCommitBehindLastCommitIndex() {
+    return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
+  }
+
   @Override
-  protected void runAppenderImpl() throws IOException {
+  protected void run() throws IOException {
     boolean installSnapshotRequired;
-    for(; isAppenderRunning(); mayWait()) {
+    for(; isRunning(); mayWait()) {
       installSnapshotRequired = false;
 
       //HB period is expired OR we have messages OR follower is behind with commit index
@@ -174,9 +182,9 @@ public class GrpcLogAppender extends LogAppender {
   }
 
   @Override
-  public void stopAppender() {
+  public void stop() {
     grpcServerMetrics.unregister();
-    super.stopAppender();
+    super.stop();
   }
 
   @Override
@@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender {
       // prepare and enqueue the append request. note changes on follower's
       // nextIndex and ops on pendingRequests should always be associated
       // together and protected by the lock
-      pending = createRequest(callId++, excludeLogEntries);
+      pending = newAppendEntriesRequest(callId++, excludeLogEntries);
       if (pending == null) {
         return;
       }
@@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender {
       s = appendLogRequestObserver;
     }
 
-    if (isAppenderRunning()) {
+    if (isRunning()) {
       sendRequest(request, pending, s);
     }
   }
@@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender {
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
       }
-      notifyAppend();
+      notifyLogAppender();
     }
 
     /**
@@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender {
      */
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender {
 
     void close() {
       done.set(true);
-      GrpcLogAppender.this.notifyAppend();
+      notifyLogAppender();
     }
 
     synchronized boolean hasAllResponse() {
@@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender {
 
     @Override
     public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
+      if (!isRunning()) {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
@@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender {
     try {
       snapshotRequestObserver = getClient().installSnapshot(responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
-        if (isAppenderRunning()) {
+        if (isRunning()) {
           snapshotRequestObserver.onNext(request);
           getFollower().updateLastRpcSendTime();
           responseHandler.addPending(request);
@@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      while (isAppenderRunning() && !responseHandler.isDone()) {
+      while (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
@@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender {
     }
 
     synchronized (this) {
-      if (isAppenderRunning() && !responseHandler.isDone()) {
+      if (isRunning() && !responseHandler.isDone()) {
         try {
           wait();
         } catch (InterruptedException ignored) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d52e44..ad6d7c37 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -284,7 +284,7 @@ class LeaderStateImpl implements LeaderState {
         server.getId().toString(), null);
     raftLog.append(placeHolder);
     processor.start();
-    senders.forEach(LogAppender::startAppender);
+    senders.forEach(LogAppender::start);
     return placeHolder;
   }
 
@@ -295,7 +295,7 @@ class LeaderStateImpl implements LeaderState {
   void stop() {
     this.running = false;
     // do not interrupt event processor since it may be in the middle of logSync
-    senders.forEach(LogAppender::stopAppender);
+    senders.forEach(LogAppender::stop);
     final NotLeaderException nle = server.generateNotLeaderException();
     final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
@@ -313,7 +313,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void notifySenders() {
-    senders.forEach(LogAppender::notifyAppend);
+    senders.forEach(LogAppender::notifyLogAppender);
   }
 
   boolean inStagingState() {
@@ -466,7 +466,7 @@ class LeaderStateImpl implements LeaderState {
    * Update sender list for setConfiguration request
    */
   void addAndStartSenders(Collection<RaftPeer> newPeers) {
-    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender);
+    addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
   }
 
   Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
@@ -486,7 +486,7 @@ class LeaderStateImpl implements LeaderState {
 
   void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
     final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
-    toStop.forEach(LogAppender::stopAppender);
+    toStop.forEach(LogAppender::stop);
     senders.removeAll(toStop);
   }
 
@@ -494,7 +494,7 @@ class LeaderStateImpl implements LeaderState {
   public void restart(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
-    sender.stopAppender();
+    sender.stop();
     senders.removeAll(Collections.singleton(sender));
     addAndStartSenders(Collections.singleton(follower.getPeer()));
   }
@@ -504,7 +504,7 @@ class LeaderStateImpl implements LeaderState {
    */
   private void updateSenders(RaftConfiguration conf) {
     Preconditions.assertTrue(conf.isStable() && !inStagingState());
-    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
+    stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
   }
 
   void submitStepDownEvent(StepDownReason reason) {
@@ -845,14 +845,14 @@ class LeaderStateImpl implements LeaderState {
   private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
     List<List<RaftPeerId>> lists = new ArrayList<>(2);
     List<RaftPeerId> listForNew = senders.stream()
-        .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
+        .filter(conf::containsInConf)
         .collect(Collectors.toList());
     lists.add(listForNew);
     if (conf.isTransitional()) {
       List<RaftPeerId> listForOld = senders.stream()
-          .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(sender -> sender.getFollower().getPeer().getId())
+          .map(LogAppender::getFollowerId)
+          .filter(conf::containsInOldConf)
           .collect(Collectors.toList());
       lists.add(listForOld);
     }
@@ -923,7 +923,7 @@ class LeaderStateImpl implements LeaderState {
         .filter(sender -> sender.getFollower()
                                 .getLastRpcResponseTime()
                                 .elapsedTimeMs() <= server.getMaxTimeoutMs())
-        .map(sender -> sender.getFollower().getPeer().getId())
+        .map(LogAppender::getFollowerId)
         .collect(Collectors.toList());
 
     final RaftConfiguration conf = server.getRaftConf();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a922e00..ada5424 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -50,7 +50,6 @@ public class LogAppender {
   private final String name;
   private final RaftServer.Division server;
   private final LeaderState leaderState;
-  private final RaftLog raftLog;
   private final FollowerInfo follower;
 
   private final DataQueue<EntryWithData> buffer;
@@ -64,7 +63,6 @@ public class LogAppender {
     this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
     this.server = server;
     this.leaderState = leaderState;
-    this.raftLog = server.getRaftLog();
 
     final RaftProperties properties = server.getRaftServer().getProperties();
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
@@ -85,7 +83,7 @@ public class LogAppender {
   }
 
   public RaftLog getRaftLog() {
-    return raftLog;
+    return getServer().getRaftLog();
   }
 
   @Override
@@ -93,15 +91,15 @@ public class LogAppender {
     return name;
   }
 
-  void startAppender() {
+  public void start() {
     daemon.tryToStart();
   }
 
-  public boolean isAppenderRunning() {
+  public boolean isRunning() {
     return daemon.isWorking();
   }
 
-  public void stopAppender() {
+  public void stop() {
     daemon.tryToClose();
   }
 
@@ -123,7 +121,7 @@ public class LogAppender {
     }
 
     final long previousIndex = nextIndex - 1;
-    final TermIndex previous = raftLog.getTermIndex(previousIndex);
+    final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
     if (previous != null) {
       return previous;
     }
@@ -139,7 +137,7 @@ public class LogAppender {
     return null;
   }
 
-  protected AppendEntriesRequestProto createRequest(long callId,
+  protected AppendEntriesRequestProto newAppendEntriesRequest(long callId,
       boolean heartbeat) throws RaftLogIOException {
     final TermIndex previous = getPrevious(follower.getNextIndex());
     final long snapshotIndex = follower.getSnapshotIndex();
@@ -151,11 +149,11 @@ public class LogAppender {
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
 
-    final long leaderNext = raftLog.getNextIndex();
+    final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatRemainingMs/2;
     for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
-      if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+      if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
         break;
       }
     }
@@ -195,16 +193,16 @@ public class LogAppender {
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
     AppendEntriesRequestProto request = null;
-    while (isAppenderRunning()) { // keep retrying for IOException
+    while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {
-          request = createRequest(DEFAULT_CALLID, false);
+          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
         }
 
         if (request == null) {
           LOG.trace("{} no entries to send now, wait ...", this);
           return null;
-        } else if (!isAppenderRunning()) {
+        } else if (!isRunning()) {
           LOG.info("{} is stopped. Skip appendEntries.", this);
           return null;
         }
@@ -224,7 +222,7 @@ public class LogAppender {
         }
         handleException(ioe);
       }
-      if (isAppenderRunning()) {
+      if (isRunning()) {
         server.properties().rpcSleepTime().sleep();
       }
     }
@@ -275,11 +273,11 @@ public class LogAppender {
   }
 
   protected SnapshotInfo shouldInstallSnapshot() {
-    final long logStartIndex = raftLog.getStartIndex();
+    final long logStartIndex = getRaftLog().getStartIndex();
     // we should install snapshot if the follower needs to catch up and:
     // 1. there is no local log entry but there is snapshot
     // 2. or the follower's next index is smaller than the log start index
-    if (follower.getNextIndex() < raftLog.getNextIndex()) {
+    if (follower.getNextIndex() < getRaftLog().getNextIndex()) {
       final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
       if (follower.getNextIndex() < logStartIndex ||
           (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
@@ -290,13 +288,13 @@ public class LogAppender {
   }
 
   /** Check and send appendEntries RPC */
-  protected void runAppenderImpl() throws InterruptedException, IOException {
-    while (isAppenderRunning()) {
+  protected void run() throws InterruptedException, IOException {
+    while (isRunning()) {
       if (shouldSendRequest()) {
         SnapshotInfo snapshot = shouldInstallSnapshot();
         if (snapshot != null) {
           LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
-              this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot);
+              this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
 
           final InstallSnapshotReplyProto r = installSnapshot(snapshot);
           if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
@@ -309,7 +307,7 @@ public class LogAppender {
           }
         }
       }
-      if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
+      if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
         final long waitTime = getHeartbeatRemainingTime();
         if (waitTime > 0) {
           synchronized (this) {
@@ -359,8 +357,10 @@ public class LogAppender {
     getServerRpc().handleException(getFollowerId(), e, false);
   }
 
-  public synchronized void notifyAppend() {
-    this.notify();
+  public void notifyLogAppender() {
+    synchronized (this) {
+      this.notify();
+    }
   }
 
   /** Should the leader send appendEntries RPC to this follower? */
@@ -368,16 +368,9 @@ public class LogAppender {
     return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
   }
 
-  protected boolean haveLogEntriesToSendOut() {
-    return shouldAppendEntries(follower.getNextIndex());
-  }
-
-  protected boolean isFollowerCommitBehindLastCommitIndex() {
-    return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
-  }
 
-  private boolean shouldAppendEntries(long followerIndex) {
-    return followerIndex < raftLog.getNextIndex();
+  public boolean shouldAppendEntries(long followerIndex) {
+    return followerIndex < getRaftLog().getNextIndex();
   }
 
   protected boolean heartbeatTimeout() {
@@ -393,7 +386,7 @@ public class LogAppender {
 
   protected boolean checkResponseTerm(long responseTerm) {
     synchronized (server) {
-      return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
+      return isRunning() && leaderState.onFollowerTerm(follower, responseTerm);
     }
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
index 4d0a662..4c375a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -74,7 +74,7 @@ class LogAppenderDaemon {
   private void run() {
     try {
       if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
-        logAppender.runAppenderImpl();
+        logAppender.run();
       }
       lifeCycle.compareAndTransition(RUNNING, CLOSING);
     } catch (InterruptedException e) {


[incubator-ratis] 02/02: RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.

Posted by sz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git

commit c140982b3a402d41df7d35754e3877511b3d9b3a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 19:55:41 2020 +0800

    RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.
---
 .../ratis/examples/filestore/cli/DataStream.java   | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 5ca07d2..7857cd4 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -23,6 +23,10 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -31,6 +35,7 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +76,10 @@ public class DataStream extends Client {
     Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
     for(String path : paths) {
       File file = new File(path);
+      final long fileLength = file.length();
+      Preconditions.assertTrue(fileLength == getFileSizeInBytes(),
+          "Unexpected file size: expected size is " + getFileSizeInBytes()
+              + " but actual size is " + fileLength);
       FileInputStream fis = new FileInputStream(file);
       final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
 
@@ -106,23 +115,31 @@ public class DataStream extends Client {
 
   private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
       FileChannel fileChannel) throws IOException {
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-
-    int bytesToRead = getBufferSizeInBytes();
-    if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < getBufferSizeInBytes()) {
-      bytesToRead = getFileSizeInBytes();
+    final int fileSize = getFileSizeInBytes();
+    final int bufferSize = getBufferSizeInBytes();
+    if (fileSize <= 0) {
+      return Collections.emptyList();
     }
 
-    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
-    long offset = 0L;
-
-    while (fileChannel.read(byteBuffer) > 0) {
-      byteBuffer.flip();
-      futures.add(dataStreamOutput.writeAsync(byteBuffer, offset + bytesToRead == getFileSizeInBytes()));
-      offset += bytesToRead;
-      bytesToRead = (int) Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
-      if (bytesToRead > 0) {
-        byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
+    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
+
+    for(long offset = 0L; offset < fileSize;) {
+      final ByteBuf buf = alloc.directBuffer(bufferSize);
+      final ByteBuffer byteBuffer = buf.nioBuffers()[0];
+      Preconditions.assertTrue(byteBuffer.remaining() > 0);
+
+      final int bytesRead = fileChannel.read(byteBuffer);
+      if (bytesRead < 0) {
+        throw new IllegalStateException("Failed to read " + fileSize
+            + " byte(s). The channel has reached end-of-stream at " + offset);
+      } else if (bytesRead > 0) {
+        offset += bytesRead;
+
+        byteBuffer.flip();
+        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(byteBuffer, offset == fileSize);
+        f.thenRun(buf::release);
+        futures.add(f);
       }
     }