Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/02/03 02:37:19 UTC

[GitHub] [samza] jia-gao commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

jia-gao commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r797207085



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       There is only one consumer instance (a member field), and this was the only place where that member was used. So there is nowhere else that needs a replacement; we simply stop using it.
   
   In this change, we no longer use that consumer member. Instead, we keep a topicsConsumerMap and create a new consumer for each new topic.
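
   A minimal sketch of the per-topic caching idea described above, not the code from this PR. The class name `PerTopicConsumers`, the method `consumerFor`, and the generic factory are hypothetical; in the test harness the factory would presumably build a Kafka consumer and subscribe it to the given topic, but that part is an assumption and is left out so the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: keeps one consumer per topic and creates it on first use. */
public class PerTopicConsumers<C> {
  // Maps each topic name to the consumer dedicated to it.
  private final Map<String, C> topicConsumerMap = new HashMap<>();
  // Assumed factory: builds a consumer for the given topic.
  private final Function<String, C> consumerFactory;

  public PerTopicConsumers(Function<String, C> consumerFactory) {
    this.consumerFactory = consumerFactory;
  }

  /** Returns the cached consumer for the topic, creating it only if none exists yet. */
  public C consumerFor(String topic) {
    return topicConsumerMap.computeIfAbsent(topic, consumerFactory);
  }

  /** Number of distinct topics that currently have a consumer. */
  public int size() {
    return topicConsumerMap.size();
  }
}
```

   With this shape, repeated calls for the same topic reuse one consumer, so no consumer ever has to be re-subscribed to a different topic between calls.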



