You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/28 21:17:40 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8568: KAFKA-9176: Retry on getting local stores from KafkaStreams

vvcephei commented on a change in pull request #8568:
URL: https://github.com/apache/kafka/pull/8568#discussion_r416921971



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -810,21 +808,9 @@ private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exc
     }
 
     private void verifyStateStore(final KafkaStreams streams,
-                                  final Set<KeyValue<Long, Long>> expectedStoreContent) {
-        ReadOnlyKeyValueStore<Long, Long> store = null;
-
-        final long maxWaitingTime = System.currentTimeMillis() + 300000L;
-        while (System.currentTimeMillis() < maxWaitingTime) {
-            try {
-                store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
-                break;
-            } catch (final InvalidStateStoreException okJustRetry) {
-                try {
-                    Thread.sleep(5000L);
-                } catch (final Exception ignore) { }
-            }
-        }
-
+                                  final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
+        final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
+            .getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore());

Review comment:
       ```suggestion
               .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
##########
@@ -337,8 +336,11 @@ public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws
         TestUtils.waitForCondition(
             () -> {
                 try {
-                    final ReadOnlyKeyValueStore<K, V> store =
-                        kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+                    final ReadOnlyKeyValueStore<K, V> store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;

Review comment:
       not a huge deal, but technically, these should have brackets.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -599,13 +595,6 @@ public static void waitForCompletion(final KafkaStreams streams,
         return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
     }
 
-    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,

Review comment:
       thanks for the cleanup

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -269,24 +271,20 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
-    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
-                                                                         final boolean enableTransactions)
-            throws ExecutionException, InterruptedException {
+                                                                         final boolean enableTransactions) {
 
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
                 producer.beginTransaction();
             }
             for (final KeyValue<K, V> record : records) {
-                final Future<RecordMetadata> f = producer.send(
-                    new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));
-                f.get();
+                producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers));

Review comment:
       I guess the flush at the end makes it synchronous anyway?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org