You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/08/28 03:13:35 UTC

[kafka] branch trunk updated: KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49aed78  KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
49aed78 is described below

commit 49aed781d8b398975e8952ef21578c05114064ed
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Aug 27 22:12:12 2021 -0500

    KAFKA-13128: extract retry checker and update with retriable exception causing flaky StoreQueryIntegrationTest (#11275)
    
    Add a new case to the list of retriable exceptions accepted by the flaky tests, covering a stream thread that is still in the STARTING state (not yet RUNNING) when the store is queried.
    
    Reviewers: Leah Thomas <lt...@confluent.io>, Anna Sophie Blee-Goldman
---
 .../integration/StoreQueryIntegrationTest.java     | 58 +++++++---------------
 1 file changed, 18 insertions(+), 40 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index c4a236b..76916c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -148,16 +148,7 @@ public class StoreQueryIntegrationTest {
                 }
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                        "Unexpected exception thrown while getting the value from store.",
-                        exception.getMessage(),
-                        is(
-                                anyOf(
-                                        containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                                        containsString("The state store, source-table, may have migrated to another instance")
-                                )
-                        )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -241,16 +232,7 @@ public class StoreQueryIntegrationTest {
                 }
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -513,16 +495,7 @@ public class StoreQueryIntegrationTest {
                 assertThat(store1.get(key3), is(notNullValue()));
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
@@ -543,22 +516,27 @@ public class StoreQueryIntegrationTest {
                 assertThat(store1.get(key3), is(notNullValue()));
                 return true;
             } catch (final InvalidStateStoreException exception) {
-                assertThat(
-                    "Unexpected exception thrown while getting the value from store.",
-                    exception.getMessage(),
-                    is(
-                        anyOf(
-                            containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
-                            containsString("The state store, source-table, may have migrated to another instance")
-                        )
-                    )
-                );
+                verifyRetrievableException(exception);
                 LOG.info("Either streams wasn't running or a re-balancing took place. Will try again.");
                 return false;
             }
         });
     }
 
+    private void verifyRetrievableException(final Exception exception) {
+        assertThat(
+            "Unexpected exception thrown while getting the value from store.",
+            exception.getMessage(),
+            is(
+                anyOf(
+                    containsString("Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING"),
+                    containsString("The state store, source-table, may have migrated to another instance"),
+                    containsString("Cannot get state store source-table because the stream thread is STARTING, not RUNNING")
+                )
+            )
+        );
+    }
+
     private static void until(final TestCondition condition) {
         boolean success = false;
         final long deadline = System.currentTimeMillis() + IntegrationTestUtils.DEFAULT_TIMEOUT;