You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/28 21:04:03 UTC

incubator-ratis git commit: RATIS-112. testRevertConfigurationChange may fail.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master ee262d7e3 -> e46aee2c8


RATIS-112. testRevertConfigurationChange may fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e46aee2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e46aee2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e46aee2c

Branch: refs/heads/master
Commit: e46aee2c87e85fe7317f2e807cdf97da0be31ddc
Parents: ee262d7
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Aug 28 14:03:24 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Aug 28 14:03:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/util/CheckedSupplier.java  |  8 +++
 .../java/org/apache/ratis/util/JavaUtils.java   | 64 ++++++++++++++++++++
 .../ratis/server/impl/ServerImplUtils.java      | 14 ++++-
 .../java/org/apache/ratis/RaftTestUtil.java     | 23 -------
 .../impl/RaftReconfigurationBaseTest.java       | 51 +++++++++-------
 .../ratis/server/impl/RaftServerTestUtil.java   |  5 +-
 .../server/storage/RaftStorageTestUtils.java    | 55 +++++++++++++++++
 7 files changed, 172 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
index 0c9de31..06abe4c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -27,4 +27,16 @@ public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
    * except that this method is declared with a throws-clause.
    */
   OUTPUT get() throws THROWABLE;
+
+  /**
+   * Adapt the given {@link CheckedRunnable} to a {@link CheckedSupplier}.
+   * The returned supplier runs the runnable and then returns null.
+   */
+  static <THROWABLE extends Throwable> CheckedSupplier<?, THROWABLE> valueOf(
+      CheckedRunnable<THROWABLE> runnable) {
+    return () -> {
+      runnable.run();
+      return null;
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 0664aec..c78f999 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -104,4 +105,92 @@ public interface JavaUtils {
   static ThreadGroup getRootThreadGroup() {
     return ROOT_THREAD_GROUP.get();
   }
+
+  /**
+   * Attempt to get a return value from the given supplier multiple times.
+   * When an attempt fails, the failure is logged at WARN level (if log is non-null)
+   * and the supplier is retried after sleeping sleepMs milliseconds.
+   * The failure of the last attempt is rethrown.
+   * An {@link InterruptedException} thrown by the supplier is rethrown
+   * immediately since retrying would swallow the interrupt.
+   *
+   * @param supplier the supplier to call; must not be null.
+   * @param numAttempts the maximum number of attempts; must be positive.
+   * @param sleepMs the sleep time in milliseconds between attempts; must be non-negative.
+   * @param name the name of the operation, used in log messages.
+   * @param log the logger for failed attempts; null to disable logging.
+   * @return the value returned by the first successful attempt.
+   */
+  static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      int numAttempts, long sleepMs, String name, Logger log)
+      throws THROWABLE, InterruptedException {
+    Objects.requireNonNull(supplier, "supplier == null");
+    Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0");
+    Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0");
+
+    for(int i = 1; i <= numAttempts; i++) {
+      try {
+        return supplier.get();
+      } catch (Throwable t) {
+        // Rethrow on the last attempt; never retry after an interrupt.
+        if (i == numAttempts || t instanceof InterruptedException) {
+          throw t;
+        }
+        if (log != null && log.isWarnEnabled()) {
+          log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
+              + ": " + t + ", sleep " + sleepMs + "ms and then retry.");
+        }
+      }
+
+      if (sleepMs > 0) {
+        Thread.sleep(sleepMs);
+      }
+    }
+    throw new IllegalStateException("BUG: this line should be unreachable.");
+  }
+
+  /**
+   * Attempt to run the given op multiple times.
+   * It is the same as {@link #attempt(CheckedSupplier, int, long, String, Logger)}
+   * except that the op does not return a value.
+   */
+  static <THROWABLE extends Throwable> void attempt(
+      CheckedRunnable<THROWABLE> op, int numAttempts, long sleepMs, String name, Logger log)
+      throws THROWABLE, InterruptedException {
+    attempt(CheckedSupplier.valueOf(op), numAttempts, sleepMs, name, log);
+  }
+
+  /**
+   * Attempt to wait for the given condition to become true.
+   * The condition is checked up to numAttempts times, sleeping sleepMs
+   * milliseconds after each false result, and then checked once more.
+   *
+   * @throws IllegalStateException if the condition is still false after all the attempts.
+   */
+  static void attempt(
+      BooleanSupplier condition, int numAttempts, long sleepMs, String name, Logger log)
+      throws InterruptedException {
+    Objects.requireNonNull(condition, "condition == null");
+    Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " + numAttempts + " <= 0");
+    Preconditions.assertTrue(sleepMs >= 0L, () -> "sleepMs = " + sleepMs + " < 0");
+
+    for(int i = 1; i <= numAttempts; i++) {
+      if (condition.getAsBoolean()) {
+        return;
+      }
+      if (log != null && log.isWarnEnabled()) {
+        log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
+            + ": sleep " + sleepMs + "ms and then retry.");
+      }
+      if (sleepMs > 0) {
+        Thread.sleep(sleepMs);
+      }
+    }
+
+    if (!condition.getAsBoolean()) {
+      throw new IllegalStateException("Failed " + name + " for " + numAttempts + " attempts.");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index bcbab9a..544ed13 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -23,6 +23,8 @@ import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.IOException;
 
@@ -31,7 +33,17 @@ public class ServerImplUtils {
   public static RaftServerProxy newRaftServer(
       RaftPeerId id, RaftGroup group, StateMachine stateMachine,
       RaftProperties properties, Parameters parameters) throws IOException {
-    return new RaftServerProxy(id, stateMachine, group, properties, parameters);
+    try {
+      // Attempt up to 5 times, 500ms apart, to tolerate a temporary bind exception.
+      return JavaUtils.attempt(
+          () -> new RaftServerProxy(id, stateMachine, group, properties, parameters),
+          5, 500L, "newRaftServer", RaftServerImpl.LOG);
+    } catch (InterruptedException e) {
+      throw IOUtils.toInterruptedIOException(
+          "Interrupted when creating RaftServer " + id + ", " + group, e);
+    } catch (IOException e) {
+      throw new IOException("Failed to create RaftServer " + id + ", " + group, e);
+    }
   }

   public static TermIndex newTermIndex(long term, long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 46ff7d8..c3972a1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,19 +31,16 @@ import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
-import org.apache.ratis.util.CheckedRunnable;
 import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Objects;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BooleanSupplier;
 import java.util.function.IntSupplier;
 
@@ -257,26 +254,6 @@ public interface RaftTestUtil {
     }
   }

-  static <T extends Throwable> void attempt(
-      int n, long sleepMs, CheckedRunnable<T> runnable)
-      throws T, InterruptedException {
-    for(int i = 1; i <= n; i++) {
-      LOG.info("Attempt #" + i + "/" + n +  ": sleep " + sleepMs + "ms");
-      if (sleepMs > 0) {
-        Thread.sleep(sleepMs);
-      }
-      try {
-        runnable.run();
-        return;
-      } catch (Throwable t) {
-        if (i == n) {
-          throw t;
-        }
-        LOG.warn("Attempt #" + i + "/" + n + ": Ignoring " + t + " and retry.");
-      }
-    }
-  }
-
   static RaftPeerId changeLeader(MiniRaftCluster cluster, RaftPeerId oldLeader)
       throws InterruptedException {
     cluster.setBlockRequestsFrom(oldLeader.toString(), true);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 34ade52..316377b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -28,8 +28,9 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorageTestUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -48,6 +49,7 @@ import static java.util.Arrays.asList;
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
 import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.NOOP;
 
 public abstract class RaftReconfigurationBaseTest extends BaseTest {
   static {
@@ -511,6 +513,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
   @Test
   public void testRevertConfigurationChange() throws Exception {
     LOG.info("Start testRevertConfigurationChange");
+    RaftLog log2 = null; // printed in finally; reset to null once the test passes
     final MiniRaftCluster cluster = getCluster(5);
     try {
       cluster.start();
@@ -520,8 +523,8 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
       final RaftPeerId leaderId = leader.getId();

       final RaftLog log = leader.getState().getLog();
+      log2 = log;
       Thread.sleep(1000);
-      Assert.assertEquals(0, log.getLatestFlushedIndex());

       // we block the incoming msg for the leader and block its requests to
       // followers, so that we force the leader change and the old leader will
@@ -533,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
       PeerChanges change = cluster.removePeers(1, false, new ArrayList<>());

       AtomicBoolean gotNotLeader = new AtomicBoolean(false);
-      new Thread(() -> {
+      final Thread clientThread = new Thread(() -> {
         try(final RaftClient client = cluster.createClient(leaderId)) {
           LOG.info("client starts to change conf");
           final RaftClientRpc sender = client.getClientRpc();
@@ -545,37 +548,41 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
         } catch (IOException e) {
           LOG.warn("Got unexpected exception when client1 changes conf", e);
         }
-      }).start();
+      });
+      clientThread.start();
+
+      // find CONFIGURATIONENTRY, there may be NOOP before and after it.
+      final long confIndex = JavaUtils.attempt(() -> {
+        final long last = log.getLastEntryTermIndex().getIndex();
+        for (long i = 1; i <= last; i++) {
+          if (log.get(i).getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+            return i;
+          }
+        }
+        throw new IllegalStateException("CONFIGURATIONENTRY not found: last=" + last);
+      }, 10, 500, "confIndex", LOG);

       // wait till the old leader persist the new conf
-      for (int i = 0; i < 10 && log.getLatestFlushedIndex() < 1; i++) {
-        Thread.sleep(500);
-      }
-      Assert.assertEquals(1, log.getLatestFlushedIndex());
-      TermIndex last = log.getLastEntryTermIndex();
-      Assert.assertEquals(CONFIGURATIONENTRY,
-          log.get(last.getIndex()).getLogEntryBodyCase());
+      JavaUtils.attempt(() -> log.getLatestFlushedIndex() >= confIndex,
+          10, 500L, "FLUSH", LOG);
+      final long committed = log.getLastCommittedIndex();
+      Assert.assertTrue("committed=" + committed + " >= confIndex=" + confIndex, committed < confIndex);

       // unblock the old leader
       BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString());
       cluster.setBlockRequestsFrom(leaderId.toString(), false);

       // the client should get NotLeaderException
-      for (int i = 0; i < 10 && !gotNotLeader.get(); i++) {
-        Thread.sleep(500);
-      }
+      clientThread.join(5000);
       Assert.assertTrue(gotNotLeader.get());

       // the old leader should have truncated the setConf from the log
-      boolean newState = false;
-      for (int i = 0; i < 10 && !newState; i++) {
-        Thread.sleep(500);
-        TermIndex lastTermIndex = log.getLastEntryTermIndex();
-        newState = log.getLastCommittedIndex() == 1 &&
-            log.get(lastTermIndex.getIndex()).getLogEntryBodyCase() != CONFIGURATIONENTRY;
-      }
-      Assert.assertTrue(newState);
+      JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex,
+          10, 500L, "COMMIT", LOG);
+      Assert.assertEquals("entry at confIndex=" + confIndex, NOOP, log.get(confIndex).getLogEntryBodyCase());
+      log2 = null;
     } finally {
+      RaftStorageTestUtils.printLog(log2, s -> LOG.info(s));
       cluster.shutdown();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 62c68bf..909685f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -21,6 +21,7 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +35,8 @@ public class RaftServerTestUtil {
       RaftPeer[] peers, int numOfRemovedPeers, Collection<String> deadPeers)
       throws Exception {
     final long sleepMs = cluster.getMaxTimeout() * (numOfRemovedPeers + 2);
-    RaftTestUtil.attempt(3, sleepMs,
-        () -> waitAndCheckNewConf(cluster, peers, deadPeers));
+    JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, peers, deadPeers),
+        3, sleepMs, "waitAndCheckNewConf", LOG);
   }
   private static void waitAndCheckNewConf(MiniRaftCluster cluster,
       RaftPeer[] peers, Collection<String> deadPeers)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e46aee2c/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
new file mode 100644
index 0000000..55ae7f3
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.util.AutoCloseableLock;
+
+import java.util.function.Consumer;
+
+/** Test utilities for {@link RaftLog}. */
+public interface RaftStorageTestUtils {
+  /**
+   * Print the entries of the given log, one line per index from 0 to the last index.
+   * A line is prefixed with 'f' at the flushed index and 'c' at the committed index.
+   *
+   * @param log the log to print; if null, only "log == null" is printed.
+   * @param println the consumer receiving each printed line.
+   */
+  static void printLog(RaftLog log, Consumer<String> println) {
+    if (log == null) {
+      println.accept("log == null");
+      return;
+    }
+
+    final TermIndex last;
+    final long flushed, committed;
+    try(AutoCloseableLock readlock = log.readLock()) {
+      last = log.getLastEntryTermIndex();
+      flushed = log.getLatestFlushedIndex();
+      committed = log.getLastCommittedIndex();
+    }
+    if (last == null) { // NOTE(review): presumably an empty log; avoid NPE on last.getIndex()
+      println.accept("log is empty: flushed=" + flushed + ", committed=" + committed);
+      return;
+    }
+    final StringBuilder b = new StringBuilder();
+    for(long i = 0; i <= last.getIndex(); i++) {
+      b.setLength(0);
+      b.append(i == flushed? 'f': ' ');
+      b.append(i == committed? 'c': ' ');
+      b.append(String.format("%3d: ", i));
+      try {
+        final RaftProtos.LogEntryProto entry = log.get(i);
+        b.append(entry != null? entry.getLogEntryBodyCase(): null);
+      } catch (RaftLogIOException e) {
+        b.append(e);
+      }
+      println.accept(b.toString());
+    }
+  }
+}