Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/28 04:59:02 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

guozhangwang opened a new pull request #8568:
URL: https://github.com/apache/kafka/pull/8568


   This PR fixes and improves two major issues:
   
   1. When calling `KafkaStreams#store`, we may get an `InvalidStateStoreException` at any time, and even waiting for the Streams state to become RUNNING is not sufficient (this is also how `OptimizedKTableIntegrationTest` failed). So I wrapped all such calls in a util wrapper that catches that exception and retries.
   
   2. While troubleshooting this issue, I also noticed a potential bug in the test util's `produceKeyValuesSynchronously`: it creates a new producer for each record in the batch. That is, if you send N records in a single call, that call creates N producers that each send one record, which is slow and costly.
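
A minimal sketch of the retry idea in item 1, under stated assumptions: `retryOnInvalidStore` is a hypothetical helper (not the PR's `IntegrationTestUtils.getStore`), and a local `InvalidStateStoreException` stands in for Kafka's `org.apache.kafka.streams.errors.InvalidStateStoreException` so the snippet compiles without Kafka. In a real test the supplier would wrap a lookup such as `() -> streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()))`, the call the PR replaces in `EosIntegrationTest`.

```java
import java.util.function.Supplier;

public final class StoreRetry {

    // Hypothetical stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException,
    // so this sketch compiles without Kafka on the classpath.
    static final class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(final String message) {
            super(message);
        }
    }

    // Hypothetical helper (not the PR's IntegrationTestUtils.getStore): calls lookup until it
    // succeeds, retrying only on InvalidStateStoreException. Other exceptions propagate.
    // Returns null if the deadline passes (or the thread is interrupted) without success.
    static <T> T retryOnInvalidStore(final Supplier<T> lookup,
                                     final long timeoutMs,
                                     final long backoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (final InvalidStateStoreException retriable) {
                if (System.currentTimeMillis() >= deadline) {
                    return null; // give up and let the caller decide how to fail
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    return null;
                }
            }
        }
    }

    public static void main(final String[] args) {
        // Simulates a store that is unavailable (e.g. during a rebalance) for the first two lookups.
        final int[] attempts = {0};
        final String store = retryOnInvalidStore(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new InvalidStateStoreException("store not available yet");
            }
            return "store";
        }, 1_000L, 10L);
        System.out.println(store + " after " + attempts[0] + " attempts"); // store after 3 attempts
    }
}
```

Returning null on timeout leaves the failure decision to the caller, which matches the `assertNotNull(...)` and `if (store == null)` checks that appear in the diffs in this thread; throwing a timeout error from the helper would be an equally reasonable design.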
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416942992



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -810,21 +808,9 @@ private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exc
     }
 
     private void verifyStateStore(final KafkaStreams streams,
-                                  final Set<KeyValue<Long, Long>> expectedStoreContent) {
-        ReadOnlyKeyValueStore<Long, Long> store = null;
-
-        final long maxWaitingTime = System.currentTimeMillis() + 300000L;
-        while (System.currentTimeMillis() < maxWaitingTime) {
-            try {
-                store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
-                break;
-            } catch (final InvalidStateStoreException okJustRetry) {
-                try {
-                    Thread.sleep(5000L);
-                } catch (final Exception ignore) { }
-            }
-        }
-
+                                  final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
+        final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
+            .getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore());

Review comment:
       Ack.







[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416910725



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##########
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
+            .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
+        assertNotNull(replicatedStore);

Review comment:
       Previously, the unwrapped call would simply throw the exception; here, asserting that the store is not null is equivalent to making sure the store was actually returned.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416809469



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##########
@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the second batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

Review comment:
       `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` still failed on [one of the builds](https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/149/testReport/junit/org.apache.kafka.streams.integration/OptimizedKTableIntegrationTest/shouldApplyUpdatesToStandbyStore/) at this line :/
   But at least we got further into the test before it failed, so I'd say this is still an improvement 😄 







[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620879654


   Merged to trunk.





[GitHub] [kafka] guozhangwang commented on pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#issuecomment-620847306


   I looked at the three failed tests:
   
   * `OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore` is actually due to the issue that https://github.com/apache/kafka/pull/8548 tried to fix. Waiting for @vvcephei to review #8548.
   * `EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]` is being looked at by @mjsax as KAFKA-9831
   * `GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin[exactly_once_beta]` is a new issue; I created KAFKA-9928 for it, and my gut feeling is that it has the same root cause as KAFKA-9831 (also cc @mjsax).
   
   So I think this PR is good to be merged.





[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -810,21 +808,9 @@ private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exc
     }
 
     private void verifyStateStore(final KafkaStreams streams,
-                                  final Set<KeyValue<Long, Long>> expectedStoreContent) {
-        ReadOnlyKeyValueStore<Long, Long> store = null;
-
-        final long maxWaitingTime = System.currentTimeMillis() + 300000L;
-        while (System.currentTimeMillis() < maxWaitingTime) {
-            try {
-                store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
-                break;
-            } catch (final InvalidStateStoreException okJustRetry) {
-                try {
-                    Thread.sleep(5000L);
-                } catch (final Exception ignore) { }
-            }
-        }
-
+                                  final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
+        final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
+            .getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore());

Review comment:
       ```suggestion
               .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##########
@@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
         TestUtils.waitForCondition(
             () -> {
                 try {
-                    final ReadOnlyKeyValueStore<K, V> store =
-                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+                    final ReadOnlyKeyValueStore<K, V> store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;

Review comment:
       Not a huge deal, but technically, these should have braces.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams,
         return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
     }
 
-    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
       thanks for the cleanup

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-            throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
 
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
       I guess the flush at the end makes it synchronous anyway?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416944392



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-            throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
 
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
       Previously we waited after sending each record; now we wait only once, after sending all records, which is more efficient.







[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416947800



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-            throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
 
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
       Thanks. That's what I was asking for confirmation on. I realize now the structure of my sentence was ambiguous.
   
   I agree that the method contract is that the batch should be synchronously produced, not that each record should be synchronously produced, so this change looks good to me.
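
To illustrate the point settled above (the batch as a whole, not each record, must be produced synchronously), here is a toy model under clearly stated assumptions: `FakeProducer` is a hypothetical stand-in in which `send()` only buffers a record and `flush()` acknowledges everything buffered in one simulated request. A real `KafkaProducer` batches according to its own settings (for example `linger.ms` and `batch.size`), so the request counts below only show the shape of the difference, not real numbers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class BatchSendSketch {

    // Hypothetical toy producer: send() only buffers a record and returns its future;
    // flush() models ONE request that acknowledges every buffered record.
    static final class FakeProducer {
        private final List<CompletableFuture<Long>> pending = new ArrayList<>();
        private final List<String> buffered = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        private long nextOffset = 0L;
        int requests = 0;

        CompletableFuture<Long> send(final String value) {
            final CompletableFuture<Long> ack = new CompletableFuture<>();
            buffered.add(value);
            pending.add(ack);
            return ack;
        }

        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            requests++; // one simulated round trip for everything buffered
            delivered.addAll(buffered);
            for (final CompletableFuture<Long> ack : pending) {
                ack.complete(nextOffset++); // "offset" assigned on acknowledgement
            }
            buffered.clear();
            pending.clear();
        }
    }

    // Old shape: block on each record before sending the next (like send(...).get()).
    static int produceOneByOne(final FakeProducer producer, final List<String> values) {
        for (final String v : values) {
            final CompletableFuture<Long> ack = producer.send(v);
            producer.flush(); // in this toy, the per-record wait forces its own request
            ack.join();
        }
        return producer.requests;
    }

    // New shape: send the whole batch, then block once before returning.
    static int produceBatch(final FakeProducer producer, final List<String> values) {
        final List<CompletableFuture<Long>> acks = new ArrayList<>();
        for (final String v : values) {
            acks.add(producer.send(v));
        }
        producer.flush();
        acks.forEach(CompletableFuture::join); // every record acknowledged before returning
        return producer.requests;
    }

    public static void main(final String[] args) {
        final List<String> values = List.of("a", "b", "c", "d");
        System.out.println(produceOneByOne(new FakeProducer(), values)); // 4
        System.out.println(produceBatch(new FakeProducer(), values));    // 1
    }
}
```

One design note: with the real `KafkaProducer`, `flush()` blocks until previously sent records have completed, but a failed send is reported through that record's `Future` or callback rather than thrown by `flush()`. Keeping the futures and checking them afterwards, as `produceBatch` does with `join()`, is one way to surface such failures.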







[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416943907



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##########
@@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
         TestUtils.waitForCondition(
             () -> {
                 try {
-                    final ReadOnlyKeyValueStore<K, V> store =
-                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+                    final ReadOnlyKeyValueStore<K, V> store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;

Review comment:
       ack.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416814163



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
##########
@@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
 
         produceGlobalTableValues();
 
-        final ReadOnlyKeyValueStore<Long, String> replicatedStore =
-            kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
+        final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
+            .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
+        assertNotNull(replicatedStore);

Review comment:
       Why do we have to check for null now?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416327335



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -210,16 +210,21 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
                                                             final Properties producerConfig,
                                                             final Headers headers,
                                                             final Time time,
-                                                            final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
-        for (final KeyValue<K, V> record : records) {
-            produceKeyValuesSynchronouslyWithTimestamp(topic,
-                Collections.singleton(record),
-                producerConfig,
-                headers,
-                time.milliseconds(),
-                enableTransactions);
-            time.sleep(1L);
+                                                            final boolean enableTransactions) {
+
+        try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {

Review comment:
       This is fix 2 from the description.
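
A toy sketch of what fix 2 changes, counting producer instances instead of talking to a broker. `CountingProducer` is a hypothetical stand-in for `KafkaProducer` (each real producer sets up its own connections and metadata), and the mock clock mirrors the removed loop's `time.milliseconds()` / `time.sleep(1L)` per record. The diff above is cut off before the new loop body, so the sketch simply assumes the batch version keeps advancing the clock per record.

```java
import java.util.ArrayList;
import java.util.List;

public final class ProducerPerBatchSketch {

    // Hypothetical stand-in for KafkaProducer that only counts how many instances are created.
    static final class CountingProducer implements AutoCloseable {
        static int created = 0;
        final List<String> sent = new ArrayList<>();

        CountingProducer() {
            created++;
        }

        void send(final String value, final long timestamp) {
            sent.add(value + "@" + timestamp);
        }

        @Override
        public void close() {
            // a real producer would flush outstanding records and release resources here
        }
    }

    // Old shape: each record goes through a helper that builds (and closes) its own producer,
    // and the mock clock advances by 1 ms between records.
    static List<String> producePerRecord(final List<String> values, final long startMs) {
        final List<String> all = new ArrayList<>();
        long now = startMs;
        for (final String v : values) {
            try (CountingProducer producer = new CountingProducer()) {
                producer.send(v, now);
                all.addAll(producer.sent);
            }
            now += 1L;
        }
        return all;
    }

    // New shape: one producer for the whole batch; per-record timestamps stay the same.
    static List<String> producePerBatch(final List<String> values, final long startMs) {
        long now = startMs;
        try (CountingProducer producer = new CountingProducer()) {
            for (final String v : values) {
                producer.send(v, now);
                now += 1L;
            }
            return producer.sent;
        }
    }

    public static void main(final String[] args) {
        final List<String> values = List.of("a", "b", "c");
        CountingProducer.created = 0;
        System.out.println(producePerRecord(values, 1000L) + " producers=" + CountingProducer.created);
        // [a@1000, b@1001, c@1002] producers=3
        CountingProducer.created = 0;
        System.out.println(producePerBatch(values, 1000L) + " producers=" + CountingProducer.created);
        // [a@1000, b@1001, c@1002] producers=1
    }
}
```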

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams,
         return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
     }
 
-    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
       These functions are not used anywhere; ditto below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##########
@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the second batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        // Assert that the current value in store reflects all messages being processed
-        assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
+        TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {

Review comment:
       This is a minor fix: we should retry this condition until it holds instead of asserting it only once.
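
The idea behind wrapping the final assertion in `TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, ...)` can be sketched with a self-contained helper. `retryUntilPasses` is hypothetical and is not Kafka's `TestUtils` implementation; it assumes the check signals "not yet" by throwing `AssertionError` (as Hamcrest's `assertThat` does) and rethrows the last failure once the timeout expires, so a genuinely broken test still fails with a useful message.

```java
import java.util.concurrent.atomic.AtomicInteger;

public final class RetryAssertionSketch {

    // Hypothetical helper (not Kafka's TestUtils): re-runs check every pollMs until it stops
    // throwing AssertionError, or rethrows the last failure once timeoutMs has elapsed.
    static void retryUntilPasses(final long pollMs, final long timeoutMs, final Runnable check) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                check.run();
                return; // condition holds
            } catch (final AssertionError notYet) {
                if (System.currentTimeMillis() >= deadline) {
                    throw notYet; // surface the last failure to the test runner
                }
                try {
                    Thread.sleep(pollMs);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    throw notYet;
                }
            }
        }
    }

    public static void main(final String[] args) {
        // Simulates a standby store that catches up by one record per poll.
        final AtomicInteger storeValue = new AtomicInteger(0);
        final int expected = 3;
        retryUntilPasses(10L, 5_000L, () -> {
            final int current = storeValue.incrementAndGet();
            if (current != expected) {
                throw new AssertionError("expected " + expected + " but was " + current);
            }
        });
        System.out.println("store value: " + storeValue.get()); // store value: 3
    }
}
```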

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##########
@@ -227,10 +225,11 @@ private Properties streamsConfiguration() {
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Review comment:
       This is a fix to the test itself: with caching enabled, records are delayed before being sent to the sink topics.
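
A sketch of the two overrides as plain `java.util.Properties`, so it compiles without Kafka. The string keys are what `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` and `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` resolve to in Kafka Streams releases of this period; the helper name and the `application.id` value are hypothetical. With the record cache disabled, each KTable update is forwarded downstream as it happens instead of being held until the cache is flushed on commit or eviction, and a short commit interval makes commits (and the flushes that come with them) frequent.

```java
import java.util.Properties;

public final class NoCachingConfigSketch {

    // Hypothetical helper: copies the base config and applies the test-friendly overrides.
    // Keys match StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static Properties testFriendlyStreamsOverrides(final Properties base) {
        final Properties config = new Properties();
        config.putAll(base);
        config.put("commit.interval.ms", 100);      // commit (and flush the cache) every 100 ms
        config.put("cache.max.bytes.buffering", 0); // no record cache: forward every update
        return config;
    }

    public static void main(final String[] args) {
        final Properties base = new Properties();
        base.put("application.id", "optimized-ktable-test"); // hypothetical application id
        base.put("num.standby.replicas", 1);
        final Properties config = testFriendlyStreamsOverrides(base);
        // Values are Integers, so read them with get() rather than getProperty().
        System.out.println(config.get("cache.max.bytes.buffering")); // 0
    }
}
```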



