You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/08/01 18:31:29 UTC

[kafka] branch trunk updated: KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06e246d  KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)
06e246d is described below

commit 06e246d4a11c63d1b46516442de81f370f89a0ee
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Aug 1 11:31:00 2019 -0700

    KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)
    
    Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../integration/StoreUpgradeIntegrationTest.java   | 273 +++++++++++----------
 1 file changed, 150 insertions(+), 123 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index ba5b08f..9a82ad9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -329,56 +329,65 @@ public class StoreUpgradeIntegrationTest {
                 IntegerSerializer.class),
             CLUSTER.time);
 
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyKeyValueStore<K, V> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
-                try (final KeyValueIterator<K, V> all = store.all()) {
-                    final List<KeyValue<K, V>> storeContent = new LinkedList<>();
-                    while (all.hasNext()) {
-                        storeContent.add(all.next());
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, V> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
+                    try (final KeyValueIterator<K, V> all = store.all()) {
+                        final List<KeyValue<K, V>> storeContent = new LinkedList<>();
+                        while (all.hasNext()) {
+                            storeContent.add(all.next());
+                        }
+                        return storeContent.equals(expectedStoreContent);
                     }
-                    return storeContent.equals(expectedStoreContent);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
                 }
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K> void verifyCountWithTimestamp(final K key,
                                               final long value,
                                               final long timestamp) throws Exception {
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
-                final ValueAndTimestamp<Long> count = store.get(key);
-                return count.value() == value && count.timestamp() == timestamp;
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                    final ValueAndTimestamp<Long> count = store.get(key);
+                    return count.value() == value && count.timestamp() == timestamp;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K> void verifyCountWithSurrogateTimestamp(final K key,
                                                        final long value) throws Exception {
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
-                final ValueAndTimestamp<Long> count = store.get(key);
-                return count.value() == value && count.timestamp() == -1L;
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                    final ValueAndTimestamp<Long> count = store.get(key);
+                    return count.value() == value && count.timestamp() == -1L;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K, V> void processKeyValueAndVerifyCount(final K key,
@@ -394,23 +403,26 @@ public class StoreUpgradeIntegrationTest {
                 IntegerSerializer.class),
             timestamp);
 
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
-                try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
-                    final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
-                    while (all.hasNext()) {
-                        storeContent.add(all.next());
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                    try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+                        final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+                        while (all.hasNext()) {
+                            storeContent.add(all.next());
+                        }
+                        return storeContent.equals(expectedStoreContent);
                     }
-                    return storeContent.equals(expectedStoreContent);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
                 }
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
@@ -426,23 +438,26 @@ public class StoreUpgradeIntegrationTest {
                 IntegerSerializer.class),
             timestamp);
 
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
-                try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
-                    final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
-                    while (all.hasNext()) {
-                        storeContent.add(all.next());
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+                    try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+                        final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+                        while (all.hasNext()) {
+                            storeContent.add(all.next());
+                        }
+                        return storeContent.equals(expectedStoreContent);
                     }
-                    return storeContent.equals(expectedStoreContent);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
                 }
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     @Test
@@ -789,56 +804,65 @@ public class StoreUpgradeIntegrationTest {
                 IntegerSerializer.class),
             CLUSTER.time);
 
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyWindowStore<K, V> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
-                try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
-                    final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
-                    while (all.hasNext()) {
-                        storeContent.add(all.next());
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyWindowStore<K, V> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
+                    try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
+                        final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
+                        while (all.hasNext()) {
+                            storeContent.add(all.next());
+                        }
+                        return storeContent.equals(expectedStoreContent);
                     }
-                    return storeContent.equals(expectedStoreContent);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
                 }
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K> void verifyWindowedCountWithSurrogateTimestamp(final Windowed<K> key,
                                                                final long value) throws Exception {
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
-                final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
-                return count.value() == value && count.timestamp() == -1L;
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+                    final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
+                    return count.value() == value && count.timestamp() == -1L;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K> void verifyWindowedCountWithTimestamp(final Windowed<K> key,
                                                       final long value,
                                                       final long timestamp) throws Exception {
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
-                final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
-                return count.value() == value && count.timestamp() == timestamp;
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+                    final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
+                    return count.value() == value && count.timestamp() == timestamp;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K key,
@@ -854,23 +878,26 @@ public class StoreUpgradeIntegrationTest {
                 IntegerSerializer.class),
             timestamp);
 
-        TestUtils.waitForCondition(() -> {
-            try {
-                final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
-                    kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
-                try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
-                    final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
-                    while (all.hasNext()) {
-                        storeContent.add(all.next());
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
+                        kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+                    try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
+                        final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+                        while (all.hasNext()) {
+                            storeContent.add(all.next());
+                        }
+                        return storeContent.equals(expectedStoreContent);
                     }
-                    return storeContent.equals(expectedStoreContent);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
                 }
-            } catch (final Exception swallow) {
-                swallow.printStackTrace();
-                System.err.println(swallow.getMessage());
-                return false;
-            }
-        }, "Could not get expected result in time.");
+            },
+            60_000L,
+            "Could not get expected result in time.");
     }
 
     private static class KeyValueProcessor implements Processor<Integer, Integer> {