You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/08/28 03:13:35 UTC
[kafka] branch trunk updated: KAFKA-13128: extract retry checker
and update with retriable exception causing flaky StoreQueryIntegrationTest
(#11275)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 49aed78 KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
49aed78 is described below
commit 49aed781d8b398975e8952ef21578c05114064ed
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Aug 27 22:12:12 2021 -0500
KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
Add a new case to the list of possible retriable exceptions for the flaky tests to take care of threads starting up
Reviewers: Leah Thomas <lt...@confluent.io>, Anna Sophie Blee-Goldman
---
.../integration/StoreQueryIntegrationTest.java | 58 +++++++---------------
1 file changed, 18 insertions(+), 40 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index c4a236b..76916c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -148,16 +148,7 @@ public class StoreQueryIntegrationTest {
}
return true;
} catch (final InvalidStateStoreException exception) {
- assertThat(
- "Unexpected exception thrown while getting the value from store.",
- exception.getMessage(),
- is(
- anyOf(
- containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
- containsString("The state store, source-table, may have migrated to another instance")
- )
- )
- );
+ verifyRetrievableException(exception);
LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
return false;
}
@@ -241,16 +232,7 @@ public class StoreQueryIntegrationTest {
}
return true;
} catch (final InvalidStateStoreException exception) {
- assertThat(
- "Unexpected exception thrown while getting the value from store.",
- exception.getMessage(),
- is(
- anyOf(
- containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
- containsString("The state store, source-table, may have migrated to another instance")
- )
- )
- );
+ verifyRetrievableException(exception);
LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
return false;
}
@@ -513,16 +495,7 @@ public class StoreQueryIntegrationTest {
assertThat(store1.get(key3), is(notNullValue()));
return true;
} catch (final InvalidStateStoreException exception) {
- assertThat(
- "Unexpected exception thrown while getting the value from store.",
- exception.getMessage(),
- is(
- anyOf(
- containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
- containsString("The state store, source-table, may have migrated to another instance")
- )
- )
- );
+ verifyRetrievableException(exception);
LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
return false;
}
@@ -543,22 +516,27 @@ public class StoreQueryIntegrationTest {
assertThat(store1.get(key3), is(notNullValue()));
return true;
} catch (final InvalidStateStoreException exception) {
- assertThat(
- "Unexpected exception thrown while getting the value from store.",
- exception.getMessage(),
- is(
- anyOf(
- containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
- containsString("The state store, source-table, may have migrated to another instance")
- )
- )
- );
+ verifyRetrievableException(exception);
LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
return false;
}
});
}
+ private void verifyRetrievableException(final Exception exception) {
+ assertThat(
+ "Unexpected exception thrown while getting the value from store.",
+ exception.getMessage(),
+ is(
+ anyOf(
+ containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+ containsString("The state store, source-table, may have migrated to another instance"),
+ containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING")
+ )
+ )
+ );
+ }
+
private static void until(final TestCondition condition) {
boolean success = false;
final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;