You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/08/15 20:48:27 UTC

[kafka] branch trunk updated: MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14215d1  MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)
14215d1 is described below

commit 14215d1b84e937c4656e0984c1ce76d9aac65bdd
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Thu Aug 15 13:47:56 2019 -0700

    MINOR: Use max retries for consumer group tests to avoid flakiness (#7186)
    
    This patch updates ConsumerGroupCommandTest.scala to use the maximum possible number of AdminClient retries. The test runs will still be bounded by the request timeout. This address flakiness in tests such as testResetOffsetsNotExistingGroup and testResetOffsetsExistingTopic, which was caused by group coordinators being intermittently unavailable.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  8 ++--
 .../kafka/admin/ConsumerGroupCommandTest.scala     |  5 ++-
 .../kafka/admin/DeleteConsumerGroupsTest.scala     | 24 +++++------
 .../kafka/admin/DescribeConsumerGroupTest.scala    | 46 +++++++++++-----------
 .../unit/kafka/admin/ListConsumerGroupTest.scala   |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    | 23 ++++-------
 6 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 3f2ed32..8c4c5e4 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -158,9 +158,10 @@ object ConsumerGroupCommand extends Logging {
     }
   }
 
-  class ConsumerGroupService(val opts: ConsumerGroupCommandOptions) {
+  class ConsumerGroupService(val opts: ConsumerGroupCommandOptions,
+                             private[admin] val configOverrides: Map[String, String] = Map.empty) {
 
-    private val adminClient = createAdminClient()
+    private val adminClient = createAdminClient(configOverrides)
 
     // `consumers` are only needed for `describe`, so we instantiate them lazily
     private lazy val consumers: mutable.Map[String, KafkaConsumer[String, String]] = mutable.Map.empty
@@ -528,9 +529,10 @@ object ConsumerGroupCommand extends Logging {
       )
     }
 
-    private def createAdminClient(): Admin = {
+    private def createAdminClient(configOverrides: Map[String, String]): Admin = {
       val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
       props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+      configOverrides.foreach { case (k, v) => props.put(k, v)}
       admin.AdminClient.create(props)
     }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index d5eea98..c398940 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -25,14 +25,15 @@ import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGr
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.{After, Before}
 
-import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 class ConsumerGroupCommandTest extends KafkaServerTestHarness {
   import ConsumerGroupCommandTest._
@@ -84,7 +85,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
 
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
-    val service = new ConsumerGroupService(opts)
+    val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
     consumerGroupService = service :: consumerGroupService
     service
   }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index 769f33a..63fb84a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -71,7 +71,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupMembers(group, false)._2.get.size == 1
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The expected error (${Errors.NON_EMPTY_GROUP}) was not detected while deleting consumer group. Output was: (${output})",
@@ -89,7 +89,7 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupMembers(group, false)._2.get.size == 1
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     val result = service.deleteGroups()
     assertNotNull(s"Group was deleted successfully, but it shouldn't have been. Result was:(${result})", result(group))
@@ -108,13 +108,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState(group).state == "Empty"
-    }, "The group did not become empty as expected.", maxRetries = 3)
+    }, "The group did not become empty as expected.")
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups())
     assertTrue(s"The consumer group could not be deleted as expected",
@@ -138,14 +138,14 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().forall(groupId => groups.keySet.contains(groupId))
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     // Shutdown consumers to empty out groups
     groups.values.foreach(executor => executor.shutdown())
 
     TestUtils.waitUntilTrue(() => {
       groups.keySet.forall(groupId => service.collectGroupState(groupId).state == "Empty")
-    }, "The group did not become empty as expected.", maxRetries = 3)
+    }, "The group did not become empty as expected.")
 
     val output = TestUtils.grabConsoleOutput(service.deleteGroups()).trim
     val expectedGroupsForDeletion = groups.keySet
