Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/02/01 22:52:52 UTC

[GitHub] [samza] jia-gao opened a new pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

jia-gao opened a new pull request #1581:
URL: https://github.com/apache/samza/pull/1581


   
   1. Upgrade Kafka version in dependency-versions.gradle
   
   2. 
   Issue:
   StreamApplicationIntegrationTestHarness is flaky because, in the test,
   a Kafka consumer subscribes to the input topic and consumes from it;
   then the same consumer subscribes to the changelog topic.
   During this switch, Kafka has to revoke the previous assignments and rebalance, which can leave the changelog topic unassigned to any member.
   
   Change: create a separate consumer in a different consumer group to consume the changelog topic, so that the switch avoids a Kafka consumer group rebalance.
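   
   A minimal sketch of that idea in plain Kafka consumer terms (the bootstrap address, group id, and topic name below are illustrative placeholders, not the harness's actual values):
   
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
   
      // Illustrative values; the real harness gets these from its embedded Kafka cluster.
      String bootstrapServers = "localhost:9092";
      String changelogTopic = "test-store-changelog";
   
      Properties props = new Properties();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      // A dedicated group id: subscribing to the changelog here cannot trigger a
      // rebalance in the group that owns the input-topic consumer.
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-consumer-group");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
   
      KafkaConsumer<String, String> changelogConsumer = new KafkaConsumer<>(props);
      changelogConsumer.subscribe(Collections.singletonList(changelogTopic));
      ConsumerRecords<String, String> records = changelogConsumer.poll(Duration.ofMillis(500));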
   
   3.
   Issue:
   Apache Kafka 2.4 introduces the sticky partitioning strategy (https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/),
   which makes the TestZkLocalApplicationRunner tests flaky.
   
   The reason is that, in some of the tests,
   Kafka events are published with the DefaultPartitioner to an input topic with 5 partitions. With the sticky partitioning strategy, some of those partitions can end up empty. The tests bring up stream processors to process them and wait for the processors to consume messages.
   
   However, a stream processor has nothing to process if it consumes only from partitions that received no messages.
   In such a case, the test gets stuck and times out.
   
   Change: publish Kafka events to all partitions in a round-robin way to make sure all partitions have messages
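   
   A minimal sketch of the round-robin publish (the producer, topic name, message list, and partition count are stand-ins for the test's own fields):
   
      import java.util.List;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
   
      // Send each message to an explicit partition, cycling over all of them, so no
      // partition is left empty by the 2.4 sticky partitioner's batching behavior.
      void publishRoundRobin(KafkaProducer<String, String> producer, String inputTopic,
          List<String> messages, int numPartitions) {
        for (int i = 0; i < messages.size(); i++) {
          int partition = i % numPartitions;  // e.g. 0..4 for the 5-partition input topic
          producer.send(new ProducerRecord<>(inputTopic, partition, null, messages.get(i)));
        }
        producer.flush();
      }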
   
   
   Tests:
   ./gradlew clean build
   
   API Changes:
   None
   
   





[GitHub] [samza] jia-gao commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

jia-gao commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r797207085



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       There is only one consumer instance (a member field), and this was the only place where that consumer member was used. So there is no other place to replace it; we simply don't use it anymore.
   
   In this change, instead of using that consumer member, we keep a topicsConsumerMap that creates a new consumer for every new topic.
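   
   Roughly, the pattern is the following (field and method names are illustrative; only topicsConsumerMap is taken from the comment above, and bootstrapServers stands in for the harness's broker address; imports as in the earlier sketch, plus java.util.HashMap and java.util.Map):
   
      // One consumer per topic, each in its own group, created lazily and cached.
      private final Map<String, KafkaConsumer<String, String>> topicsConsumerMap = new HashMap<>();
   
      private KafkaConsumer<String, String> getOrCreateConsumer(String topic) {
        return topicsConsumerMap.computeIfAbsent(topic, t -> {
          Properties props = new Properties();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-for-" + t);  // per-topic group: no shared rebalances
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
          props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          KafkaConsumer<String, String> topicConsumer = new KafkaConsumer<>(props);
          topicConsumer.subscribe(Collections.singletonList(t));
          return topicConsumer;
        });
      }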







[GitHub] [samza] dxichen commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

dxichen commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r797189794



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       should we replace all instances of consumer with kafkaConsumer here? If not, why replace only this usage?







[GitHub] [samza] jia-gao commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

jia-gao commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r799071898



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       1. We can't create all consumers in setUp() because we don't know the topics in advance. Added a comment noting that the topicConsumerMap will not have all of its consumers added in setUp().
   2. Yes, added a tearDown() to unsubscribe and close all consumers.
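   
   For example, a tearDown along these lines (a sketch; the map name follows the comment above, the rest is illustrative):
   
      // Unsubscribe and close every per-topic consumer created during the test.
      public void tearDown() {
        for (KafkaConsumer<String, String> topicConsumer : topicConsumerMap.values()) {
          topicConsumer.unsubscribe();
          topicConsumer.close();
        }
        topicConsumerMap.clear();
      }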







[GitHub] [samza] dxichen commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

dxichen commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r798899937



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       Sorry, I should have been clearer. I was talking about the `consumer` here, which comes from `IntegrationTestHarness` (which this class extends) and has uses other than this one.
   
   Do we need that `consumer` to become the `kafkaConsumer` map instead?
   I wanted to verify that this class does not conflict with the base class when using different consumers.







[GitHub] [samza] dxichen merged pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

dxichen merged pull request #1581:
URL: https://github.com/apache/samza/pull/1581


   





[GitHub] [samza] jia-gao commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

jia-gao commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r798988796



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       In the base class IntegrationTestHarness, the only usage of the consumer is in setUp() and tearDown(), which create and close it. For tests that extend StreamApplicationIntegrationTestHarness, the only usage of the consumer was here. This means that if we stop using the consumer here, there is no other place where these tests access it.
   
   I don't think it is a good idea to move the kafkaConsumer map into the base class, since it could be extended by other classes that don't have this consumeMessages() method, and changing the base class could break their tests. I prefer to apply the fix only where it is needed.







[GitHub] [samza] dxichen commented on a change in pull request #1581: Upgrade to Kafka 2.4 and fix flaky tests

dxichen commented on a change in pull request #1581:
URL: https://github.com/apache/samza/pull/1581#discussion_r798996658



##########
File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
##########
@@ -128,32 +129,32 @@ public int clusterSize() {
     return Integer.parseInt(KafkaConfig.TOPIC_DEFAULT_REPLICATION_FACTOR());
   }
 
-
   /**
    * Read messages from the provided list of topics until {@param threshold} messages have been read or until
    * {@link #numEmptyPolls} polls return no messages.
    *
    * The default poll time out is determined by {@link #POLL_TIMEOUT_MS} and the number of empty polls are
    * determined by {@link #numEmptyPolls}
    *
-   * @param topics the list of topics to consume from
+   * @param topic the topic to consume from
    * @param threshold the number of messages to consume
    * @return the list of {@link ConsumerRecord}s whose size can be atmost {@param threshold}
    */
-  public List<ConsumerRecord<String, String>> consumeMessages(Collection<String> topics, int threshold) {
+  public List<ConsumerRecord<String, String>> consumeMessages(String topic, int threshold) {
     int emptyPollCount = 0;
     List<ConsumerRecord<String, String>> recordList = new ArrayList<>();
-    consumer.subscribe(topics);

Review comment:
       If the only use is setup or teardown, let's make sure we follow the lifecycle by:
   1. adding the consumer construction in setup() instead of in consume(), and
   2. adding consumer.stop() for each consumer (currently missing).



