You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/28 21:04:03 UTC
incubator-ratis git commit: RATIS-112. testRevertConfigurationChange may fail.
Repository: incubator-ratis
Updated Branches:
refs/heads/master ee262d7e3 -> e46aee2c8
RATIS-112. testRevertConfigurationChange may fail.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e46aee2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e46aee2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e46aee2c
Branch: refs/heads/master
Commit: e46aee2c87e85fe7317f2e807cdf97da0be31ddc
Parents: ee262d7
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Aug 28 14:03:24 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Aug 28 14:03:24 2017 -0700
----------------------------------------------------------------------
.../org/apache/ratis/util/CheckedSupplier.java | 8 +++
.../java/org/apache/ratis/util/JavaUtils.java | 64 ++++++++++++++++++++
.../ratis/server/impl/ServerImplUtils.java | 14 ++++-
.../java/org/apache/ratis/RaftTestUtil.java | 23 -------
.../impl/RaftReconfigurationBaseTest.java | 51 +++++++++-------
.../ratis/server/impl/RaftServerTestUtil.java | 5 +-
.../server/storage/RaftStorageTestUtils.java | 55 +++++++++++++++++
7 files changed, 172 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
index 0c9de31..06abe4c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -27,4 +27,12 @@ public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
* except that this method is declared with a throws-clause.
*/
OUTPUT get() throws THROWABLE;
+
+ /**
+ * Adapt the given {@link CheckedRunnable} to a {@link CheckedSupplier}:
+ * the returned supplier runs the runnable and then yields null.
+ *
+ * @param runnable the operation executed by each {@code get()} call
+ * @return a supplier whose {@code get()} runs {@code runnable} and returns null
+ */
+ static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> valueOf(
+ CheckedRunnable<THROWABLE> runnable) {
+ final CheckedSupplier<Void, THROWABLE> adapter = () -> {
+ runnable.run();
+ return null;
+ };
+ return adapter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 0664aec..c78f999 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -104,4 +105,67 @@ public interface JavaUtils {
static ThreadGroup getRootThreadGroup() {
return ROOT_THREAD_GROUP.get();
}
+
+ /**
+ * Invoke the given supplier until it returns normally or the attempts are used up.
+ * A failed attempt other than the last one is logged as a warning (when a logger is
+ * given and warnings are enabled) and followed by a sleep of {@code sleepMs}
+ * milliseconds; the throwable of the last attempt is rethrown to the caller.
+ *
+ * @param supplier the computation to try; must not be null
+ * @param numAttempts the maximum number of invocations; must be positive
+ * @param sleepMs the pause between two attempts in milliseconds; must be non-negative
+ * @param name a label used in the log messages
+ * @param log the logger for failed attempts, or null to disable logging
+ * @return the value returned by the first successful invocation
+ * @throws THROWABLE if the last attempt fails
+ * @throws InterruptedException if interrupted while sleeping between attempts
+ */
+ static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+ CheckedSupplier<RETURN, THROWABLE> supplier,
+ int numAttempts, long sleepMs, String name, Logger log)
+ throws THROWABLE, InterruptedException {
+ Objects.requireNonNull(supplier, "supplier == null");
+ Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0");
+ Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0");
+
+ // The loop is left only by returning a value or by rethrowing the last failure.
+ for(int attemptNo = 1; ; attemptNo++) {
+ try {
+ return supplier.get();
+ } catch (Throwable t) {
+ if (attemptNo >= numAttempts) {
+ throw t;
+ }
+ if (log != null && log.isWarnEnabled()) {
+ log.warn("FAILED " + name + " attempt #" + attemptNo + "/" + numAttempts
+ + ": " + t + ", sleep " + sleepMs + "ms and then retry.");
+ }
+ }
+
+ if (sleepMs > 0) {
+ Thread.sleep(sleepMs);
+ }
+ }
+ }
+
+ /**
+ * Run the given operation until it completes normally or the attempts are used up.
+ * The operation is wrapped by {@code CheckedSupplier.valueOf} and handed to the
+ * supplier-based {@code attempt} overload, which defines the retry behavior.
+ */
+ static <THROWABLE extends Throwable> void attempt(
+ CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String name, Logger log)
+ throws THROWABLE, InterruptedException {
+ final CheckedSupplier<?, THROWABLE> wrapped = CheckedSupplier.valueOf(op);
+ attempt(wrapped, numAttempts, sleepMs, name, log);
+ }
+
+ /**
+ * Wait for the given condition to become true, evaluating it at most
+ * {@code numAttempts} times with a sleep of {@code sleepMs} milliseconds between
+ * two consecutive evaluations. No sleep and no extra evaluation happen after the
+ * last attempt, so the number of evaluations matches the reported number of attempts.
+ *
+ * @param condition the condition to poll; must not be null
+ * @param numAttempts the maximum number of evaluations; must be positive
+ * @param sleepMs the pause between two evaluations in milliseconds; must be non-negative
+ * @param name a label used in the log and failure messages
+ * @param log the logger for failed attempts, or null to disable logging
+ * @throws IllegalStateException if the condition is still false at the last attempt
+ * @throws InterruptedException if interrupted while sleeping between attempts
+ */
+ static void attempt(
+ BooleanSupplier condition, int numAttempts, long sleepMs, String name, Logger log)
+ throws InterruptedException {
+ Objects.requireNonNull(condition, "condition == null");
+ Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0");
+ Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0");
+
+ for(int i = 1; i <= numAttempts; i++) {
+ if (condition.getAsBoolean()) {
+ return;
+ }
+ if (i == numAttempts) {
+ // Out of attempts: fail now instead of announcing a retry, sleeping
+ // and evaluating the condition one more time.
+ break;
+ }
+ if (log != null && log.isWarnEnabled()) {
+ log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
+ + ": sleep " + sleepMs + "ms and then retry.");
+ }
+ if (sleepMs > 0) {
+ Thread.sleep(sleepMs);
+ }
+ }
+
+ throw new IllegalStateException("Failed " + name + " for " + numAttempts + " attempts.");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index bcbab9a..544ed13 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -23,6 +23,8 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import java.io.IOException;
@@ -31,7 +33,17 @@ public class ServerImplUtils {
+ /**
+ * Create a {@link RaftServerProxy}, retrying the construction via
+ * {@code JavaUtils.attempt} with 5 attempts and 500ms sleep.
+ *
+ * @throws IOException if the construction still fails with an IOException at the
+ * last attempt (rethrown with the id and the group in the message
+ * and the original exception as the cause), or if the thread is
+ * interrupted while retrying
+ */
public static RaftServerProxy newRaftServer(
RaftPeerId id, RaftGroup group, StateMachine stateMachine,
RaftProperties properties, Parameters parameters) throws IOException {
- return new RaftServerProxy(id, stateMachine, group, properties, parameters);
+ try {
+ // attempt multiple times to avoid temporary bind exception
+ return JavaUtils.attempt(
+ () -> new RaftServerProxy(id, stateMachine, group, properties, parameters),
+ 5, 500L, "newRaftServer", RaftServerImpl.LOG);
+ } catch (InterruptedException e) {
+ // NOTE(review): presumably converts to an InterruptedIOException so that the
+ // method can keep its IOException-only signature -- confirm against IOUtils.
+ throw IOUtils.toInterruptedIOException(
+ "Interrupted when creating RaftServer " + id + ", " + group, e);
+ } catch (IOException e) {
+ // add the server id and the group to the failure; the original exception is the cause
+ throw new IOException("Failed to create RaftServer " + id + ", " + group, e);
+ }
}
public static TermIndex newTermIndex(long term, long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 46ff7d8..c3972a1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,19 +31,16 @@ import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.util.CheckedRunnable;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;
@@ -257,26 +254,6 @@ public interface RaftTestUtil {
}
}
- static <T extends Throwable> void attempt(
- int n, long sleepMs, CheckedRunnable<T> runnable)
- throws T, InterruptedException {
- for(int i = 1; i <= n; i++) {
- LOG.info("Attempt #" + i + "/" + n + ": sleep " + sleepMs + "ms");
- if (sleepMs > 0) {
- Thread.sleep(sleepMs);
- }
- try {
- runnable.run();
- return;
- } catch (Throwable t) {
- if (i == n) {
- throw t;
- }
- LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and retry.");
- }
- }
- }
-
static RaftPeerId changeLeader(MiniRaftCluster cluster, RaftPeerId oldLeader)
throws InterruptedException {
cluster.setBlockRequestsFrom(oldLeader.toString(), true);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 34ade52..316377b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -28,8 +28,9 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -48,6 +49,7 @@ import static java.util.Arrays.asList;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.NOOP;
public abstract class RaftReconfigurationBaseTest extends BaseTest {
static {
@@ -511,6 +513,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
@Test
public void testRevertConfigurationChange() throws Exception {
LOG.info("Start testRevertConfigurationChange");
+ RaftLog log2 = null;
final MiniRaftCluster cluster = getCluster(5);
try {
cluster.start();
@@ -520,8 +523,8 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
final RaftPeerId leaderId = leader.getId();
final RaftLog log = leader.getState().getLog();
+ log2 = log;
Thread.sleep(1000);
- Assert.assertEquals(0, log.getLatestFlushedIndex());
// we block the incoming msg for the leader and block its requests to
// followers, so that we force the leader change and the old leader will
@@ -533,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
PeerChanges change = cluster.removePeers(1, false, new ArrayList<>());
AtomicBoolean gotNotLeader = new AtomicBoolean(false);
- new Thread(() -> {
+ final Thread clientThread = new Thread(() -> {
try(final RaftClient client = cluster.createClient(leaderId)) {
LOG.info("client starts to change conf");
final RaftClientRpc sender = client.getClientRpc();
@@ -545,37 +548,41 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
} catch (IOException e) {
LOG.warn("Got unexpected exception when client1 changes conf", e);
}
- }).start();
+ });
+ clientThread.start();
+
+ // find CONFIGURATIONENTRY, there may be NOOP before and after it.
+ final long confIndex = JavaUtils.attempt(() -> {
+ final long last = log.getLastEntryTermIndex().getIndex();
+ for (long i = 1; i <= last; i++) {
+ if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+ return i;
+ }
+ }
+ throw new Exception("CONFIGURATIONENTRY not found: last=" + last);
+ }, 10, 500, "confIndex", LOG);
// wait till the old leader persist the new conf
- for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) {
- Thread.sleep(500);
- }
- Assert.assertEquals(1, log.getLatestFlushedIndex());
- TermIndex last = log.getLastEntryTermIndex();
- Assert.assertEquals(CONFIGURATIONENTRY,
- log.get(last.getIndex()).getLogEntryBodyCase());
+ JavaUtils.attempt(() -> log.getLatestFlushedIndex() >= confIndex,
+ 10, 500L, "FLUSH", LOG);
+ final long committed = log.getLastCommittedIndex();
+ Assert.assertTrue(committed < confIndex);
// unblock the old leader
BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString());
cluster.setBlockRequestsFrom(leaderId.toString(), false);
// the client should get NotLeaderException
- for (int i = 0; i < 10 && !gotNotLeader.get(); i++) {
- Thread.sleep(500);
- }
+ clientThread.join(5000);
Assert.assertTrue(gotNotLeader.get());
// the old leader should have truncated the setConf from the log
- boolean newState = false;
- for (int i = 0; i < 10 && !newState; i++) {
- Thread.sleep(500);
- TermIndex lastTermIndex = log.getLastEntryTermIndex();
- newState = log.getLastCommittedIndex() == 1 &&
- log.get(lastTermIndex.getIndex()).getLogEntryBodyCase() != CONFIGURATIONENTRY;
- }
- Assert.assertTrue(newState);
+ JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex,
+ 10, 500L, "COMMIT", LOG);
+ Assert.assertEquals(NOOP, log.get(confIndex).getLogEntryBodyCase());
+ log2 = null;
} finally {
+ RaftStorageTestUtils.printLog(log2, s -> LOG.info(s));
cluster.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 62c68bf..909685f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +35,8 @@ public class RaftServerTestUtil {
RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers)
throws Exception {
final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2);
- RaftTestUtil.attempt(3, sleepMs,
- () -> waitAndCheckNewConf(cluster, peers, deadPeers));
+ JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers),
+ 3, sleepMs, "waitAndCheckNewConf", LOG);
}
private static void waitAndCheckNewConf(MiniRaftCluster cluster,
RaftPeer[] peers, Collection<String> deadPeers)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
new file mode 100644
index 0000000..55ae7f3
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.util.AutoCloseableLock;
+
+import java.util.function.Consumer;
+
+public interface RaftStorageTestUtils {
+ /**
+ * Print the given log, one line per entry, through the given consumer.
+ * Each line starts with 'f' if the entry is at the latest flushed index and 'c'
+ * if it is at the last committed index, followed by the entry index and the
+ * entry body case (or the exception raised when reading the entry).
+ * This is a debugging aid which is typically called from a finally-block,
+ * so it avoids failing on a null or an empty log.
+ *
+ * @param log the log to print; "log == null" is printed when it is null
+ * @param println receives each output line
+ */
+ static void printLog(RaftLog log, Consumer<String> println) {
+ if (log == null) {
+ println.accept("log == null");
+ return;
+ }
+
+ // take a consistent snapshot of the indices under the read lock
+ final TermIndex last;
+ final long flushed, committed;
+ try(AutoCloseableLock readlock = log.readLock()) {
+ last = log.getLastEntryTermIndex();
+ flushed = log.getLatestFlushedIndex();
+ committed = log.getLastCommittedIndex();
+ }
+ if (last == null) {
+ // NOTE(review): presumably an empty log has no last entry -- confirm against
+ // RaftLog. Guard here so that a cleanup path does not die with an NPE.
+ println.accept("log is empty: flushed=" + flushed + ", committed=" + committed);
+ return;
+ }
+
+ final long lastIndex = last.getIndex();
+ final StringBuilder b = new StringBuilder();
+ for(long i = 0; i <= lastIndex; i++) {
+ b.setLength(0);
+ b.append(i == flushed? 'f': ' ');
+ b.append(i == committed? 'c': ' ');
+ b.append(String.format("%3d: ", i));
+ try {
+ final RaftProtos.LogEntryProto entry = log.get(i);
+ b.append(entry != null? entry.getLogEntryBodyCase(): null);
+ } catch (RaftLogIOException e) {
+ // keep printing the remaining entries; show the failure in place of the entry
+ b.append(e);
+ }
+ println.accept(b.toString());
+ }
+ }
+}