@@ -169,13 +169,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState(group).state == "Empty"
-    }, "The group did not become empty as expected.", maxRetries = 3)
+    }, "The group did not become empty as expected.")
 
     val result = service.deleteGroups()
     assertTrue(s"The consumer group could not be deleted as expected",
@@ -194,13 +194,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState(group).state == "Empty"
-    }, "The group did not become empty as expected.", maxRetries = 3)
+    }, "The group did not become empty as expected.")
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val output = TestUtils.grabConsoleOutput(service2.deleteGroups())
@@ -221,13 +221,13 @@ class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
 
     TestUtils.waitUntilTrue(() => {
       service.listGroups().contains(group)
-    }, "The group did not initialize as expected.", maxRetries = 3)
+    }, "The group did not initialize as expected.")
 
     executor.shutdown()
 
     TestUtils.waitUntilTrue(() => {
       service.collectGroupState(group).state == "Empty"
-    }, "The group did not become empty as expected.", maxRetries = 3)
+    }, "The group did not become empty as expected.")
 
     val service2 = getConsumerGroupService(cgcArgs ++ Array("--group", missingGroup))
     val result = service2.deleteGroups()
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index dcad72a..68a4f31 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -127,7 +127,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       TestUtils.waitUntilTrue(() => {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         output.trim.split("\n").length == 2 && error.isEmpty
-      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
+      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
     }
   }
 
@@ -152,7 +152,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val numLines = output.trim.split("\n").filterNot(line => line.isEmpty).length
         (numLines == expectedNumLines) && error.isEmpty
-      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
+      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
     }
   }
 
@@ -176,7 +176,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val numLines = output.trim.split("\n").filterNot(line => line.isEmpty).length
         (numLines == expectedNumLines) && error.isEmpty
-      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.", maxRetries = 3)
+      }, s"Expected a data row and no error in describe results with describe type ${describeType.mkString(" ")}.")
     }
   }
 
@@ -198,7 +198,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.", maxRetries = 3)
+    }, s"Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group $group.")
   }
 
   @Test
@@ -222,7 +222,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
           case None =>
             false
         })
-    }, s"Expected a 'Stable' group status, rows and valid member information for group $group.", maxRetries = 3)
+    }, s"Expected a 'Stable' group status, rows and valid member information for group $group.")
 
     val (_, assignments) = service.collectGroupMembers(group, true)
     assignments match {
@@ -251,7 +251,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.assignmentStrategy == "range" &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.", maxRetries = 3)
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
   }
 
   @Test
@@ -270,7 +270,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.assignmentStrategy == "roundrobin" &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.", maxRetries = 3)
+    }, s"Expected a 'Stable' group status, with one member and round robin assignment strategy for group $group.")
   }
 
   @Test
@@ -287,13 +287,13 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
       TestUtils.waitUntilTrue(() => {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         output.trim.split("\n").length == 2 && error.isEmpty
-      }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'", maxRetries = 3)
+      }, s"Expected describe group results with one data row for describe type '${describeType.mkString(" ")}'")
 
       // stop the consumer so the group has no active member anymore
       executor.shutdown()
       TestUtils.waitUntilTrue(() => {
         TestUtils.grabConsoleError(service.describeGroups()).contains(s"Consumer group '$group' has no active members.")
-      }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}", maxRetries = 3)
+      }, s"Expected no active member in describe group results with describe type ${describeType.mkString(" ")}")
     }
   }
 
@@ -310,7 +310,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
-    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -343,7 +343,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Stable") && assignments.exists(_.exists(_.group == group))
-    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.", maxRetries = 3)
+    }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -351,7 +351,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupMembers(group, false)
       state.contains("Empty") && assignments.isDefined && assignments.get.isEmpty
-    }, s"Expected no member in describe group members results for group '$group'", maxRetries = 3)
+    }, s"Expected no member in describe group members results for group '$group'")
   }
 
   @Test
@@ -370,7 +370,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         state.numMembers == 1 &&
         state.coordinator != null &&
         servers.map(_.config.brokerId).toList.contains(state.coordinator.id)
-    }, s"Expected the group '$group' to initially become stable, and have a single member.", maxRetries = 3)
+    }, s"Expected the group '$group' to initially become stable, and have a single member.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -378,7 +378,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState(group)
       state.state == "Empty" && state.numMembers == 0 && state.assignmentStrategy == ""
-    }, s"Expected the group '$group' to become empty after the only member leaving.", maxRetries = 3)
+    }, s"Expected the group '$group' to become empty after the only member leaving.")
   }
 
   @Test
@@ -396,7 +396,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val expectedNumRows = if (describeTypeMembers.contains(describeType)) 3 else 2
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
-      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
     }
   }
 
