You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/08/01 18:31:29 UTC
[kafka] branch trunk updated: KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06e246d KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)
06e246d is described below
commit 06e246d4a11c63d1b46516442de81f370f89a0ee
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Aug 1 11:31:00 2019 -0700
KAFKA-8456: Stabilize flaky StoreUpgradeIntegrationTest (#6941)
Reviewers: Boyang Chen <bo...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Bill Bejeck <bi...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
.../integration/StoreUpgradeIntegrationTest.java | 273 +++++++++++----------
1 file changed, 150 insertions(+), 123 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index ba5b08f..9a82ad9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -329,56 +329,65 @@ public class StoreUpgradeIntegrationTest {
IntegerSerializer.class),
CLUSTER.time);
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyKeyValueStore<K, V> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
- try (final KeyValueIterator<K, V> all = store.all()) {
- final List<KeyValue<K, V>> storeContent = new LinkedList<>();
- while (all.hasNext()) {
- storeContent.add(all.next());
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, V> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.keyValueStore());
+ try (final KeyValueIterator<K, V> all = store.all()) {
+ final List<KeyValue<K, V>> storeContent = new LinkedList<>();
+ while (all.hasNext()) {
+ storeContent.add(all.next());
+ }
+ return storeContent.equals(expectedStoreContent);
}
- return storeContent.equals(expectedStoreContent);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
}
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K> void verifyCountWithTimestamp(final K key,
final long value,
final long timestamp) throws Exception {
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
- final ValueAndTimestamp<Long> count = store.get(key);
- return count.value() == value && count.timestamp() == timestamp;
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+ final ValueAndTimestamp<Long> count = store.get(key);
+ return count.value() == value && count.timestamp() == timestamp;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K> void verifyCountWithSurrogateTimestamp(final K key,
final long value) throws Exception {
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
- final ValueAndTimestamp<Long> count = store.get(key);
- return count.value() == value && count.timestamp() == -1L;
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+ final ValueAndTimestamp<Long> count = store.get(key);
+ return count.value() == value && count.timestamp() == -1L;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K, V> void processKeyValueAndVerifyCount(final K key,
@@ -394,23 +403,26 @@ public class StoreUpgradeIntegrationTest {
IntegerSerializer.class),
timestamp);
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
- try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
- final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
- while (all.hasNext()) {
- storeContent.add(all.next());
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+ try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+ final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+ while (all.hasNext()) {
+ storeContent.add(all.next());
+ }
+ return storeContent.equals(expectedStoreContent);
}
- return storeContent.equals(expectedStoreContent);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
}
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K, V> void processKeyValueAndVerifyCountWithTimestamp(final K key,
@@ -426,23 +438,26 @@ public class StoreUpgradeIntegrationTest {
IntegerSerializer.class),
timestamp);
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
- try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
- final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
- while (all.hasNext()) {
- storeContent.add(all.next());
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore());
+ try (final KeyValueIterator<K, ValueAndTimestamp<V>> all = store.all()) {
+ final List<KeyValue<K, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+ while (all.hasNext()) {
+ storeContent.add(all.next());
+ }
+ return storeContent.equals(expectedStoreContent);
}
- return storeContent.equals(expectedStoreContent);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
}
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
@Test
@@ -789,56 +804,65 @@ public class StoreUpgradeIntegrationTest {
IntegerSerializer.class),
CLUSTER.time);
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyWindowStore<K, V> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
- try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
- final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
- while (all.hasNext()) {
- storeContent.add(all.next());
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyWindowStore<K, V> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.windowStore());
+ try (final KeyValueIterator<Windowed<K>, V> all = store.all()) {
+ final List<KeyValue<Windowed<K>, V>> storeContent = new LinkedList<>();
+ while (all.hasNext()) {
+ storeContent.add(all.next());
+ }
+ return storeContent.equals(expectedStoreContent);
}
- return storeContent.equals(expectedStoreContent);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
}
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K> void verifyWindowedCountWithSurrogateTimestamp(final Windowed<K> key,
final long value) throws Exception {
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
- final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
- return count.value() == value && count.timestamp() == -1L;
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+ final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
+ return count.value() == value && count.timestamp() == -1L;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K> void verifyWindowedCountWithTimestamp(final Windowed<K> key,
final long value,
final long timestamp) throws Exception {
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
- final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
- return count.value() == value && count.timestamp() == timestamp;
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyWindowStore<K, ValueAndTimestamp<Long>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+ final ValueAndTimestamp<Long> count = store.fetch(key.key(), key.window().start());
+ return count.value() == value && count.timestamp() == timestamp;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(final K key,
@@ -854,23 +878,26 @@ public class StoreUpgradeIntegrationTest {
IntegerSerializer.class),
timestamp);
- TestUtils.waitForCondition(() -> {
- try {
- final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
- kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
- try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
- final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
- while (all.hasNext()) {
- storeContent.add(all.next());
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyWindowStore<K, ValueAndTimestamp<V>> store =
+ kafkaStreams.store(STORE_NAME, QueryableStoreTypes.timestampedWindowStore());
+ try (final KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all = store.all()) {
+ final List<KeyValue<Windowed<K>, ValueAndTimestamp<V>>> storeContent = new LinkedList<>();
+ while (all.hasNext()) {
+ storeContent.add(all.next());
+ }
+ return storeContent.equals(expectedStoreContent);
}
- return storeContent.equals(expectedStoreContent);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
}
- } catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
- return false;
- }
- }, "Could not get expected result in time.");
+ },
+ 60_000L,
+ "Could not get expected result in time.");
}
private static class KeyValueProcessor implements Processor<Integer, Integer> {