You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/06 18:39:03 UTC

incubator-ratis git commit: RATIS-447. LogAppender should times out if readStateMachineData takes a long time.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 27a8cbb49 -> bef9a72e3


RATIS-447. LogAppender should times out if readStateMachineData takes a long time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/bef9a72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/bef9a72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/bef9a72e

Branch: refs/heads/master
Commit: bef9a72e32ee93adf3d532c2397e01e098afff59
Parents: 27a8cbb
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Dec 6 10:38:38 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Dec 6 10:38:38 2018 -0800

----------------------------------------------------------------------
 .../org/apache/ratis/util/CollectionUtils.java  |   8 +
 .../java/org/apache/ratis/util/DataQueue.java   | 142 +++++++++++++++
 .../org/apache/ratis/util/TimeDuration.java     |  24 ++-
 .../java/org/apache/ratis/util/Timestamp.java   |  19 +-
 .../function/CheckedFunctionWithTimeout.java    |  32 ++++
 .../apache/ratis/util/function/TriConsumer.java |  28 +++
 .../java/org/apache/ratis/TestBatchAppend.java  | 169 ------------------
 .../apache/ratis/grpc/server/GrpcService.java   |   4 +-
 .../hadooprpc/TestLogAppenderWithHadoopRpc.java |  25 +++
 .../ratis/server/RaftServerConfigKeys.java      |  28 +--
 .../apache/ratis/server/impl/LogAppender.java   |  98 +++--------
 .../apache/ratis/server/impl/ServerState.java   |   2 +-
 .../apache/ratis/server/storage/RaftLog.java    |  20 ++-
 .../ratis/server/storage/SegmentedRaftLog.java  |   2 +-
 .../java/org/apache/ratis/LogAppenderTests.java | 151 ++++++++++++++++
 .../java/org/apache/ratis/RaftBasicTests.java   |   1 -
 .../org/apache/ratis/RaftExceptionBaseTest.java |   2 +-
 .../SimpleStateMachine4Testing.java             |   4 +-
 .../ratis/grpc/TestLogAppenderWithGrpc.java     |  25 +++
 .../ratis/netty/TestLogAppenderWithNetty.java   |  25 +++
 .../TestLogAppenderWithSimulatedRpc.java        |  25 +++
 .../org/apache/ratis/util/TestDataQueue.java    | 172 +++++++++++++++++++
 .../org/apache/ratis/util/TestTimeDuration.java |  65 ++++++-
 23 files changed, 800 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 57222a6..cb49847 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -28,6 +28,14 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 public interface CollectionUtils {
+  static <T> T min(T left, T right, Comparator<T> comparator) {
+    return comparator.compare(left, right) < 0? left: right;
+  }
+
+  static <T extends Comparable<T>> T min(T left, T right) {
+    return min(left, right, Comparator.naturalOrder());
+  }
+
   /**
    *  @return the next element in the iteration right after the given element;
    *          if the given element is not in the iteration, return the first one

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
new file mode 100644
index 0000000..d7819cf
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedFunctionWithTimeout;
+import org.apache.ratis.util.function.TriConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.ToIntFunction;
+
+/**
+ * A queue for data elements
+ * such that the queue imposes limits on both number of elements and the data size in bytes.
+ *
+ * Null element is NOT supported.
+ *
+ * This class is NOT threadsafe.
+ */
+public class DataQueue<E> {
+  public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+
+  private final Object name;
+  private final int byteLimit;
+  private final int elementLimit;
+  private final ToIntFunction<E> getNumBytes;
+
+  private final Queue<E> q;
+
+  private int numBytes = 0;
+
+  public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+    this.name = name != null? name: this;
+    this.byteLimit = byteLimit.getSizeInt();
+    this.elementLimit = elementLimit;
+    this.getNumBytes = getNumBytes;
+    this.q = new ArrayDeque<>(elementLimit);
+  }
+
+  public int getNumBytes() {
+    return numBytes;
+  }
+
+  public int getNumElements() {
+    return q.size();
+  }
+
+  public final boolean isEmpty() {
+    return getNumElements() == 0;
+  }
+
+  public void clear() {
+    q.clear();
+    numBytes = 0;
+  }
+
+  /**
+   * Adds an element to this queue.
+   *
+   * @return true if the element is added successfully;
+   *         otherwise, the element is not added, return false.
+   */
+  public boolean offer(E element) {
+    Objects.requireNonNull(element, "element == null");
+    if (elementLimit > 0 && q.size() >= elementLimit) {
+      return false;
+    }
+    final int elementNumBytes = getNumBytes.applyAsInt(element);
+    Preconditions.assertTrue(elementNumBytes >= 0,
+        () -> name + ": elementNumBytes = " + elementNumBytes + " < 0");
+    if (byteLimit > 0) {
+      Preconditions.assertTrue(elementNumBytes <= byteLimit,
+          () -> name + ": elementNumBytes = " + elementNumBytes + " > byteLimit = " + byteLimit);
+      if (numBytes > byteLimit - elementNumBytes) {
+        return false;
+      }
+    }
+    q.offer(element);
+    numBytes += elementNumBytes;
+    return true;
+  }
+
+  /** Poll a list of the results within the given timeout. */
+  public <RESULT, THROWABLE extends Throwable> List<RESULT> pollList(long timeoutMs,
+      CheckedFunctionWithTimeout<E, RESULT, THROWABLE> getResult,
+      TriConsumer<E, TimeDuration, TimeoutException> timeoutHandler) throws THROWABLE {
+    if (timeoutMs <= 0 || q.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    final Timestamp startTime = Timestamp.currentTime();
+    final TimeDuration limit = TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
+    for(final List<RESULT> results = new ArrayList<>();;) {
+      final E peeked = q.peek();
+      if (peeked == null) { // q is empty
+        return results;
+      }
+
+      final TimeDuration remaining = limit.minus(startTime.elapsedTime());
+      try {
+        results.add(getResult.apply(peeked, remaining));
+      } catch (TimeoutException e) {
+        Optional.ofNullable(timeoutHandler).ifPresent(h -> h.accept(peeked, remaining, e));
+        return results;
+      }
+
+      final E polled = poll();
+      Preconditions.assertTrue(polled == peeked);
+    }
+  }
+
+  /** Poll out the head element from this queue. */
+  public E poll() {
+    final E polled = q.poll();
+    numBytes -= getNumBytes.applyAsInt(polled);
+    return polled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 7daa4dd..41ba1c6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -118,16 +118,38 @@ public class TimeDuration implements Comparable<TimeDuration> {
     return unit;
   }
 
+  /**
+   * Convert this {@link TimeDuration} to a long in the target unit.
+   * Note that the returned value may be truncated or saturated; see {@link TimeUnit#convert(long, TimeUnit)}.*
+   *
+   * @return the value in the target unit.
+   */
   public long toLong(TimeUnit targetUnit) {
     return targetUnit.convert(duration, unit);
   }
 
+  /**
+   * The same as Math.toIntExact(toLong(targetUnit));
+   * Similar to {@link #toLong(TimeUnit)}, the returned value may be truncated.
+   * However, the returned value is never saturated.  The method throws {@link ArithmeticException} if it overflows.
+   *
+   * @return the value in the target unit.
+   * @throws ArithmeticException if it overflows.
+   */
   public int toInt(TimeUnit targetUnit) {
     return Math.toIntExact(toLong(targetUnit));
   }
 
+  /** @return the {@link TimeDuration} in the target unit. */
   public TimeDuration to(TimeUnit targetUnit) {
-    return valueOf(toLong(targetUnit), targetUnit);
+    return this.unit == targetUnit? this: valueOf(toLong(targetUnit), targetUnit);
+  }
+
+  /** @return (this - that) in the minimum unit among them. */
+  public TimeDuration minus(TimeDuration that) {
+    Objects.requireNonNull(that, "that == null");
+    final TimeUnit minUnit = CollectionUtils.min(this.unit, that.unit);
+    return valueOf(this.toLong(minUnit) - that.toLong(minUnit), minUnit);
   }
 
   /** Round up to the given nanos to nearest multiple (in nanoseconds) of this {@link TimeDuration}. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 8ab3f6b..c33a864 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.util;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Use {@link System#nanoTime()} as timestamps.
  *
@@ -39,6 +41,11 @@ public class Timestamp implements Comparable<Timestamp> {
     return System.nanoTime();
   }
 
+  /** @return a {@link Timestamp} for the current time. */
+  public static Timestamp currentTime() {
+    return valueOf(currentTimeNanos());
+  }
+
   /** @return the latest timestamp. */
   public static Timestamp latest(Timestamp a, Timestamp b) {
     return a.compareTo(b) > 0? a: b;
@@ -66,8 +73,7 @@ public class Timestamp implements Comparable<Timestamp> {
 
   /**
    * @return the elapsed time in milliseconds.
-   *         If the timestamp is a future time,
-   *         this method returns a negative value.
+   *         If the timestamp is a future time, the returned value is negative.
    */
   public long elapsedTimeMs() {
     final long d = System.nanoTime() - nanos;
@@ -75,6 +81,15 @@ public class Timestamp implements Comparable<Timestamp> {
   }
 
   /**
+   * @return the elapsed time in nanoseconds.
+   *         If the timestamp is a future time, the returned value is negative.
+   */
+  public TimeDuration elapsedTime() {
+    final long d = System.nanoTime() - nanos;
+    return TimeDuration.valueOf(d, TimeUnit.NANOSECONDS);
+  }
+
+  /**
    * Compare two timestamps, t0 (this) and t1 (that).
    * This method uses {@code t0 - t1 < 0}, not {@code t0 < t1},
    * in order to take care the possibility of numerical overflow.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
new file mode 100644
index 0000000..fddfab2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeoutException;
+
+/** Function with a timeout and a throws-clause. */
+@FunctionalInterface
+public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> {
+  /**
+   * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)}
+   * except that this method has a timeout parameter and throws {@link TimeoutException}.
+   */
+  OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
new file mode 100644
index 0000000..a3cd283
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+/** Consumer with three input parameters. */
+@FunctionalInterface
+public interface TriConsumer<T, U, V> {
+  /**
+   * The same as {@link java.util.function.BiConsumer#accept(Object, Object)}}
+   * except that this method is declared with three parameters.
+   */
+  void accept(T t, U u, V v);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
deleted file mode 100644
index 7233e8f..0000000
--- a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.ParameterizedBaseTest;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Enable raft.server.log.appender.batch.enabled and test LogAppender
- */
-@RunWith(Parameterized.class)
-public class TestBatchAppend extends BaseTest {
-  static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() throws IOException {
-    RaftProperties prop = new RaftProperties();
-    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SimpleStateMachine4Testing.class, StateMachine.class);
-    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
-    // enable batch appending
-    RaftServerConfigKeys.Log.Appender.setBatchEnabled(prop, true);
-    // set batch appending buffer size to 4KB
-    RaftServerConfigKeys.Log.Appender.setBufferCapacity(prop, SizeInBytes.valueOf("8KB"));
-
-    return ParameterizedBaseTest.getMiniRaftClusters(prop, 3);
-  }
-
-  @Parameterized.Parameter
-  public MiniRaftCluster cluster;
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  private class Sender extends Thread {
-    private final RaftClient client;
-    private final CountDownLatch latch;
-    private final SimpleMessage[] msgs;
-    private final AtomicBoolean succeed = new AtomicBoolean(false);
-
-    Sender(RaftPeerId leaderId, CountDownLatch latch, int numMsg) {
-      this.latch = latch;
-      this.client = cluster.createClient(leaderId);
-      msgs = generateMsgs(numMsg);
-    }
-
-    SimpleMessage[] generateMsgs(int num) {
-      SimpleMessage[] msgs = new SimpleMessage[num * 6];
-      for (int i = 0; i < num; i++) {
-        for (int j = 0; j < 6; j++) {
-          byte[] bytes = new byte[1024 * (j + 1)];
-          Arrays.fill(bytes, (byte) (j + '0'));
-          msgs[i * 6 + j] = new SimpleMessage(new String(bytes));
-        }
-      }
-      return msgs;
-    }
-
-    @Override
-    public void run() {
-      try {
-        latch.await();
-      } catch (InterruptedException ignored) {
-        LOG.warn("Client {} waiting for countdown latch got interrupted",
-            client.getId());
-      }
-      for (SimpleMessage msg : msgs) {
-        try {
-          client.send(msg);
-        } catch (IOException e) {
-          succeed.set(false);
-          LOG.warn("Client {} hit exception {}", client.getId(), e);
-          return;
-        }
-      }
-      succeed.set(true);
-      try {
-        client.close();
-      } catch (IOException ignore) {
-      }
-    }
-  }
-
-  @Test
-  public void testAppend() throws Exception {
-    final int numMsgs = 10;
-    final int numClients = 5;
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-    final RaftPeerId leaderId = cluster.getLeader().getId();
-
-    // start several clients and write concurrently
-    CountDownLatch latch = new CountDownLatch(1);
-    final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
-        .map(i -> new Sender(leaderId, latch, numMsgs))
-        .collect(Collectors.toList());
-    senders.forEach(Thread::start);
-
-    latch.countDown();
-
-    for (Sender s : senders) {
-      s.join();
-      Assert.assertTrue(s.succeed.get());
-    }
-
-    final ServerState leaderState = cluster.getLeader().getState();
-    final RaftLog leaderLog = leaderState.getLog();
-    final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
-    LOG.info("counts = " + counts);
-    Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
-
-    final LogEntryProto lastStateMachineEntry = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
-    LOG.info("lastStateMachineEntry = " + lastStateMachineEntry);
-    Assert.assertTrue(lastStateMachineEntry.getIndex() <= leaderState.getLastAppliedIndex());
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 9c94cca..4bd370f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -67,7 +67,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
     this(server, server::getId,
         GrpcConfigKeys.Server.port(server.getProperties()),
         GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
+        RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
         GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
         RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
   }
@@ -78,7 +78,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
         p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
     if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
       throw new IllegalArgumentException("Illegal configuration: "
-          + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+          + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
new file mode 100644
index 0000000..48489ad
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithHadoopRpc
+    extends LogAppenderTests<MiniRaftClusterWithHadoopRpc>
+    implements MiniRaftClusterWithHadoopRpc.Factory.Get {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 25d4b0c..32f5752 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -227,23 +227,25 @@ public interface RaftServerConfigKeys {
     interface Appender {
       String PREFIX = Log.PREFIX + ".appender";
 
-      String BUFFER_CAPACITY_KEY = PREFIX + ".buffer.capacity";
-      SizeInBytes BUFFER_CAPACITY_DEFAULT =SizeInBytes.valueOf("4MB");
-      static SizeInBytes bufferCapacity(RaftProperties properties) {
-        return getSizeInBytes(properties::getSizeInBytes,
-            BUFFER_CAPACITY_KEY, BUFFER_CAPACITY_DEFAULT, getDefaultLog());
+      String BUFFER_ELEMENT_LIMIT_KEY = PREFIX + ".buffer.element-limit";
+      /** 0 means no limit. */
+      int BUFFER_ELEMENT_LIMIT_DEFAULT = 0;
+      static int bufferElementLimit(RaftProperties properties) {
+        return getInt(properties::getInt,
+            BUFFER_ELEMENT_LIMIT_KEY, BUFFER_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0));
       }
-      static void setBufferCapacity(RaftProperties properties, SizeInBytes bufferCapacity) {
-        setSizeInBytes(properties::set, BUFFER_CAPACITY_KEY, bufferCapacity);
+      static void setBufferElementLimit(RaftProperties properties, int bufferElementLimit) {
+        setInt(properties::setInt, BUFFER_ELEMENT_LIMIT_KEY, bufferElementLimit);
       }
 
-      String BATCH_ENABLED_KEY = PREFIX + ".batch.enabled";
-      boolean BATCH_ENABLED_DEFAULT = false;
-      static boolean batchEnabled(RaftProperties properties) {
-        return getBoolean(properties::getBoolean, BATCH_ENABLED_KEY, BATCH_ENABLED_DEFAULT, getDefaultLog());
+      String BUFFER_BYTE_LIMIT_KEY = PREFIX + ".buffer.byte-limit";
+      SizeInBytes BUFFER_BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("4MB");
+      static SizeInBytes bufferByteLimit(RaftProperties properties) {
+        return getSizeInBytes(properties::getSizeInBytes,
+            BUFFER_BYTE_LIMIT_KEY, BUFFER_BYTE_LIMIT_DEFAULT, getDefaultLog());
       }
-      static void setBatchEnabled(RaftProperties properties, boolean batchEnabled) {
-        setBoolean(properties::setBoolean, BATCH_ENABLED_KEY, batchEnabled);
+      static void setBufferByteLimit(RaftProperties properties, SizeInBytes bufferByteLimit) {
+        setSizeInBytes(properties::set, BUFFER_BYTE_LIMIT_KEY, bufferByteLimit);
       }
 
       String SNAPSHOT_CHUNK_SIZE_MAX_KEY = PREFIX + ".snapshot.chunk.size.max";

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 6cb8538..633496e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -58,9 +58,8 @@ public class LogAppender {
   private final LeaderState leaderState;
   protected final RaftLog raftLog;
   protected final FollowerInfo follower;
-  private final int maxBufferSize;
-  private final boolean batchSending;
-  private final LogEntryBuffer buffer;
+
+  private final DataQueue<EntryWithData> buffer;
   private final int snapshotChunkMaxSize;
   protected final long halfMinTimeoutMs;
 
@@ -74,12 +73,12 @@ public class LogAppender {
     this.raftLog = server.getState().getLog();
 
     final RaftProperties properties = server.getProxy().getProperties();
-    this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt();
-    this.batchSending = RaftServerConfigKeys.Log.Appender.batchEnabled(properties);
     this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
     this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
 
-    this.buffer = new LogEntryBuffer();
+    final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+    final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
+    this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
     this.lifeCycle = new LifeCycle(this);
   }
 
@@ -133,51 +132,6 @@ public class LogAppender {
     return getFollower().getPeer().getId();
   }
 
-  /**
-   * A buffer for log entries with size limitation.
-   */
-  private class LogEntryBuffer {
-    private final List<EntryWithData> buf = new ArrayList<>();
-    private int totalSize = 0;
-
-    /**
-     * Adds a log entry to the Log entry buffer.
-     * Checks if enough space is available before adding the entry to the buffer.
-     * @return true if the entry is added successfully;
-     *         otherwise, the entry is not added, return false.
-     */
-    boolean addEntry(EntryWithData entry) {
-      final int entrySize = entry.getSerializedSize();
-      if (totalSize + entrySize <= maxBufferSize) {
-        buf.add(entry);
-        totalSize += entrySize;
-        return true;
-      }
-      return false;
-    }
-
-    boolean isEmpty() {
-      return buf.isEmpty();
-    }
-
-    AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException {
-      final List<LogEntryProto> protos = new ArrayList<>();
-      // Wait for all the log entry futures to complete and then create a list of LogEntryProtos.
-      for (EntryWithData bufEntry : buf) {
-        protos.add(bufEntry.getEntry());
-      }
-      final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
-          getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
-      buf.clear();
-      totalSize = 0;
-      return request;
-    }
-
-    int getPendingEntryNum() {
-      return buf.size();
-    }
-  }
-
   private TermIndex getPrevious() {
     TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1);
     if (previous == null) {
@@ -194,28 +148,29 @@ public class LogAppender {
 
   protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
     final TermIndex previous = getPrevious();
+    final long heartbeatRemainingMs = getHeartbeatRemainingTime();
+    if (heartbeatRemainingMs <= 0L) {
+      return leaderState.newAppendEntriesRequestProto(
+          getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
+    }
+
+    Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
+
     final long leaderNext = raftLog.getNextIndex();
-    long next = follower.getNextIndex() + buffer.getPendingEntryNum();
-    final boolean toSend;
-
-    if (leaderNext == next && !buffer.isEmpty()) {
-      // no new entries, then send out the entries in the buffer
-      toSend = true;
-    } else if (leaderNext > next) {
-      boolean hasSpace = true;
-      for(; hasSpace && leaderNext > next;) {
-        hasSpace = buffer.addEntry(raftLog.getEntryWithData(next++));
+    for (long next = follower.getNextIndex(); leaderNext > next; ) {
+      if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+        break;
       }
-      // buffer is full or batch sending is disabled, send out a request
-      toSend = !hasSpace || !batchSending;
-    } else {
-      toSend = false;
     }
-
-    if (toSend || shouldHeartbeat()) {
-      return buffer.getAppendRequest(previous, callId);
+    if (buffer.isEmpty()) {
+      return null;
     }
-    return null;
+
+    final List<LogEntryProto> protos = buffer.pollList(heartbeatRemainingMs, EntryWithData::getEntry,
+        (entry, time, exception) -> LOG.warn(this + ": Failed get " + entry + " in " + time, exception));
+    buffer.clear();
+    return leaderState.newAppendEntriesRequestProto(
+        getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
   }
 
   /** Send an appendEntries RPC; retry indefinitely. */
@@ -442,8 +397,7 @@ public class LogAppender {
           }
         }
       }
-      if (isAppenderRunning() && !shouldAppendEntries(
-          follower.getNextIndex() + buffer.getPendingEntryNum())) {
+      if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
         final long waitTime = getHeartbeatRemainingTime();
         if (waitTime > 0) {
           synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index ee5218d..7dfc331 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -182,7 +182,7 @@ public class ServerState implements Closeable {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
       final int maxBufferSize =
-          RaftServerConfigKeys.Log.Appender.bufferCapacity(prop).getSizeInt();
+          RaftServerConfigKeys.Log.Appender.bufferByteLimit(prop).getSizeInt();
       log = new MemoryRaftLog(id, lastIndexInSnapshot, maxBufferSize);
     } else {
       log = new SegmentedRaftLog(id, server, this.storage,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 8478617..155122e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,6 +31,7 @@ import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 
@@ -412,18 +414,25 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
       this.future = future;
     }
 
+    public long getIndex() {
+      return logEntry.getIndex();
+    }
+
     public int getSerializedSize() {
       return ServerProtoUtils.getSerializedSize(logEntry);
     }
 
-    public LogEntryProto getEntry() throws RaftLogIOException {
+    public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
       LogEntryProto entryProto;
       if (future == null) {
         return logEntry;
       }
 
       try {
-        entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry)).join();
+        entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry))
+            .get(timeout.getDuration(), timeout.getUnit());
+      } catch (TimeoutException t) {
+        throw t;
       } catch (Throwable t) {
         final String err = selfId + ": Failed readStateMachineData for " +
             ServerProtoUtils.toLogEntryString(logEntry);
@@ -440,5 +449,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
       }
       return entryProto;
     }
+
+    @Override
+    public String toString() {
+      return ServerProtoUtils.toLogEntryString(logEntry);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index d23e0a5..f5a7330 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -111,7 +111,7 @@ public class SegmentedRaftLog extends RaftLog {
   SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
       StateMachine stateMachine, Runnable submitUpdateCommitEvent,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
-    super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt());
+    super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt());
     this.server = Optional.ofNullable(server);
     this.storage = storage;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
new file mode 100644
index 0000000..c8ddc0d
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  {
+    LogUtils.setLogLevel(LogAppender.LOG, Level.DEBUG);
+  }
+
+  {
+    final RaftProperties prop = getProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+
+    final SizeInBytes n = SizeInBytes.valueOf("8KB");
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, n);
+    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(prop, n);
+  }
+
+  static SimpleMessage[] generateMsgs(int num) {
+    SimpleMessage[] msgs = new SimpleMessage[num * 6];
+    for (int i = 0; i < num; i++) {
+      for (int j = 0; j < 6; j++) {
+        byte[] bytes = new byte[1024 * (j + 1)];
+        Arrays.fill(bytes, (byte) (j + '0'));
+        msgs[i * 6 + j] = new SimpleMessage(new String(bytes));
+      }
+    }
+    return msgs;
+  }
+
+  private static class Sender extends Thread {
+    private final RaftClient client;
+    private final CountDownLatch latch;
+    private final SimpleMessage[] messages;
+    private final AtomicBoolean succeed = new AtomicBoolean(false);
+    private final AtomicReference<Exception> exception = new AtomicReference<>();
+
+    Sender(RaftClient client, int numMessages, CountDownLatch latch) {
+      this.latch = latch;
+      this.client = client;
+      this.messages = generateMsgs(numMessages);
+    }
+
+    @Override
+    public void run() {
+      try {
+        latch.await();
+        for (SimpleMessage msg : messages) {
+          client.send(msg);
+        }
+        client.close();
+        succeed.set(true);
+      } catch (Exception e) {
+        exception.compareAndSet(null, e);
+      }
+    }
+  }
+
+  @Test
+  public void testSingleElementBuffer() throws Exception {
+    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 1);
+    runWithNewCluster(3, this::runTest);
+  }
+
+  @Test
+  public void testUnlimitedElementBuffer() throws Exception {
+    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 0);
+    runWithNewCluster(3, this::runTest);
+  }
+
+  void runTest(CLUSTER cluster) throws Exception {
+    final int numMsgs = 10;
+    final int numClients = 5;
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+    // start several clients and write concurrently
+    final CountDownLatch latch = new CountDownLatch(1);
+    final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
+        .map(i -> new Sender(cluster.createClient(leaderId), numMsgs, latch))
+        .collect(Collectors.toList());
+    senders.forEach(Thread::start);
+
+    latch.countDown();
+
+    for (Sender s : senders) {
+      s.join();
+      final Exception e = s.exception.get();
+      if (e != null) {
+        throw e;
+      }
+      Assert.assertTrue(s.succeed.get());
+    }
+
+    final ServerState leaderState = cluster.getLeader().getState();
+    final RaftLog leaderLog = leaderState.getLog();
+    final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
+    LOG.info("counts = " + counts);
+    Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
+
+    final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
+    LOG.info("last = " + ServerProtoUtils.toLogEntryString(last));
+    Assert.assertNotNull(last);
+    Assert.assertTrue(last.getIndex() <= leaderState.getLastAppliedIndex());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 90cc627..a21796f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -61,7 +61,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
   {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
-    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
 
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 58e26b2..bf43ba6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -55,7 +55,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
   public void setup() throws IOException {
     final RaftProperties prop = getProperties();
     RaftServerConfigKeys.Log.Appender
-        .setBufferCapacity(prop, SizeInBytes.valueOf("4KB"));
+        .setBufferByteLimit(prop, SizeInBytes.valueOf("4KB"));
     cluster = newCluster(NUM_PEERS);
     cluster.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index d4c4021..6306ce2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -154,7 +154,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     Preconditions.assertNull(previous, "previous");
     final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
     dataMap.put(s, entry);
-    LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
+    LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(),
+        s.length() <= 10? s: s.substring(0, 10) + "...",
+        ServerProtoUtils.toLogEntryString(entry));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
new file mode 100644
index 0000000..5918efd
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithGrpc
+    extends LogAppenderTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
new file mode 100644
index 0000000..85427a7
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithNetty
+    extends LogAppenderTests<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
new file mode 100644
index 0000000..a23ce1d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithSimulatedRpc
+    extends LogAppenderTests<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
new file mode 100644
index 0000000..e465a1d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.TriConsumer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class TestDataQueue {
+  static <T> TriConsumer<T, TimeDuration, TimeoutException> getTimeoutHandler(boolean expctedTimeout) {
+    return (element, time, exception) -> {
+      if (!expctedTimeout) {
+        throw new AssertionError("Unexpected timeout to get element " + element + " in " + time, exception);
+      }
+    };
+  }
+
+  private void assertSizes(int expectedNumElements, int expectedNumBytes) {
+    Assert.assertEquals(expectedNumElements, q.getNumElements());
+    Assert.assertEquals(expectedNumBytes, q.getNumBytes());
+  }
+
+  final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
+  final int elementLimit = 5;
+  final DataQueue<Integer> q = new DataQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+
+  @Test(timeout = 1000)
+  public void testElementLimit() {
+    assertSizes(0, 0);
+
+    int numBytes = 0;
+    for (int i = 0; i < elementLimit; i++) {
+      Assert.assertEquals(i, q.getNumElements());
+      Assert.assertEquals(numBytes, q.getNumBytes());
+      final boolean offered = q.offer(i);
+      Assert.assertTrue(offered);
+      numBytes += i;
+      assertSizes(i+1, numBytes);
+    }
+    {
+      final boolean offered = q.offer(0);
+      Assert.assertFalse(offered);
+      assertSizes(elementLimit, numBytes);
+    }
+
+    { // poll all elements
+      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      Assert.assertEquals(elementLimit, polled.size());
+      for (int i = 0; i < polled.size(); i++) {
+        Assert.assertEquals(i, polled.get(i).intValue());
+      }
+    }
+    assertSizes(0, 0);
+  }
+
+  @Test(timeout = 1000)
+  public void testByteLimit() {
+    assertSizes(0, 0);
+
+    try {
+      q.offer(byteLimit.getSizeInt() + 1);
+      Assert.fail();
+    } catch (IllegalStateException ignored) {
+    }
+
+    final int halfBytes = byteLimit.getSizeInt() / 2;
+    {
+      final boolean offered = q.offer(halfBytes);
+      Assert.assertTrue(offered);
+      assertSizes(1, halfBytes);
+    }
+
+    {
+      final boolean offered = q.offer(halfBytes + 1);
+      Assert.assertFalse(offered);
+      assertSizes(1, halfBytes);
+    }
+
+    {
+      final boolean offered = q.offer(halfBytes);
+      Assert.assertTrue(offered);
+      assertSizes(2, byteLimit.getSizeInt());
+    }
+
+    {
+      final boolean offered = q.offer(1);
+      Assert.assertFalse(offered);
+      assertSizes(2, byteLimit.getSizeInt());
+    }
+
+    {
+      final boolean offered = q.offer(0);
+      Assert.assertTrue(offered);
+      assertSizes(3, byteLimit.getSizeInt());
+    }
+
+    { // poll all elements
+      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      Assert.assertEquals(3, polled.size());
+      Assert.assertEquals(halfBytes, polled.get(0).intValue());
+      Assert.assertEquals(halfBytes, polled.get(1).intValue());
+      Assert.assertEquals(0, polled.get(2).intValue());
+    }
+
+    assertSizes(0, 0);
+  }
+
+  @Test(timeout = 1000)
+  public void testTimeout() {
+    assertSizes(0, 0);
+
+    int numBytes = 0;
+    for (int i = 0; i < elementLimit; i++) {
+      Assert.assertEquals(i, q.getNumElements());
+      Assert.assertEquals(numBytes, q.getNumBytes());
+      final boolean offered = q.offer(i);
+      Assert.assertTrue(offered);
+      numBytes += i;
+      assertSizes(i+1, numBytes);
+    }
+
+    { // poll with zero time
+      final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
+      Assert.assertTrue(polled.isEmpty());
+      assertSizes(elementLimit, numBytes);
+    }
+
+    final int halfElements = elementLimit / 2;
+    { // poll with timeout
+      final List<Integer> polled = q.pollList(100, (i, timeout) -> {
+        if (i == halfElements) {
+          // simulate timeout
+          throw new TimeoutException("i=" + i);
+        }
+        return i;
+      }, getTimeoutHandler(true));
+      Assert.assertEquals(halfElements, polled.size());
+      for (int i = 0; i < polled.size(); i++) {
+        Assert.assertEquals(i, polled.get(i).intValue());
+        numBytes -= i;
+      }
+      assertSizes(elementLimit - halfElements, numBytes);
+    }
+
+    { // poll the remaining elements
+      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      Assert.assertEquals(elementLimit - halfElements, polled.size());
+      for (int i = 0; i < polled.size(); i++) {
+        Assert.assertEquals(halfElements + i, polled.get(i).intValue());
+      }
+    }
+    assertSizes(0, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index 782d80d..feb7b6c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.ratis.util;
 
 import org.junit.Test;
 
-import java.sql.Time;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -32,12 +31,12 @@ import static org.junit.Assert.assertNotNull;
 
 public class TestTimeDuration {
   @Test(timeout = 1000)
-  public void testTimeDuration() {
+  public void testAbbreviation() {
     Arrays.asList(TimeUnit.values())
         .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name())));
     assertEquals(TimeUnit.values().length, Abbreviation.values().length);
 
-    final List<String> allSymbols = Arrays.asList(Abbreviation.values()).stream()
+    final List<String> allSymbols = Arrays.stream(Abbreviation.values())
         .map(Abbreviation::getSymbols)
         .flatMap(List::stream)
         .collect(Collectors.toList());
@@ -45,7 +44,10 @@ public class TestTimeDuration {
         allSymbols.stream()
             .map(s -> "0" + s)
             .forEach(s -> assertEquals(s, 0L, parse(s, unit))));
+  }
 
+  @Test(timeout = 1000)
+  public void testParse() {
     assertEquals(1L, parse("1000000 ns", TimeUnit.MILLISECONDS));
     assertEquals(10L, parse("10000000 nanos", TimeUnit.MILLISECONDS));
     assertEquals(100L, parse("100000000 nanosecond", TimeUnit.MILLISECONDS));
@@ -97,4 +99,59 @@ public class TestTimeDuration {
     assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond));
     assertEquals(2*nanosPerSecond, oneSecond.roundUp(nanosPerSecond + 1));
   }
+
+  @Test(timeout = 1000)
+  public void testTo() {
+    final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+    assertTo(1000, oneSecond, TimeUnit.MILLISECONDS);
+    final TimeDuration nanos = assertTo(1_000_000_000, oneSecond, TimeUnit.NANOSECONDS);
+    assertTo(1000, nanos, TimeUnit.MILLISECONDS);
+
+    assertTo(0, oneSecond, TimeUnit.MINUTES);
+    assertTo(0, nanos, TimeUnit.MINUTES);
+
+    final TimeDuration millis = TimeDuration.valueOf(1_999, TimeUnit.MILLISECONDS);
+    assertTo(1, millis, TimeUnit.SECONDS);
+    assertTo(0, millis, TimeUnit.MINUTES);
+  }
+
+  static TimeDuration assertTo(long expected, TimeDuration timeDuration, TimeUnit toUnit) {
+    final TimeDuration computed = timeDuration.to(toUnit);
+    assertEquals(expected, computed.getDuration());
+    assertEquals(toUnit, computed.getUnit());
+    return computed;
+  }
+
+  @Test(timeout = 1000)
+  public void testMinus() {
+    final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+    final TimeDuration tenSecond = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+    {
+      final TimeDuration d = oneSecond.minus(oneSecond);
+      assertEquals(0, d.getDuration());
+      assertEquals(TimeUnit.SECONDS, d.getUnit());
+    }
+    {
+      final TimeDuration d = tenSecond.minus(oneSecond);
+      assertEquals(9, d.getDuration());
+      assertEquals(TimeUnit.SECONDS, d.getUnit());
+    }
+    {
+      final TimeDuration d = oneSecond.minus(tenSecond);
+      assertEquals(-9, d.getDuration());
+      assertEquals(TimeUnit.SECONDS, d.getUnit());
+    }
+
+    final TimeDuration oneMS = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
+    {
+      final TimeDuration d = oneSecond.minus(oneMS);
+      assertEquals(999, d.getDuration());
+      assertEquals(TimeUnit.MILLISECONDS, d.getUnit());
+    }
+    {
+      final TimeDuration d = oneMS.minus(oneSecond);
+      assertEquals(-999, d.getDuration());
+      assertEquals(TimeUnit.MILLISECONDS, d.getUnit());
+    }
+  }
 }