@@ -416,7 +416,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.isDefined &&
         assignments.get.count(_.group == group) == 1 &&
         assignments.get.count { x => x.group == group && x.partition.isDefined } == 1
-    }, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
+    }, "Expected rows for consumers with no assigned partitions in describe group results")
   }
 
   @Test
@@ -437,7 +437,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count { x => x.group == group && x.numPartitions == 1 } == 1 &&
         assignments.get.count { x => x.group == group && x.numPartitions == 0 } == 1 &&
         assignments.get.count(_.assignment.nonEmpty) == 0
-    }, "Expected rows for consumers with no assigned partitions in describe group results", maxRetries = 3)
+    }, "Expected rows for consumers with no assigned partitions in describe group results")
 
     val (state, assignments) = service.collectGroupMembers(group, true)
     assertTrue("Expected additional columns in verbose version of describe members",
@@ -457,7 +457,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState(group)
       state.state == "Stable" && state.numMembers == 2
-    }, "Expected two consumers in describe group results", maxRetries = 3)
+    }, "Expected two consumers in describe group results")
   }
 
   @Test
@@ -477,7 +477,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         val (output, error) = TestUtils.grabConsoleOutputAndError(service.describeGroups())
         val expectedNumRows = if (describeTypeState.contains(describeType)) 2 else 3
         error.isEmpty && output.trim.split("\n").size == expectedNumRows
-      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'", maxRetries = 3)
+      }, s"Expected a single data row in describe group result with describe type '${describeType.mkString(" ")}'")
     }
   }
 
@@ -500,7 +500,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isEmpty} == 0
-    }, "Expected two rows (one row per consumer) in describe group results.", maxRetries = 3)
+    }, "Expected two rows (one row per consumer) in describe group results.")
   }
 
   @Test
@@ -522,7 +522,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.numPartitions == 1 } == 2 &&
         assignments.get.count{ x => x.group == group && x.numPartitions == 0 } == 0
-    }, "Expected two rows (one row per consumer) in describe group members results.", maxRetries = 3)
+    }, "Expected two rows (one row per consumer) in describe group members results.")
 
     val (state, assignments) = service.collectGroupMembers(group, true)
     assertTrue("Expected additional columns in verbose version of describe members",
@@ -544,7 +544,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val state = service.collectGroupState(group)
       state.state == "Stable" && state.group == group && state.numMembers == 2
-    }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
+    }, "Expected a stable group with two members in describe group state result.")
   }
 
   @Test
@@ -562,7 +562,7 @@ class DescribeConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       val (state, assignments) = service.collectGroupOffsets(group)
       state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2
-    }, "Expected a stable group with two members in describe group state result.", maxRetries = 3)
+    }, "Expected a stable group with two members in describe group state result.")
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 1a35c4c..32f6614 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -36,7 +36,7 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest {
     TestUtils.waitUntilTrue(() => {
       foundGroups = service.listGroups().toSet
       expectedGroups == foundGroups
-    }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.", maxRetries = 3)
+    }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.")
   }
 
   @Test(expected = classOf[OptionException])
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 960ab4c..30ca2d8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -808,27 +808,18 @@ object TestUtils extends Logging {
     * @param msg error message
     * @param waitTimeMs maximum time to wait and retest the condition before failing the test
     * @param pause delay between condition checks
-    * @param maxRetries maximum number of retries to check the given condition if a retriable exception is thrown
     */
   def waitUntilTrue(condition: () => Boolean, msg: => String,
-                    waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L, maxRetries: Int = 0): Unit = {
+                    waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS, pause: Long = 100L): Unit = {
     val startTime = System.currentTimeMillis()
-    var retry = 0
     while (true) {
-      try {
-        if (condition())
-          return
-        if (System.currentTimeMillis() > startTime + waitTimeMs)
-          fail(msg)
-        Thread.sleep(waitTimeMs.min(pause))
-      }
-      catch {
-        case e: RetriableException if retry < maxRetries =>
-          debug("Retrying after error", e)
-          retry += 1
-        case e : Throwable => throw e
-      }
+      if (condition())
+        return
+      if (System.currentTimeMillis() > startTime + waitTimeMs)
+        fail(msg)
+      Thread.sleep(waitTimeMs.min(pause))
     }
+
     // should never hit here
     throw new RuntimeException("unexpected error")
   }