You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/12/06 18:39:03 UTC
incubator-ratis git commit: RATIS-447. LogAppender should times out
if readStateMachineData takes a long time.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 27a8cbb49 -> bef9a72e3
RATIS-447. LogAppender should times out if readStateMachineData takes a long time.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/bef9a72e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/bef9a72e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/bef9a72e
Branch: refs/heads/master
Commit: bef9a72e32ee93adf3d532c2397e01e098afff59
Parents: 27a8cbb
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Dec 6 10:38:38 2018 -0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Dec 6 10:38:38 2018 -0800
----------------------------------------------------------------------
.../org/apache/ratis/util/CollectionUtils.java | 8 +
.../java/org/apache/ratis/util/DataQueue.java | 142 +++++++++++++++
.../org/apache/ratis/util/TimeDuration.java | 24 ++-
.../java/org/apache/ratis/util/Timestamp.java | 19 +-
.../function/CheckedFunctionWithTimeout.java | 32 ++++
.../apache/ratis/util/function/TriConsumer.java | 28 +++
.../java/org/apache/ratis/TestBatchAppend.java | 169 ------------------
.../apache/ratis/grpc/server/GrpcService.java | 4 +-
.../hadooprpc/TestLogAppenderWithHadoopRpc.java | 25 +++
.../ratis/server/RaftServerConfigKeys.java | 28 +--
.../apache/ratis/server/impl/LogAppender.java | 98 +++--------
.../apache/ratis/server/impl/ServerState.java | 2 +-
.../apache/ratis/server/storage/RaftLog.java | 20 ++-
.../ratis/server/storage/SegmentedRaftLog.java | 2 +-
.../java/org/apache/ratis/LogAppenderTests.java | 151 ++++++++++++++++
.../java/org/apache/ratis/RaftBasicTests.java | 1 -
.../org/apache/ratis/RaftExceptionBaseTest.java | 2 +-
.../SimpleStateMachine4Testing.java | 4 +-
.../ratis/grpc/TestLogAppenderWithGrpc.java | 25 +++
.../ratis/netty/TestLogAppenderWithNetty.java | 25 +++
.../TestLogAppenderWithSimulatedRpc.java | 25 +++
.../org/apache/ratis/util/TestDataQueue.java | 172 +++++++++++++++++++
.../org/apache/ratis/util/TestTimeDuration.java | 65 ++++++-
23 files changed, 800 insertions(+), 271 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index 57222a6..cb49847 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -28,6 +28,14 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public interface CollectionUtils {
+ static <T> T min(T left, T right, Comparator<T> comparator) {
+ return comparator.compare(left, right) < 0? left: right;
+ }
+
+ static <T extends Comparable<T>> T min(T left, T right) {
+ return min(left, right, Comparator.naturalOrder());
+ }
+
/**
* @return the next element in the iteration right after the given element;
* if the given element is not in the iteration, return the first one
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
new file mode 100644
index 0000000..d7819cf
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.CheckedFunctionWithTimeout;
+import org.apache.ratis.util.function.TriConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.ToIntFunction;
+
+/**
+ * A queue for data elements
+ * such that the queue imposes limits on both number of elements and the data size in bytes.
+ *
+ * Null element is NOT supported.
+ *
+ * This class is NOT threadsafe.
+ */
+public class DataQueue<E> {
+ public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
+
+ private final Object name;
+ private final int byteLimit;
+ private final int elementLimit;
+ private final ToIntFunction<E> getNumBytes;
+
+ private final Queue<E> q;
+
+ private int numBytes = 0;
+
+ public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+ this.name = name != null? name: this;
+ this.byteLimit = byteLimit.getSizeInt();
+ this.elementLimit = elementLimit;
+ this.getNumBytes = getNumBytes;
+ this.q = new ArrayDeque<>(elementLimit);
+ }
+
+ public int getNumBytes() {
+ return numBytes;
+ }
+
+ public int getNumElements() {
+ return q.size();
+ }
+
+ public final boolean isEmpty() {
+ return getNumElements() == 0;
+ }
+
+ public void clear() {
+ q.clear();
+ numBytes = 0;
+ }
+
+ /**
+ * Adds an element to this queue.
+ *
+ * @return true if the element is added successfully;
+ * otherwise, the element is not added, return false.
+ */
+ public boolean offer(E element) {
+ Objects.requireNonNull(element, "element == null");
+ if (elementLimit > 0 && q.size() >= elementLimit) {
+ return false;
+ }
+ final int elementNumBytes = getNumBytes.applyAsInt(element);
+ Preconditions.assertTrue(elementNumBytes >= 0,
+ () -> name + ": elementNumBytes = " + elementNumBytes + " < 0");
+ if (byteLimit > 0) {
+ Preconditions.assertTrue(elementNumBytes <= byteLimit,
+ () -> name + ": elementNumBytes = " + elementNumBytes + " > byteLimit = " + byteLimit);
+ if (numBytes > byteLimit - elementNumBytes) {
+ return false;
+ }
+ }
+ q.offer(element);
+ numBytes += elementNumBytes;
+ return true;
+ }
+
+ /** Poll a list of the results within the given timeout. */
+ public <RESULT, THROWABLE extends Throwable> List<RESULT> pollList(long timeoutMs,
+ CheckedFunctionWithTimeout<E, RESULT, THROWABLE> getResult,
+ TriConsumer<E, TimeDuration, TimeoutException> timeoutHandler) throws THROWABLE {
+ if (timeoutMs <= 0 || q.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final Timestamp startTime = Timestamp.currentTime();
+ final TimeDuration limit = TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
+ for(final List<RESULT> results = new ArrayList<>();;) {
+ final E peeked = q.peek();
+ if (peeked == null) { // q is empty
+ return results;
+ }
+
+ final TimeDuration remaining = limit.minus(startTime.elapsedTime());
+ try {
+ results.add(getResult.apply(peeked, remaining));
+ } catch (TimeoutException e) {
+ Optional.ofNullable(timeoutHandler).ifPresent(h -> h.accept(peeked, remaining, e));
+ return results;
+ }
+
+ final E polled = poll();
+ Preconditions.assertTrue(polled == peeked);
+ }
+ }
+
+ /** Poll out the head element from this queue. */
+ public E poll() {
+ final E polled = q.poll();
+ numBytes -= getNumBytes.applyAsInt(polled);
+ return polled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 7daa4dd..41ba1c6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -118,16 +118,38 @@ public class TimeDuration implements Comparable<TimeDuration> {
return unit;
}
+ /**
+ * Convert this {@link TimeDuration} to a long in the target unit.
+ * Note that the returned value may be truncated or saturated; see {@link TimeUnit#convert(long, TimeUnit)}.*
+ *
+ * @return the value in the target unit.
+ */
public long toLong(TimeUnit targetUnit) {
return targetUnit.convert(duration, unit);
}
+ /**
+ * The same as Math.toIntExact(toLong(targetUnit));
+ * Similar to {@link #toLong(TimeUnit)}, the returned value may be truncated.
+ * However, the returned value is never saturated. The method throws {@link ArithmeticException} if it overflows.
+ *
+ * @return the value in the target unit.
+ * @throws ArithmeticException if it overflows.
+ */
public int toInt(TimeUnit targetUnit) {
return Math.toIntExact(toLong(targetUnit));
}
+ /** @return the {@link TimeDuration} in the target unit. */
public TimeDuration to(TimeUnit targetUnit) {
- return valueOf(toLong(targetUnit), targetUnit);
+ return this.unit == targetUnit? this: valueOf(toLong(targetUnit), targetUnit);
+ }
+
+ /** @return (this - that) in the minimum unit among them. */
+ public TimeDuration minus(TimeDuration that) {
+ Objects.requireNonNull(that, "that == null");
+ final TimeUnit minUnit = CollectionUtils.min(this.unit, that.unit);
+ return valueOf(this.toLong(minUnit) - that.toLong(minUnit), minUnit);
}
/** Round up to the given nanos to nearest multiple (in nanoseconds) of this {@link TimeDuration}. */
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
index 8ab3f6b..c33a864 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Timestamp.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.util;
+import java.util.concurrent.TimeUnit;
+
/**
* Use {@link System#nanoTime()} as timestamps.
*
@@ -39,6 +41,11 @@ public class Timestamp implements Comparable<Timestamp> {
return System.nanoTime();
}
+ /** @return a {@link Timestamp} for the current time. */
+ public static Timestamp currentTime() {
+ return valueOf(currentTimeNanos());
+ }
+
/** @return the latest timestamp. */
public static Timestamp latest(Timestamp a, Timestamp b) {
return a.compareTo(b) > 0? a: b;
@@ -66,8 +73,7 @@ public class Timestamp implements Comparable<Timestamp> {
/**
* @return the elapsed time in milliseconds.
- * If the timestamp is a future time,
- * this method returns a negative value.
+ * If the timestamp is a future time, the returned value is negative.
*/
public long elapsedTimeMs() {
final long d = System.nanoTime() - nanos;
@@ -75,6 +81,15 @@ public class Timestamp implements Comparable<Timestamp> {
}
/**
+ * @return the elapsed time in nanoseconds.
+ * If the timestamp is a future time, the returned value is negative.
+ */
+ public TimeDuration elapsedTime() {
+ final long d = System.nanoTime() - nanos;
+ return TimeDuration.valueOf(d, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Compare two timestamps, t0 (this) and t1 (that).
* This method uses {@code t0 - t1 < 0}, not {@code t0 < t1},
* in order to take care the possibility of numerical overflow.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
new file mode 100644
index 0000000..fddfab2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/CheckedFunctionWithTimeout.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeoutException;
+
+/** Function with a timeout and a throws-clause. */
+@FunctionalInterface
+public interface CheckedFunctionWithTimeout<INPUT, OUTPUT, THROWABLE extends Throwable> {
+ /**
+ * The same as {@link org.apache.ratis.util.CheckedFunction#apply(Object)}
+ * except that this method has a timeout parameter and throws {@link TimeoutException}.
+ */
+ OUTPUT apply(INPUT input, TimeDuration timeout) throws TimeoutException, THROWABLE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
new file mode 100644
index 0000000..a3cd283
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/function/TriConsumer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util.function;
+
+/** Consumer with three input parameters. */
+@FunctionalInterface
+public interface TriConsumer<T, U, V> {
+ /**
+ * The same as {@link java.util.function.BiConsumer#accept(Object, Object)}}
+ * except that this method is declared with three parameters.
+ */
+ void accept(T t, U u, V v);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java b/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
deleted file mode 100644
index 7233e8f..0000000
--- a/ratis-examples/src/test/java/org/apache/ratis/TestBatchAppend.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.examples.ParameterizedBaseTest;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * Enable raft.server.log.appender.batch.enabled and test LogAppender
- */
-@RunWith(Parameterized.class)
-public class TestBatchAppend extends BaseTest {
- static {
- LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() throws IOException {
- RaftProperties prop = new RaftProperties();
- prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- SimpleStateMachine4Testing.class, StateMachine.class);
- RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
- // enable batch appending
- RaftServerConfigKeys.Log.Appender.setBatchEnabled(prop, true);
- // set batch appending buffer size to 4KB
- RaftServerConfigKeys.Log.Appender.setBufferCapacity(prop, SizeInBytes.valueOf("8KB"));
-
- return ParameterizedBaseTest.getMiniRaftClusters(prop, 3);
- }
-
- @Parameterized.Parameter
- public MiniRaftCluster cluster;
-
- @After
- public void tearDown() {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
-
- private class Sender extends Thread {
- private final RaftClient client;
- private final CountDownLatch latch;
- private final SimpleMessage[] msgs;
- private final AtomicBoolean succeed = new AtomicBoolean(false);
-
- Sender(RaftPeerId leaderId, CountDownLatch latch, int numMsg) {
- this.latch = latch;
- this.client = cluster.createClient(leaderId);
- msgs = generateMsgs(numMsg);
- }
-
- SimpleMessage[] generateMsgs(int num) {
- SimpleMessage[] msgs = new SimpleMessage[num * 6];
- for (int i = 0; i < num; i++) {
- for (int j = 0; j < 6; j++) {
- byte[] bytes = new byte[1024 * (j + 1)];
- Arrays.fill(bytes, (byte) (j + '0'));
- msgs[i * 6 + j] = new SimpleMessage(new String(bytes));
- }
- }
- return msgs;
- }
-
- @Override
- public void run() {
- try {
- latch.await();
- } catch (InterruptedException ignored) {
- LOG.warn("Client {} waiting for countdown latch got interrupted",
- client.getId());
- }
- for (SimpleMessage msg : msgs) {
- try {
- client.send(msg);
- } catch (IOException e) {
- succeed.set(false);
- LOG.warn("Client {} hit exception {}", client.getId(), e);
- return;
- }
- }
- succeed.set(true);
- try {
- client.close();
- } catch (IOException ignore) {
- }
- }
- }
-
- @Test
- public void testAppend() throws Exception {
- final int numMsgs = 10;
- final int numClients = 5;
- cluster.start();
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
-
- // start several clients and write concurrently
- CountDownLatch latch = new CountDownLatch(1);
- final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
- .map(i -> new Sender(leaderId, latch, numMsgs))
- .collect(Collectors.toList());
- senders.forEach(Thread::start);
-
- latch.countDown();
-
- for (Sender s : senders) {
- s.join();
- Assert.assertTrue(s.succeed.get());
- }
-
- final ServerState leaderState = cluster.getLeader().getState();
- final RaftLog leaderLog = leaderState.getLog();
- final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
- LOG.info("counts = " + counts);
- Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
-
- final LogEntryProto lastStateMachineEntry = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
- LOG.info("lastStateMachineEntry = " + lastStateMachineEntry);
- Assert.assertTrue(lastStateMachineEntry.getIndex() <= leaderState.getLastAppliedIndex());
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 9c94cca..4bd370f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -67,7 +67,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
this(server, server::getId,
GrpcConfigKeys.Server.port(server.getProperties()),
GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
+ RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
}
@@ -78,7 +78,7 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
throw new IllegalArgumentException("Illegal configuration: "
- + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+ + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
new file mode 100644
index 0000000..48489ad
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestLogAppenderWithHadoopRpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithHadoopRpc
+ extends LogAppenderTests<MiniRaftClusterWithHadoopRpc>
+ implements MiniRaftClusterWithHadoopRpc.Factory.Get {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 25d4b0c..32f5752 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -227,23 +227,25 @@ public interface RaftServerConfigKeys {
interface Appender {
String PREFIX = Log.PREFIX + ".appender";
- String BUFFER_CAPACITY_KEY = PREFIX + ".buffer.capacity";
- SizeInBytes BUFFER_CAPACITY_DEFAULT =SizeInBytes.valueOf("4MB");
- static SizeInBytes bufferCapacity(RaftProperties properties) {
- return getSizeInBytes(properties::getSizeInBytes,
- BUFFER_CAPACITY_KEY, BUFFER_CAPACITY_DEFAULT, getDefaultLog());
+ String BUFFER_ELEMENT_LIMIT_KEY = PREFIX + ".buffer.element-limit";
+ /** 0 means no limit. */
+ int BUFFER_ELEMENT_LIMIT_DEFAULT = 0;
+ static int bufferElementLimit(RaftProperties properties) {
+ return getInt(properties::getInt,
+ BUFFER_ELEMENT_LIMIT_KEY, BUFFER_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0));
}
- static void setBufferCapacity(RaftProperties properties, SizeInBytes bufferCapacity) {
- setSizeInBytes(properties::set, BUFFER_CAPACITY_KEY, bufferCapacity);
+ static void setBufferElementLimit(RaftProperties properties, int bufferElementLimit) {
+ setInt(properties::setInt, BUFFER_ELEMENT_LIMIT_KEY, bufferElementLimit);
}
- String BATCH_ENABLED_KEY = PREFIX + ".batch.enabled";
- boolean BATCH_ENABLED_DEFAULT = false;
- static boolean batchEnabled(RaftProperties properties) {
- return getBoolean(properties::getBoolean, BATCH_ENABLED_KEY, BATCH_ENABLED_DEFAULT, getDefaultLog());
+ String BUFFER_BYTE_LIMIT_KEY = PREFIX + ".buffer.byte-limit";
+ SizeInBytes BUFFER_BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("4MB");
+ static SizeInBytes bufferByteLimit(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes,
+ BUFFER_BYTE_LIMIT_KEY, BUFFER_BYTE_LIMIT_DEFAULT, getDefaultLog());
}
- static void setBatchEnabled(RaftProperties properties, boolean batchEnabled) {
- setBoolean(properties::setBoolean, BATCH_ENABLED_KEY, batchEnabled);
+ static void setBufferByteLimit(RaftProperties properties, SizeInBytes bufferByteLimit) {
+ setSizeInBytes(properties::set, BUFFER_BYTE_LIMIT_KEY, bufferByteLimit);
}
String SNAPSHOT_CHUNK_SIZE_MAX_KEY = PREFIX + ".snapshot.chunk.size.max";
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 6cb8538..633496e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -58,9 +58,8 @@ public class LogAppender {
private final LeaderState leaderState;
protected final RaftLog raftLog;
protected final FollowerInfo follower;
- private final int maxBufferSize;
- private final boolean batchSending;
- private final LogEntryBuffer buffer;
+
+ private final DataQueue<EntryWithData> buffer;
private final int snapshotChunkMaxSize;
protected final long halfMinTimeoutMs;
@@ -74,12 +73,12 @@ public class LogAppender {
this.raftLog = server.getState().getLog();
final RaftProperties properties = server.getProxy().getProperties();
- this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt();
- this.batchSending = RaftServerConfigKeys.Log.Appender.batchEnabled(properties);
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
- this.buffer = new LogEntryBuffer();
+ final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
+ final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
+ this.buffer = new DataQueue<>(this, bufferByteLimit, bufferElementLimit, EntryWithData::getSerializedSize);
this.lifeCycle = new LifeCycle(this);
}
@@ -133,51 +132,6 @@ public class LogAppender {
return getFollower().getPeer().getId();
}
- /**
- * A buffer for log entries with size limitation.
- */
- private class LogEntryBuffer {
- private final List<EntryWithData> buf = new ArrayList<>();
- private int totalSize = 0;
-
- /**
- * Adds a log entry to the Log entry buffer.
- * Checks if enough space is available before adding the entry to the buffer.
- * @return true if the entry is added successfully;
- * otherwise, the entry is not added, return false.
- */
- boolean addEntry(EntryWithData entry) {
- final int entrySize = entry.getSerializedSize();
- if (totalSize + entrySize <= maxBufferSize) {
- buf.add(entry);
- totalSize += entrySize;
- return true;
- }
- return false;
- }
-
- boolean isEmpty() {
- return buf.isEmpty();
- }
-
- AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException {
- final List<LogEntryProto> protos = new ArrayList<>();
- // Wait for all the log entry futures to complete and then create a list of LogEntryProtos.
- for (EntryWithData bufEntry : buf) {
- protos.add(bufEntry.getEntry());
- }
- final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
- getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
- buf.clear();
- totalSize = 0;
- return request;
- }
-
- int getPendingEntryNum() {
- return buf.size();
- }
- }
-
private TermIndex getPrevious() {
TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1);
if (previous == null) {
@@ -194,28 +148,29 @@ public class LogAppender {
protected AppendEntriesRequestProto createRequest(long callId) throws RaftLogIOException {
final TermIndex previous = getPrevious();
+ final long heartbeatRemainingMs = getHeartbeatRemainingTime();
+ if (heartbeatRemainingMs <= 0L) {
+ return leaderState.newAppendEntriesRequestProto(
+ getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
+ }
+
+ Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
+
final long leaderNext = raftLog.getNextIndex();
- long next = follower.getNextIndex() + buffer.getPendingEntryNum();
- final boolean toSend;
-
- if (leaderNext == next && !buffer.isEmpty()) {
- // no new entries, then send out the entries in the buffer
- toSend = true;
- } else if (leaderNext > next) {
- boolean hasSpace = true;
- for(; hasSpace && leaderNext > next;) {
- hasSpace = buffer.addEntry(raftLog.getEntryWithData(next++));
+ for (long next = follower.getNextIndex(); leaderNext > next; ) {
+ if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+ break;
}
- // buffer is full or batch sending is disabled, send out a request
- toSend = !hasSpace || !batchSending;
- } else {
- toSend = false;
}
-
- if (toSend || shouldHeartbeat()) {
- return buffer.getAppendRequest(previous, callId);
+ if (buffer.isEmpty()) {
+ return null;
}
- return null;
+
+ final List<LogEntryProto> protos = buffer.pollList(heartbeatRemainingMs, EntryWithData::getEntry,
+ (entry, time, exception) -> LOG.warn(this + ": Failed get " + entry + " in " + time, exception));
+ buffer.clear();
+ return leaderState.newAppendEntriesRequestProto(
+ getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
}
/** Send an appendEntries RPC; retry indefinitely. */
@@ -442,8 +397,7 @@ public class LogAppender {
}
}
}
- if (isAppenderRunning() && !shouldAppendEntries(
- follower.getNextIndex() + buffer.getPendingEntryNum())) {
+ if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
final long waitTime = getHeartbeatRemainingTime();
if (waitTime > 0) {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index ee5218d..7dfc331 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -182,7 +182,7 @@ public class ServerState implements Closeable {
final RaftLog log;
if (RaftServerConfigKeys.Log.useMemory(prop)) {
final int maxBufferSize =
- RaftServerConfigKeys.Log.Appender.bufferCapacity(prop).getSizeInt();
+ RaftServerConfigKeys.Log.Appender.bufferByteLimit(prop).getSizeInt();
log = new MemoryRaftLog(id, lastIndexInSnapshot, maxBufferSize);
} else {
log = new SegmentedRaftLog(id, server, this.storage,
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 8478617..155122e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,6 +31,7 @@ import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@@ -412,18 +414,25 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
this.future = future;
}
+ public long getIndex() {
+ return logEntry.getIndex();
+ }
+
public int getSerializedSize() {
return ServerProtoUtils.getSerializedSize(logEntry);
}
- public LogEntryProto getEntry() throws RaftLogIOException {
+ public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
LogEntryProto entryProto;
if (future == null) {
return logEntry;
}
try {
- entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry)).join();
+ entryProto = future.thenApply(data -> ServerProtoUtils.addStateMachineData(data, logEntry))
+ .get(timeout.getDuration(), timeout.getUnit());
+ } catch (TimeoutException t) {
+ throw t;
} catch (Throwable t) {
final String err = selfId + ": Failed readStateMachineData for " +
ServerProtoUtils.toLogEntryString(logEntry);
@@ -440,5 +449,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
}
return entryProto;
}
+
+ @Override
+ public String toString() {
+ return ServerProtoUtils.toLogEntryString(logEntry);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index d23e0a5..f5a7330 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -111,7 +111,7 @@ public class SegmentedRaftLog extends RaftLog {
SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
StateMachine stateMachine, Runnable submitUpdateCommitEvent,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
- super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties).getSizeInt());
+ super(selfId, lastIndexInSnapshot, RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt());
this.server = Optional.ofNullable(server);
this.storage = storage;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
new file mode 100644
index 0000000..c8ddc0d
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+ {
+ LogUtils.setLogLevel(LogAppender.LOG, Level.DEBUG);
+ }
+
+ {
+ final RaftProperties prop = getProperties();
+ prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+
+ final SizeInBytes n = SizeInBytes.valueOf("8KB");
+ RaftServerConfigKeys.Log.setSegmentSizeMax(prop, n);
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(prop, n);
+ }
+
+ static SimpleMessage[] generateMsgs(int num) {
+ SimpleMessage[] msgs = new SimpleMessage[num * 6];
+ for (int i = 0; i < num; i++) {
+ for (int j = 0; j < 6; j++) {
+ byte[] bytes = new byte[1024 * (j + 1)];
+ Arrays.fill(bytes, (byte) (j + '0'));
+ msgs[i * 6 + j] = new SimpleMessage(new String(bytes));
+ }
+ }
+ return msgs;
+ }
+
+ private static class Sender extends Thread {
+ private final RaftClient client;
+ private final CountDownLatch latch;
+ private final SimpleMessage[] messages;
+ private final AtomicBoolean succeed = new AtomicBoolean(false);
+ private final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ Sender(RaftClient client, int numMessages, CountDownLatch latch) {
+ this.latch = latch;
+ this.client = client;
+ this.messages = generateMsgs(numMessages);
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ for (SimpleMessage msg : messages) {
+ client.send(msg);
+ }
+ client.close();
+ succeed.set(true);
+ } catch (Exception e) {
+ exception.compareAndSet(null, e);
+ }
+ }
+ }
+
+ @Test
+ public void testSingleElementBuffer() throws Exception {
+ RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 1);
+ runWithNewCluster(3, this::runTest);
+ }
+
+ @Test
+ public void testUnlimitedElementBuffer() throws Exception {
+ RaftServerConfigKeys.Log.Appender.setBufferElementLimit(getProperties(), 0);
+ runWithNewCluster(3, this::runTest);
+ }
+
+ void runTest(CLUSTER cluster) throws Exception {
+ final int numMsgs = 10;
+ final int numClients = 5;
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+
+ // start several clients and write concurrently
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
+ .map(i -> new Sender(cluster.createClient(leaderId), numMsgs, latch))
+ .collect(Collectors.toList());
+ senders.forEach(Thread::start);
+
+ latch.countDown();
+
+ for (Sender s : senders) {
+ s.join();
+ final Exception e = s.exception.get();
+ if (e != null) {
+ throw e;
+ }
+ Assert.assertTrue(s.succeed.get());
+ }
+
+ final ServerState leaderState = cluster.getLeader().getState();
+ final RaftLog leaderLog = leaderState.getLog();
+ final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
+ LOG.info("counts = " + counts);
+ Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
+
+ final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
+ LOG.info("last = " + ServerProtoUtils.toLogEntryString(last));
+ Assert.assertNotNull(last);
+ Assert.assertTrue(last.getIndex() <= leaderState.getLastAppliedIndex());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 90cc627..a21796f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -61,7 +61,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
{
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
- LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 58e26b2..bf43ba6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -55,7 +55,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
public void setup() throws IOException {
final RaftProperties prop = getProperties();
RaftServerConfigKeys.Log.Appender
- .setBufferCapacity(prop, SizeInBytes.valueOf("4KB"));
+ .setBufferByteLimit(prop, SizeInBytes.valueOf("4KB"));
cluster = newCluster(NUM_PEERS);
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index d4c4021..6306ce2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -154,7 +154,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
Preconditions.assertNull(previous, "previous");
final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8();
dataMap.put(s, entry);
- LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(), s, ServerProtoUtils.toLogEntryString(entry));
+ LOG.info("{}: put {}, {} -> {}", getId(), entry.getIndex(),
+ s.length() <= 10? s: s.substring(0, 10) + "...",
+ ServerProtoUtils.toLogEntryString(entry));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
new file mode 100644
index 0000000..5918efd
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithGrpc
+ extends LogAppenderTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
new file mode 100644
index 0000000..85427a7
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestLogAppenderWithNetty.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithNetty
+ extends LogAppenderTests<MiniRaftClusterWithNetty>
+ implements MiniRaftClusterWithNetty.FactoryGet {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
new file mode 100644
index 0000000..a23ce1d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestLogAppenderWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.LogAppenderTests;
+
+public class TestLogAppenderWithSimulatedRpc
+ extends LogAppenderTests<MiniRaftClusterWithSimulatedRpc>
+ implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
new file mode 100644
index 0000000..e465a1d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.util.function.TriConsumer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class TestDataQueue {
+ static <T> TriConsumer<T, TimeDuration, TimeoutException> getTimeoutHandler(boolean expctedTimeout) {
+ return (element, time, exception) -> {
+ if (!expctedTimeout) {
+ throw new AssertionError("Unexpected timeout to get element " + element + " in " + time, exception);
+ }
+ };
+ }
+
+ private void assertSizes(int expectedNumElements, int expectedNumBytes) {
+ Assert.assertEquals(expectedNumElements, q.getNumElements());
+ Assert.assertEquals(expectedNumBytes, q.getNumBytes());
+ }
+
+ final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
+ final int elementLimit = 5;
+ final DataQueue<Integer> q = new DataQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+
+ @Test(timeout = 1000)
+ public void testElementLimit() {
+ assertSizes(0, 0);
+
+ int numBytes = 0;
+ for (int i = 0; i < elementLimit; i++) {
+ Assert.assertEquals(i, q.getNumElements());
+ Assert.assertEquals(numBytes, q.getNumBytes());
+ final boolean offered = q.offer(i);
+ Assert.assertTrue(offered);
+ numBytes += i;
+ assertSizes(i+1, numBytes);
+ }
+ {
+ final boolean offered = q.offer(0);
+ Assert.assertFalse(offered);
+ assertSizes(elementLimit, numBytes);
+ }
+
+ { // poll all elements
+ final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ Assert.assertEquals(elementLimit, polled.size());
+ for (int i = 0; i < polled.size(); i++) {
+ Assert.assertEquals(i, polled.get(i).intValue());
+ }
+ }
+ assertSizes(0, 0);
+ }
+
+ @Test(timeout = 1000)
+ public void testByteLimit() {
+ assertSizes(0, 0);
+
+ try {
+ q.offer(byteLimit.getSizeInt() + 1);
+ Assert.fail();
+ } catch (IllegalStateException ignored) {
+ }
+
+ final int halfBytes = byteLimit.getSizeInt() / 2;
+ {
+ final boolean offered = q.offer(halfBytes);
+ Assert.assertTrue(offered);
+ assertSizes(1, halfBytes);
+ }
+
+ {
+ final boolean offered = q.offer(halfBytes + 1);
+ Assert.assertFalse(offered);
+ assertSizes(1, halfBytes);
+ }
+
+ {
+ final boolean offered = q.offer(halfBytes);
+ Assert.assertTrue(offered);
+ assertSizes(2, byteLimit.getSizeInt());
+ }
+
+ {
+ final boolean offered = q.offer(1);
+ Assert.assertFalse(offered);
+ assertSizes(2, byteLimit.getSizeInt());
+ }
+
+ {
+ final boolean offered = q.offer(0);
+ Assert.assertTrue(offered);
+ assertSizes(3, byteLimit.getSizeInt());
+ }
+
+ { // poll all elements
+ final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ Assert.assertEquals(3, polled.size());
+ Assert.assertEquals(halfBytes, polled.get(0).intValue());
+ Assert.assertEquals(halfBytes, polled.get(1).intValue());
+ Assert.assertEquals(0, polled.get(2).intValue());
+ }
+
+ assertSizes(0, 0);
+ }
+
+ @Test(timeout = 1000)
+ public void testTimeout() {
+ assertSizes(0, 0);
+
+ int numBytes = 0;
+ for (int i = 0; i < elementLimit; i++) {
+ Assert.assertEquals(i, q.getNumElements());
+ Assert.assertEquals(numBytes, q.getNumBytes());
+ final boolean offered = q.offer(i);
+ Assert.assertTrue(offered);
+ numBytes += i;
+ assertSizes(i+1, numBytes);
+ }
+
+ { // poll with zero time
+ final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
+ Assert.assertTrue(polled.isEmpty());
+ assertSizes(elementLimit, numBytes);
+ }
+
+ final int halfElements = elementLimit / 2;
+ { // poll with timeout
+ final List<Integer> polled = q.pollList(100, (i, timeout) -> {
+ if (i == halfElements) {
+ // simulate timeout
+ throw new TimeoutException("i=" + i);
+ }
+ return i;
+ }, getTimeoutHandler(true));
+ Assert.assertEquals(halfElements, polled.size());
+ for (int i = 0; i < polled.size(); i++) {
+ Assert.assertEquals(i, polled.get(i).intValue());
+ numBytes -= i;
+ }
+ assertSizes(elementLimit - halfElements, numBytes);
+ }
+
+ { // poll the remaining elements
+ final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ Assert.assertEquals(elementLimit - halfElements, polled.size());
+ for (int i = 0; i < polled.size(); i++) {
+ Assert.assertEquals(halfElements + i, polled.get(i).intValue());
+ }
+ }
+ assertSizes(0, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/bef9a72e/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
index 782d80d..feb7b6c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,7 +19,6 @@ package org.apache.ratis.util;
import org.junit.Test;
-import java.sql.Time;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -32,12 +31,12 @@ import static org.junit.Assert.assertNotNull;
public class TestTimeDuration {
@Test(timeout = 1000)
- public void testTimeDuration() {
+ public void testAbbreviation() {
Arrays.asList(TimeUnit.values())
.forEach(a -> assertNotNull(Abbreviation.valueOf(a.name())));
assertEquals(TimeUnit.values().length, Abbreviation.values().length);
- final List<String> allSymbols = Arrays.asList(Abbreviation.values()).stream()
+ final List<String> allSymbols = Arrays.stream(Abbreviation.values())
.map(Abbreviation::getSymbols)
.flatMap(List::stream)
.collect(Collectors.toList());
@@ -45,7 +44,10 @@ public class TestTimeDuration {
allSymbols.stream()
.map(s -> "0" + s)
.forEach(s -> assertEquals(s, 0L, parse(s, unit))));
+ }
+ @Test(timeout = 1000)
+ public void testParse() {
assertEquals(1L, parse("1000000 ns", TimeUnit.MILLISECONDS));
assertEquals(10L, parse("10000000 nanos", TimeUnit.MILLISECONDS));
assertEquals(100L, parse("100000000 nanosecond", TimeUnit.MILLISECONDS));
@@ -97,4 +99,59 @@ public class TestTimeDuration {
assertEquals(nanosPerSecond, oneSecond.roundUp(nanosPerSecond));
assertEquals(2*nanosPerSecond, oneSecond.roundUp(nanosPerSecond + 1));
}
+
+ @Test(timeout = 1000)
+ public void testTo() {
+ final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+ assertTo(1000, oneSecond, TimeUnit.MILLISECONDS);
+ final TimeDuration nanos = assertTo(1_000_000_000, oneSecond, TimeUnit.NANOSECONDS);
+ assertTo(1000, nanos, TimeUnit.MILLISECONDS);
+
+ assertTo(0, oneSecond, TimeUnit.MINUTES);
+ assertTo(0, nanos, TimeUnit.MINUTES);
+
+ final TimeDuration millis = TimeDuration.valueOf(1_999, TimeUnit.MILLISECONDS);
+ assertTo(1, millis, TimeUnit.SECONDS);
+ assertTo(0, millis, TimeUnit.MINUTES);
+ }
+
+ static TimeDuration assertTo(long expected, TimeDuration timeDuration, TimeUnit toUnit) {
+ final TimeDuration computed = timeDuration.to(toUnit);
+ assertEquals(expected, computed.getDuration());
+ assertEquals(toUnit, computed.getUnit());
+ return computed;
+ }
+
+ @Test(timeout = 1000)
+ public void testMinus() {
+ final TimeDuration oneSecond = TimeDuration.valueOf(1, TimeUnit.SECONDS);
+ final TimeDuration tenSecond = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+ {
+ final TimeDuration d = oneSecond.minus(oneSecond);
+ assertEquals(0, d.getDuration());
+ assertEquals(TimeUnit.SECONDS, d.getUnit());
+ }
+ {
+ final TimeDuration d = tenSecond.minus(oneSecond);
+ assertEquals(9, d.getDuration());
+ assertEquals(TimeUnit.SECONDS, d.getUnit());
+ }
+ {
+ final TimeDuration d = oneSecond.minus(tenSecond);
+ assertEquals(-9, d.getDuration());
+ assertEquals(TimeUnit.SECONDS, d.getUnit());
+ }
+
+ final TimeDuration oneMS = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS);
+ {
+ final TimeDuration d = oneSecond.minus(oneMS);
+ assertEquals(999, d.getDuration());
+ assertEquals(TimeUnit.MILLISECONDS, d.getUnit());
+ }
+ {
+ final TimeDuration d = oneMS.minus(oneSecond);
+ assertEquals(-999, d.getDuration());
+ assertEquals(TimeUnit.MILLISECONDS, d.getUnit());
+ }
+ }
}