Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/28 05:01:38 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

guozhangwang commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416327335



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -210,16 +210,21 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
                                                             final Properties producerConfig,
                                                             final Headers headers,
                                                             final Time time,
-                                                            final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
-        for (final KeyValue<K, V> record : records) {
-            produceKeyValuesSynchronouslyWithTimestamp(topic,
-                Collections.singleton(record),
-                producerConfig,
-                headers,
-                time.milliseconds(),
-                enableTransactions);
-            time.sleep(1L);
+                                                            final boolean enableTransactions) {
+
+        try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {

Review comment:
       This is fix 2).

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams,
         return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
     }
 
-    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
       These functions are not used anywhere; the same applies to the ones removed below.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##########
@@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
         // Assert that all messages in the second batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
 
-        // Assert that the current value in store reflects all messages being processed
-        assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
+        TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {

Review comment:
       This is a minor fix: we should retry this condition instead of asserting it only once.
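
       To illustrate the retry idea in isolation, here is a minimal sketch that needs no Kafka dependency. `RetrySketch` and `retryOnException` are hypothetical names made up for this example; this is not the implementation of `TestUtils.retryOnExceptionWithTimeout`, and the (poll interval, timeout, assertion) argument order is only assumed from the call in the diff above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a retry helper; not Kafka's TestUtils.
final class RetrySketch {

    // Re-runs the assertion until it passes or the timeout elapses.
    // On timeout, the last failure is rethrown so the test still fails loudly.
    static void retryOnException(final long pollIntervalMs,
                                 final long timeoutMs,
                                 final Runnable assertion) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                assertion.run();
                return; // condition finally holds
            } catch (final AssertionError | RuntimeException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        // Simulates a store that only reflects the expected value on the third read.
        final AtomicInteger reads = new AtomicInteger();
        retryOnException(10L, 1_000L, () -> {
            if (reads.incrementAndGet() < 3) {
                throw new AssertionError("store has not caught up yet");
            }
        });
        System.out.println("assertion passed after " + reads.get() + " attempts");
    }
}
```

       The point of wrapping the store read this way is that a single assertion right after the semaphore is acquired can observe a stale value, whereas a bounded retry tolerates the lag and still fails if the value never arrives.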

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
##########
@@ -227,10 +225,11 @@ private Properties streamsConfiguration() {
         config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
+        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Review comment:
       This is a fix to the test itself: with caching enabled, records are delayed before being sent to the sink topics.
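
       For reference, the two overrides added above can be shown as a standalone config fragment. This is only a sketch: `StreamsTestConfigSketch` and `lowLatencyOverrides` are hypothetical names, and the string keys are assumed to be the values behind `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` and `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` at the time of this PR.

```java
import java.util.Properties;

// Hypothetical helper; a real test would use the StreamsConfig constants instead of raw keys.
final class StreamsTestConfigSketch {

    static Properties lowLatencyOverrides() {
        final Properties config = new Properties();
        // Commit (and therefore flush) every 100 ms instead of waiting for the default interval.
        config.put("commit.interval.ms", 100);
        // A cache size of 0 disables record caching, so each update is forwarded downstream right away.
        config.put("cache.max.bytes.buffering", 0);
        return config;
    }

    public static void main(final String[] args) {
        System.out.println(lowLatencyOverrides());
    }
}
```

       With the cache enabled, updates are buffered and only forwarded on commit or eviction, which is why the test could wait a long time for records on the sink topics.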




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org